nsivabalan commented on code in PR #12597:
URL: https://github.com/apache/hudi/pull/12597#discussion_r1931262721
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java:
##########
@@ -48,36 +49,28 @@ public Option<Pair<HoodieRecord, Schema>>
merge(HoodieRecord older, Schema oldSc
ValidationUtils.checkArgument(older.getRecordType() ==
HoodieRecordType.SPARK);
ValidationUtils.checkArgument(newer.getRecordType() ==
HoodieRecordType.SPARK);
- if (newer instanceof HoodieSparkRecord) {
- HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
- if (newSparkRecord.isDelete(newSchema, props)) {
- // Delete record
- return Option.empty();
- }
- } else {
- if (newer.getData() == null) {
- // Delete record
- return Option.empty();
- }
+ if (newer instanceof HoodieEmptyRecord) {
+ return Option.empty();
+ }
+ HoodieSparkRecord newRecord = (HoodieSparkRecord) newer;
+ if (older instanceof HoodieEmptyRecord) {
+ return newRecord.isDelete(newSchema, props)
+ ? Option.empty() : Option.of(Pair.of(newer, newSchema));
}
+ HoodieSparkRecord oldRecord = (HoodieSparkRecord) older;
+ Comparable newOrderingVal = newRecord.getOrderingValue(newSchema, props);
+ Comparable oldOrderingVal = oldRecord.getOrderingValue(oldSchema, props);
- if (older instanceof HoodieSparkRecord) {
- HoodieSparkRecord oldSparkRecord = (HoodieSparkRecord) older;
- if (oldSparkRecord.isDelete(oldSchema, props)) {
- // use natural order for delete record
- return Option.of(Pair.of(newer, newSchema));
- }
- } else {
- if (older.getData() == null) {
- // use natural order for delete record
- return Option.of(Pair.of(newer, newSchema));
- }
+ // The same logic as fg reader.
+ if (newOrderingVal.equals(0) && newRecord.isDelete(newSchema, props)) {
Review Comment:
`newOrderingVal.equals(DEFAULT_COMMIT_TIME_ORDERING)` —
please use the named constant we already have for the default ordering value.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##########
@@ -586,6 +588,24 @@ private boolean isDeleteRecord(Option<T> record, Schema
schema) {
}
Object deleteMarker = readerContext.getValue(record.get(), schema,
HOODIE_IS_DELETED_FIELD);
- return deleteMarker instanceof Boolean && (boolean) deleteMarker;
+ if (deleteMarker instanceof Boolean && (boolean) deleteMarker) {
+ return true;
+ }
+
+ return isCustomDeleteRecord(record, schema, props);
+ }
+
+ boolean isCustomDeleteRecord(Option<T> record, Schema schema,
TypedProperties props) {
Review Comment:
The `isCustomDeleteRecord` implementation looks to be repeated. Can we dedup the copies?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestCustomDeleteRecord.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PRECOMBINE_FIELD,
RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig,
RecordMergeMode}
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload.{DELETE_KEY,
DELETE_MARKER}
+import org.apache.hudi.common.model.{HoodieAvroRecordMerger,
HoodieRecordMerger, OverwriteWithLatestMerger}
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.spark.sql.{Dataset, Row, SaveMode}
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+class TestCustomDeleteRecord extends SparkClientFunctionalTestHarness {
+ val expectedEventTimeBased: Seq[(Int, String, String, String, Double,
String)] = Seq(
+ (10, "5", "rider-E", "driver-E", 17.85, "i"),
+ (10, "3", "rider-C", "driver-C", 33.9, "i"),
+ (10, "2", "rider-B", "driver-B", 27.7, "i"))
+ val expectedCommitTimeBased: Seq[(Int, String, String, String, Double,
String)] = Seq(
+ (10, "5", "rider-E", "driver-E", 17.85, "i"),
+ (10, "3", "rider-C", "driver-C", 33.9, "i"))
+
+ @ParameterizedTest
+ @MethodSource(Array("provideParams"))
+ def testCustomDelete(useFgReader: String,
+ tableType: String,
+ recordType: String,
+ positionUsed: String,
+ mergeMode: String): Unit = {
+ val sparkMergeClasses = List(
+ classOf[DefaultSparkRecordMerger].getName,
+ classOf[OverwriteWithLatestSparkRecordMerger].getName).mkString(",")
+ val avroMergerClasses = List(
+ classOf[HoodieAvroRecordMerger].getName,
+ classOf[OverwriteWithLatestMerger].getName).mkString(",")
+
+ val mergeStrategy = if
(mergeMode.equals(RecordMergeMode.EVENT_TIME_ORDERING.name)) {
+ HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID
+ } else {
+ HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
+ }
+ val mergeOpts: Map[String, String] = Map(
+ HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet",
+ HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key ->
+ (if (recordType.equals("SPARK")) sparkMergeClasses else
avroMergerClasses),
+ HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key -> mergeStrategy)
+ val fgReaderOpts: Map[String, String] = Map(
+ HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key -> useFgReader,
+ HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key -> positionUsed,
+ HoodieWriteConfig.RECORD_MERGE_MODE.key -> mergeMode
+ )
+ val deleteOpts: Map[String, String] = Map(
+ DELETE_KEY -> "delete",
+ DELETE_MARKER -> "d")
+ val opts = mergeOpts ++ fgReaderOpts ++ deleteOpts
+ val columns = Seq("ts", "key", "rider", "driver", "fare", "delete")
+
+ val data = Seq(
+ (10, "1", "rider-A", "driver-A", 19.10, "i"),
+ (10, "2", "rider-B", "driver-B", 27.70, "i"),
+ (10, "3", "rider-C", "driver-C", 33.90, "i"),
+ (10, "4", "rider-D", "driver-D", 34.15, "i"),
+ (10, "5", "rider-E", "driver-E", 17.85, "i"))
+ val inserts = spark.createDataFrame(data).toDF(columns: _*)
+ inserts.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ options(opts).
+ mode(SaveMode.Overwrite).
+ save(basePath)
+
+ // Delete using delete markers.
+ val updateData = Seq(
+ (11, "1", "rider-X", "driver-X", 19.10, "d"),
+ (9, "2", "rider-Y", "driver-Y", 27.70, "d"))
+ val updates = spark.createDataFrame(updateData).toDF(columns: _*)
+ updates.write.format("hudi").
+ option(OPERATION.key(), "upsert").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ options(opts).
+ mode(SaveMode.Append).
+ save(basePath)
+
+ // Delete from operation.
+ val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))
+ val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
+ deletes.write.format("hudi").
+ option(OPERATION.key(), "delete").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ options(opts).
+ mode(SaveMode.Append).
+ save(basePath)
+
+ // Validate in the end.
+ val df = spark.read.format("hudi").options(opts).load(basePath)
+ val finalDf = df.select("ts", "key", "rider", "driver", "fare",
"delete").sort("key")
+ finalDf.show(false)
+ val expected = if (mergeMode ==
RecordMergeMode.COMMIT_TIME_ORDERING.name()) {
+ expectedCommitTimeBased
+ } else {
+ expectedEventTimeBased
+ }
+ val expectedDf = spark.createDataFrame(expected).toDF(columns:
_*).sort("key")
+ TestCustomDeleteRecord.validate(expectedDf, finalDf)
+ }
+}
+
+object TestCustomDeleteRecord {
+ def provideParams(): java.util.List[Arguments] = {
+ java.util.Arrays.asList(
+ Arguments.of("true", "COPY_ON_WRITE", "AVRO", "false",
"EVENT_TIME_ORDERING"),
+ Arguments.of("true", "COPY_ON_WRITE", "AVRO", "true",
"EVENT_TIME_ORDERING"),
+ Arguments.of("true", "MERGE_ON_READ", "AVRO", "false",
"EVENT_TIME_ORDERING"),
+ Arguments.of("true", "MERGE_ON_READ", "AVRO", "true",
"EVENT_TIME_ORDERING"),
+ Arguments.of("true", "MERGE_ON_READ", "AVRO", "false",
"COMMIT_TIME_ORDERING"),
+ Arguments.of("true", "MERGE_ON_READ", "AVRO", "true",
"COMMIT_TIME_ORDERING"),
+ Arguments.of("true", "COPY_ON_WRITE", "SPARK", "false",
"EVENT_TIME_ORDERING"),
+ Arguments.of("true", "COPY_ON_WRITE", "SPARK", "true",
"EVENT_TIME_ORDERING"),
+ Arguments.of("true", "COPY_ON_WRITE", "SPARK", "false",
"COMMIT_TIME_ORDERING"),
+ Arguments.of("true", "COPY_ON_WRITE", "SPARK", "true",
"COMMIT_TIME_ORDERING"),
+ Arguments.of("true", "MERGE_ON_READ", "SPARK", "false",
"EVENT_TIME_ORDERING"),
+ Arguments.of("true", "MERGE_ON_READ", "SPARK", "true",
"EVENT_TIME_ORDERING"),
+ Arguments.of("true", "MERGE_ON_READ", "SPARK", "false",
"COMMIT_TIME_ORDERING"),
+ Arguments.of("true", "MERGE_ON_READ", "SPARK", "true",
"COMMIT_TIME_ORDERING")
+ // TODO: enable the following test cases: HUDI-8876
+ // Arguments.of("true", "COPY_ON_WRITE", "AVRO", "false",
"COMMIT_TIME_ORDERING"),
+ // Arguments.of("true", "COPY_ON_WRITE", "AVRO", "true",
"COMMIT_TIME_ORDERING")
Review Comment:
What's the bug here?
##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java:
##########
@@ -76,6 +76,9 @@ public static HoodieRecordMerger loadRecordMerger(String
mergerClass) {
*/
public static HoodieRecordMerger createRecordMerger(String basePath,
EngineType engineType,
List<String>
mergerClassList, String recordMergerStrategy) {
+ // Currently we fall back to `HoodieAvroRecordMerger` (event time based)
even the specified merger strategy
+ // is commit time based. This behavior has been treated as the norm in
Hudi.
+ // TODO: evaluate the impact of this behavior and unify/simplify merge
behavior in Hudi repo.
Review Comment:
Why is this comment being added in this patch? I could not understand the relation.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestMerger.java:
##########
@@ -31,9 +31,17 @@
* Avro Merger that always chooses the newer record
*/
public class OverwriteWithLatestMerger implements HoodieRecordMerger {
+ public static final OverwriteWithLatestMerger INSTANCE = new
OverwriteWithLatestMerger();
@Override
- public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws
IOException {
+ public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older,
+ Schema oldSchema,
+ HoodieRecord newer,
+ Schema newSchema,
+ TypedProperties props)
throws IOException {
+ if (newer.isDelete(newSchema, props)) {
Review Comment:
So, are we standardizing the flows with this patch, such that any delete (empty
Hoodie record, `_hoodie_is_deleted`, custom delete markers) is represented as
`Option.empty()`?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -230,22 +232,37 @@ public boolean isDelete(Schema recordSchema, Properties
props) {
if (null == data) {
return true;
}
-
// Use metadata filed to decide.
Schema.Field operationField =
recordSchema.getField(OPERATION_METADATA_FIELD);
if (null != operationField
&& HoodieOperation.isDeleteRecord((String)
data.get(operationField.pos(), StringType))) {
return true;
}
-
// Use data field to decide.
- if (recordSchema.getField(HOODIE_IS_DELETED_FIELD) == null) {
- return false;
+ if (recordSchema.getField(HOODIE_IS_DELETED_FIELD) != null) {
+ Object deleteMarker = data.get(
+ recordSchema.getField(HOODIE_IS_DELETED_FIELD).pos(), BooleanType);
+ if (deleteMarker instanceof Boolean && (boolean) deleteMarker) {
+ return true;
+ }
}
+ // Use custom field to decide.
+ return isCustomDeleteRecord(recordSchema, props);
Review Comment:
We should probably process the custom delete marker before the
`HOODIE_IS_DELETED_FIELD` check.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java:
##########
@@ -94,4 +100,23 @@ protected boolean isDeleteRecord(GenericRecord
genericRecord) {
Object deleteMarker = genericRecord.get(isDeleteKey);
return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
}
+
+ boolean isCustomDeleteRecord(Schema schema, Properties props) {
Review Comment:
Why are we adding this here?
Are we also adding custom delete marker support to all payloads now with this
change?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java:
##########
@@ -48,36 +49,28 @@ public Option<Pair<HoodieRecord, Schema>>
merge(HoodieRecord older, Schema oldSc
ValidationUtils.checkArgument(older.getRecordType() ==
HoodieRecordType.SPARK);
ValidationUtils.checkArgument(newer.getRecordType() ==
HoodieRecordType.SPARK);
- if (newer instanceof HoodieSparkRecord) {
- HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
- if (newSparkRecord.isDelete(newSchema, props)) {
- // Delete record
- return Option.empty();
- }
- } else {
- if (newer.getData() == null) {
- // Delete record
- return Option.empty();
- }
+ if (newer instanceof HoodieEmptyRecord) {
+ return Option.empty();
+ }
+ HoodieSparkRecord newRecord = (HoodieSparkRecord) newer;
+ if (older instanceof HoodieEmptyRecord) {
+ return newRecord.isDelete(newSchema, props)
+ ? Option.empty() : Option.of(Pair.of(newer, newSchema));
}
+ HoodieSparkRecord oldRecord = (HoodieSparkRecord) older;
+ Comparable newOrderingVal = newRecord.getOrderingValue(newSchema, props);
+ Comparable oldOrderingVal = oldRecord.getOrderingValue(oldSchema, props);
- if (older instanceof HoodieSparkRecord) {
- HoodieSparkRecord oldSparkRecord = (HoodieSparkRecord) older;
- if (oldSparkRecord.isDelete(oldSchema, props)) {
- // use natural order for delete record
- return Option.of(Pair.of(newer, newSchema));
- }
- } else {
- if (older.getData() == null) {
- // use natural order for delete record
- return Option.of(Pair.of(newer, newSchema));
- }
+ // The same logic as fg reader.
+ if (newOrderingVal.equals(0) && newRecord.isDelete(newSchema, props)) {
+ return Option.empty();
}
- if (older.getOrderingValue(oldSchema,
props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
- return Option.of(Pair.of(older, oldSchema));
- } else {
- return Option.of(Pair.of(newer, newSchema));
+ if (!oldOrderingVal.equals(0) && oldOrderingVal.compareTo(newOrderingVal)
> 0) {
Review Comment:
Again, let's use the named constant we already have for the default ordering value.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestCustomDeleteRecord.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PRECOMBINE_FIELD,
RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig,
RecordMergeMode}
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload.{DELETE_KEY,
DELETE_MARKER}
+import org.apache.hudi.common.model.{HoodieAvroRecordMerger,
HoodieRecordMerger, OverwriteWithLatestMerger}
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.spark.sql.{Dataset, Row, SaveMode}
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+class TestCustomDeleteRecord extends SparkClientFunctionalTestHarness {
+ val expectedEventTimeBased: Seq[(Int, String, String, String, Double,
String)] = Seq(
+ (10, "5", "rider-E", "driver-E", 17.85, "i"),
+ (10, "3", "rider-C", "driver-C", 33.9, "i"),
+ (10, "2", "rider-B", "driver-B", 27.7, "i"))
+ val expectedCommitTimeBased: Seq[(Int, String, String, String, Double,
String)] = Seq(
+ (10, "5", "rider-E", "driver-E", 17.85, "i"),
+ (10, "3", "rider-C", "driver-C", 33.9, "i"))
+
+ @ParameterizedTest
+ @MethodSource(Array("provideParams"))
+ def testCustomDelete(useFgReader: String,
+ tableType: String,
+ recordType: String,
+ positionUsed: String,
+ mergeMode: String): Unit = {
+ val sparkMergeClasses = List(
+ classOf[DefaultSparkRecordMerger].getName,
+ classOf[OverwriteWithLatestSparkRecordMerger].getName).mkString(",")
+ val avroMergerClasses = List(
+ classOf[HoodieAvroRecordMerger].getName,
+ classOf[OverwriteWithLatestMerger].getName).mkString(",")
+
+ val mergeStrategy = if
(mergeMode.equals(RecordMergeMode.EVENT_TIME_ORDERING.name)) {
+ HoodieRecordMerger.DEFAULT_MERGE_STRATEGY_UUID
+ } else {
+ HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID
+ }
+ val mergeOpts: Map[String, String] = Map(
+ HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet",
+ HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key ->
+ (if (recordType.equals("SPARK")) sparkMergeClasses else
avroMergerClasses),
+ HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key -> mergeStrategy)
+ val fgReaderOpts: Map[String, String] = Map(
+ HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key -> useFgReader,
+ HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key -> positionUsed,
+ HoodieWriteConfig.RECORD_MERGE_MODE.key -> mergeMode
+ )
+ val deleteOpts: Map[String, String] = Map(
+ DELETE_KEY -> "delete",
Review Comment:
Can we name the data column "op" or "operation"? It's confusing to name it
"delete".
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java:
##########
@@ -230,22 +232,37 @@ public boolean isDelete(Schema recordSchema, Properties
props) {
if (null == data) {
return true;
}
-
// Use metadata filed to decide.
Schema.Field operationField =
recordSchema.getField(OPERATION_METADATA_FIELD);
if (null != operationField
&& HoodieOperation.isDeleteRecord((String)
data.get(operationField.pos(), StringType))) {
return true;
}
-
// Use data field to decide.
- if (recordSchema.getField(HOODIE_IS_DELETED_FIELD) == null) {
- return false;
+ if (recordSchema.getField(HOODIE_IS_DELETED_FIELD) != null) {
+ Object deleteMarker = data.get(
+ recordSchema.getField(HOODIE_IS_DELETED_FIELD).pos(), BooleanType);
+ if (deleteMarker instanceof Boolean && (boolean) deleteMarker) {
+ return true;
+ }
}
+ // Use custom field to decide.
+ return isCustomDeleteRecord(recordSchema, props);
Review Comment:
Can we move this processing before line 242?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java:
##########
@@ -48,36 +49,28 @@ public Option<Pair<HoodieRecord, Schema>>
merge(HoodieRecord older, Schema oldSc
ValidationUtils.checkArgument(older.getRecordType() ==
HoodieRecordType.SPARK);
ValidationUtils.checkArgument(newer.getRecordType() ==
HoodieRecordType.SPARK);
- if (newer instanceof HoodieSparkRecord) {
- HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
- if (newSparkRecord.isDelete(newSchema, props)) {
- // Delete record
- return Option.empty();
- }
- } else {
- if (newer.getData() == null) {
- // Delete record
- return Option.empty();
- }
+ if (newer instanceof HoodieEmptyRecord) {
+ return Option.empty();
+ }
+ HoodieSparkRecord newRecord = (HoodieSparkRecord) newer;
+ if (older instanceof HoodieEmptyRecord) {
+ return newRecord.isDelete(newSchema, props)
+ ? Option.empty() : Option.of(Pair.of(newer, newSchema));
}
+ HoodieSparkRecord oldRecord = (HoodieSparkRecord) older;
+ Comparable newOrderingVal = newRecord.getOrderingValue(newSchema, props);
+ Comparable oldOrderingVal = oldRecord.getOrderingValue(oldSchema, props);
- if (older instanceof HoodieSparkRecord) {
- HoodieSparkRecord oldSparkRecord = (HoodieSparkRecord) older;
- if (oldSparkRecord.isDelete(oldSchema, props)) {
- // use natural order for delete record
- return Option.of(Pair.of(newer, newSchema));
- }
- } else {
- if (older.getData() == null) {
- // use natural order for delete record
- return Option.of(Pair.of(newer, newSchema));
- }
+ // The same logic as fg reader.
+ if (newOrderingVal.equals(0) && newRecord.isDelete(newSchema, props)) {
+ return Option.empty();
}
- if (older.getOrderingValue(oldSchema,
props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
- return Option.of(Pair.of(older, oldSchema));
- } else {
- return Option.of(Pair.of(newer, newSchema));
+ if (!oldOrderingVal.equals(0) && oldOrderingVal.compareTo(newOrderingVal)
> 0) {
Review Comment:
Let's add Javadoc comments for the various `if`/`else` blocks calling out the
scenario each one handles. For example, at line 69 we could add:
```
Both the old and new records have valid ordering values, and the old record has a
higher ordering value than the new record.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]