jonvex commented on code in PR #12122:
URL: https://github.com/apache/hudi/pull/12122#discussion_r1809491790
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##########
@@ -278,14 +283,16 @@ protected Option<DeleteRecord>
doProcessNextDeletedRecord(DeleteRecord deleteRec
if
(isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(),
existingOrderingVal)) {
return Option.empty();
}
- Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
- // Checks the ordering value does not equal to 0
- // because we use 0 as the default value which means natural order
- boolean chooseExisting = !deleteOrderingVal.equals(0)
- && ReflectionUtils.isSameClass(existingOrderingVal,
deleteOrderingVal)
- && existingOrderingVal.compareTo(deleteOrderingVal) > 0;
- if (chooseExisting) {
- // The DELETE message is obsolete if the old message has greater
orderingVal.
+ Comparable deleteOrderingVal = deleteRecord.getOrderingValue() ==
null ? orderingFieldDefault : deleteRecord.getOrderingValue();
+ // Here existing record represents newer record with the same key,
which can be a delete or non-delete record.
+ // Therefore, we should use event time based merging if possible.
So, the newer record is returned if
+ // 1. the delete is processing time based, or
+ // 2. delete is event time based, and the existing record has higher
value.
+ if (isProcessingTimeBasedDelete(deleteOrderingVal)) {
+
existingRecordMetadataPair.getRight().put(DELETE_FOUND_WITHOUT_ORDERING_VALUE,
"true");
+ return Option.empty();
+ }
+ if (ReflectionUtils.isSameClass(existingOrderingVal,
deleteOrderingVal) && deleteOrderingVal.compareTo(existingOrderingVal) <= 0) {
Review Comment:
I think the classes should already be the same by the time we get here.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieMergedLogRecordScanner.java:
##########
@@ -197,16 +209,16 @@ public Iterator<HoodieRecord> iterator() {
protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
String key = deleteRecord.getRecordKey();
HoodieRecord oldRecord = records.get(key);
+
+ Comparable deleteOrderingVal = deleteRecord.getOrderingValue() == null ?
orderingFieldDefault : deleteRecord.getOrderingValue();
Review Comment:
nit: I think there is an extra space after the ":".
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/TypeCaster.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TypeCaster {
Review Comment:
I think this logic largely duplicates what I did for casting the
ordering field in the file group reader.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java:
##########
@@ -86,6 +88,10 @@ public void processDataBlock(HoodieDataBlock dataBlock,
Option<KeySpec> keySpecO
@Override
public void processNextDataRecord(T record, Map<String, Object> metadata,
Serializable recordKey) throws IOException {
Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair =
records.get(recordKey);
+ if (existingRecordMetadataPair != null &&
existingRecordMetadataPair.getRight().containsKey(DELETE_FOUND_WITHOUT_ORDERING_VALUE))
{
Review Comment:
Add a comment here saying that, because we process the logs in reverse order,
the delete we found earlier must have a later processing time. Since it didn't
have an ordering value, we assume that it definitely overwrites this record.
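
A minimal sketch of how the requested comment could read in context. This is a simplified stand-in rather than the actual record buffer: the class, the string-keyed map, the flag value, and the assumption that the flagged branch simply drops the older record are all illustrative and not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the record buffer; not Hudi's actual classes.
class ReverseOrderBufferSketch {
    // Hypothetical flag key, named after DELETE_FOUND_WITHOUT_ORDERING_VALUE in the diff.
    static final String DELETE_FOUND_WITHOUT_ORDERING_VALUE = "deleteFoundWithoutOrderingValue";

    // recordKey -> metadata of the entry already buffered for that key.
    final Map<String, Map<String, Object>> records = new HashMap<>();

    // Called for a delete that carries no ordering value (simplified: always creates an entry).
    void markProcessingTimeDelete(String recordKey) {
        records.computeIfAbsent(recordKey, k -> new HashMap<>())
               .put(DELETE_FOUND_WITHOUT_ORDERING_VALUE, "true");
    }

    // Returns true if the data record is buffered, false if it is dropped.
    boolean processNextDataRecord(String recordKey) {
        Map<String, Object> existing = records.get(recordKey);
        // Log blocks are processed in reverse order, so a delete seen earlier was
        // written later. It had no ordering value, so it is treated as a
        // processing-time delete that always overwrites this older record.
        if (existing != null && existing.containsKey(DELETE_FOUND_WITHOUT_ORDERING_VALUE)) {
            return false;
        }
        records.computeIfAbsent(recordKey, k -> new HashMap<>());
        return true;
    }
}
```

With this shape, a data record for a flagged key is skipped while records for other keys are buffered as usual.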
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala:
##########
@@ -719,7 +719,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
val dataGen = new QuickstartUtils.DataGenerator
val inserts =
QuickstartUtils.convertToStringList(dataGen.generateInserts(10)).asScala.toSeq
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
- .withColumn("ts", lit("20240404000000")) // to make test
determinate for HOODIE_AVRO_DEFAULT payload
+ .withColumn("ts", lit("2024040400000")) // to make test
determinate for HOODIE_AVRO_DEFAULT payload
Review Comment:
Did you change this so it is an int instead of a long? Why was that causing
the test to fail?
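
For reference, a quick range check on the two literals, as a sketch. The class and method names are illustrative. Both values appear to exceed Integer.MAX_VALUE (2147483647), so dropping one digit would not by itself make the value fit in an int. Note also that the diff passes the value to lit(...) as a string.

```java
class TsLiteralRangeCheck {
    // Returns true if the decimal string parses as a 32-bit int.
    static boolean fitsInInt(String s) {
        try {
            Integer.parseInt(s);
            return true;
        } catch (NumberFormatException e) {
            // Thrown when the value is out of int range (or not a number).
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(fitsInInt("20240404000000")); // old literal, 14 digits: false
        System.out.println(fitsInInt("2024040400000"));  // new literal, 13 digits: false
        System.out.println(Long.parseLong("2024040400000") > Integer.MAX_VALUE); // true
    }
}
```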
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestDeleteRecordLogic.scala:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read
+
+import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PRECOMBINE_FIELD,
RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.hudi.{DataSourceWriteOptions, DefaultSparkRecordMerger}
+import org.apache.spark.sql.{Dataset, Row, SaveMode}
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+class TestDeleteRecordLogic extends SparkClientFunctionalTestHarness{
+ val expected1 = Seq(
+ (14, "5", "rider-Z", "driver-Z", 17.85, 3),
+ (10, "3", "rider-C", "driver-C", 33.9, 10),
+ (10, "2", "rider-B", "driver-B", 27.7, 1))
+
+ val expected2 = Seq(
+ (14, "5", "rider-Z", "driver-Z", 17.85, 3),
+ (-9, "4", "rider-DDDD", "driver-DDDD", 20.0, 1),
+ (10, "3", "rider-C", "driver-C", 33.9, 10),
+ (10, "2", "rider-B", "driver-B", 27.7, 1))
+
+ @ParameterizedTest
+ @MethodSource(Array("provideParams"))
+ def testDeleteLogic(useFgReader: String, tableType: String, recordType:
String): Unit = {
+ val sparkOpts: Map[String, String] = Map(
+ HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet",
+ HoodieWriteConfig.RECORD_MERGER_IMPLS.key ->
classOf[DefaultSparkRecordMerger].getName)
+ val fgReaderOpts: Map[String, String] = Map(
+ HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key -> useFgReader)
+
+ val opts = if (recordType.equals("SPARK")) sparkOpts ++ fgReaderOpts else
fgReaderOpts
+ val columns = Seq("ts", "key", "rider", "driver", "fare", "number")
+
+ val data = Seq(
+ (10, "1", "rider-A", "driver-A", 19.10, 7),
+ (10, "2", "rider-B", "driver-B", 27.70, 1),
+ (10, "3", "rider-C", "driver-C", 33.90, 10),
+ (-1, "4", "rider-D", "driver-D", 34.15, 6),
+ (10, "5", "rider-E", "driver-E", 17.85, 10))
+ val inserts = spark.createDataFrame(data).toDF(columns: _*)
+ inserts.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ options(opts).
+ mode(SaveMode.Overwrite).
+ save(basePath)
+
+ val updateData = Seq(
+ (11, "1", "rider-X", "driver-X", 19.10, 9),
+ (9, "2", "rider-Y", "driver-Y", 27.70, 7))
+ val updates = spark.createDataFrame(updateData).toDF(columns: _*)
+ updates.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(OPERATION.key(), "upsert").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ options(opts).
+ mode(SaveMode.Append).
+ save(basePath)
+
+ val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))
+ val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
+ deletes.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(OPERATION.key(), "delete").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ options(opts).
+ mode(SaveMode.Append).
+ save(basePath)
+
+ val secondUpdateData = Seq(
+ (14, "5", "rider-Z", "driver-Z", 17.85, 3),
+ (-10, "4", "rider-DD", "driver-DD", 34.15, 5))
+ val secondUpdates = spark.createDataFrame(secondUpdateData).toDF(columns:
_*)
+ secondUpdates.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(OPERATION.key(), "upsert").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ options(opts).
+ mode(SaveMode.Append).
+ save(basePath)
+
+ val secondDeletesData = Seq(
+ (10, "4", "rider-D", "driver-D", 34.15, 6),
+ (0, "1", "rider-X", "driver-X", 19.10, 8))
+ val secondDeletes = spark.createDataFrame(secondDeletesData).toDF(columns:
_*)
+ secondDeletes.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(OPERATION.key(), "delete").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ options(opts).
+ mode(SaveMode.Append).
+ save(basePath)
+
+ val thirdUpdateData = Seq((-8, "4", "rider-DDD", "driver-DDD", 20.00, 1))
+ val thirdUpdates = spark.createDataFrame(thirdUpdateData).toDF(columns: _*)
+ thirdUpdates.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(OPERATION.key(), "upsert").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ options(opts).
+ mode(SaveMode.Append).
+ save(basePath)
+
+ val thirdDeletesData = Seq(
+ (10, "4", "rider-D4", "driver-D4", 34.15, 6),
+ (0, "1", "rider-X", "driver-X", 19.10, 8))
+ val thirdDeletes = spark.createDataFrame(thirdDeletesData).toDF(columns:
_*)
+ thirdDeletes.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(OPERATION.key(), "delete").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ options(opts).
+ mode(SaveMode.Append).
+ save(basePath)
+
+ var metaClient =
HoodieTableMetaClient.builder().setBasePath(basePath()).setConf(storageConf()).build()
+ if (tableType.equals("MERGE_ON_READ")) {
+ val activeTimeline = metaClient.getActiveTimeline
+ val compactionNum = activeTimeline.getAllCommitsTimeline
+ .getInstantsAsStream.filter(t => t.isCompleted() &&
t.getAction.equals("commit")).count()
+ assertTrue(compactionNum == 0)
+ }
+
+ val DfBeforeFinish = spark.read.format("hudi").options(opts).load(basePath)
+ val actualDfBeforeFinish = DfBeforeFinish.select("ts", "key", "rider",
"driver", "fare", "number").sort("ts")
+ actualDfBeforeFinish.show(false)
+ val expectedDfBeforeFinish =
spark.createDataFrame(expected1).toDF(columns: _*).sort("ts")
+ TestDeleteRecordLogic.validate(expectedDfBeforeFinish,
actualDfBeforeFinish)
+
+ val fourUpdateData = Seq((-9, "4", "rider-DDDD", "driver-DDDD", 20.00, 1))
+ val fourUpdates = spark.createDataFrame(fourUpdateData).toDF(columns: _*)
+ fourUpdates.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(),
+ if (tableType.equals("MERGE_ON_READ")) "true" else "false").
+ option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(),
"1").
+ option(OPERATION.key(), "upsert").
+ options(opts).
+ mode(SaveMode.Append).
+ save(basePath)
+
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ if (tableType.equals("MERGE_ON_READ")) {
+ val activeTimeline = metaClient.getActiveTimeline
+ val compactionNum = activeTimeline.getAllCommitsTimeline
+ .getInstantsAsStream.filter(t => t.isCompleted() &&
t.getAction.equals("commit")).count()
+ assertTrue(compactionNum == 1)
+ }
+
+ // Validate in the end.
+ val df = spark.read.format("hudi").options(opts).load(basePath)
+ val finalDf = df.select("ts", "key", "rider", "driver", "fare",
"number").sort("ts")
+ finalDf.show(false)
+ val expectedDf = spark.createDataFrame(expected2).toDF(columns:
_*).sort("ts")
+ TestDeleteRecordLogic.validate(expectedDf, finalDf)
+ }
+}
+
+object TestDeleteRecordLogic {
+ def provideParams(): java.util.List[Arguments] = {
+ java.util.Arrays.asList(
+ Arguments.of("false", "COPY_ON_WRITE", "AVRO"),
+ Arguments.of("false", "COPY_ON_WRITE", "SPARK"),
+ Arguments.of("false", "MERGE_ON_READ", "AVRO"),
+ Arguments.of("false", "MERGE_ON_READ", "SPARK"),
+ Arguments.of("true", "COPY_ON_WRITE", "AVRO"),
+ Arguments.of("true", "COPY_ON_WRITE", "SPARK"),
+ Arguments.of("true", "MERGE_ON_READ", "AVRO"),
+ Arguments.of("true", "MERGE_ON_READ", "SPARK"))
+ }
+
+ def validate(expectedDf: Dataset[Row], actualDf: Dataset[Row]): Unit = {
+ val expectedMinusActual = expectedDf.except(actualDf)
+ val actualMinusExpected = actualDf.except(expectedDf)
+
+ expectedMinusActual.show(false)
+ actualMinusExpected.show(false)
Review Comment:
Why are we calling show here?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##########
@@ -252,6 +254,9 @@ protected Option<Pair<T, Map<String, Object>>>
doProcessNextDataRecord(T record,
// NOTE: Record have to be cloned here to make sure if it holds
low-level engine-specific
// payload pointing into a shared, mutable (underlying) buffer we
get a clean copy of
// it since these records will be put into records(Map).
+ Comparable orderingValue = readerContext.getOrderingValue(
+ Option.of(record), metadata, readerSchema, orderingFieldName,
orderingFieldType, orderingFieldDefault);
+ metadata.put(INTERNAL_META_ORDERING_FIELD, orderingValue);
Review Comment:
Why were we not doing this before? I am a little uneasy about this.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java:
##########
@@ -108,10 +114,20 @@ public void processDeleteBlock(HoodieDeleteBlock
deleteBlock) throws IOException
@Override
public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable
recordKey) {
Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair =
records.get(recordKey);
+ if (deleteRecord.getOrderingValue() == null && existingRecordMetadataPair
!= null) {
+
existingRecordMetadataPair.getRight().put(DELETE_FOUND_WITHOUT_ORDERING_VALUE,
"true");
+ return;
+ }
+
Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord,
existingRecordMetadataPair);
if (recordOpt.isPresent()) {
+ Comparable orderingVal = recordOpt.get().getOrderingValue() == null ?
orderingFieldDefault : recordOpt.get().getOrderingValue();
Review Comment:
We do this a couple of times. Can we make a util function for it?
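
One possible shape for such a helper, as a sketch only. The class name, method name, and where it would live are hypothetical. The raw Comparable type mirrors the declarations in the diff.

```java
class OrderingValueUtilSketch {
    // Hypothetical helper: returns the given ordering value, or the default when it is null.
    @SuppressWarnings("rawtypes")
    static Comparable orderingValueOrDefault(Comparable orderingValue, Comparable orderingFieldDefault) {
        return orderingValue == null ? orderingFieldDefault : orderingValue;
    }
}
```

A call site could then read `orderingValueOrDefault(deleteRecord.getOrderingValue(), orderingFieldDefault)` instead of repeating the ternary.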
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java:
##########
@@ -195,11 +196,21 @@ public void processDeleteBlock(HoodieDeleteBlock
deleteBlock) throws IOException
@Override
public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable
recordPosition) {
Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair =
records.get(recordPosition);
+ if (deleteRecord.getOrderingValue() == null && existingRecordMetadataPair
!= null) {
+
existingRecordMetadataPair.getRight().put(DELETE_FOUND_WITHOUT_ORDERING_VALUE,
"true");
+ return;
+ }
+
Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord,
existingRecordMetadataPair);
if (recordOpt.isPresent()) {
String recordKey = recordOpt.get().getRecordKey();
+ Comparable orderingVal = recordOpt.get().getOrderingValue() == null ?
orderingFieldDefault : recordOpt.get().getOrderingValue();
Review Comment:
Same here: this could use the util method.