cshuo commented on code in PR #17668:
URL: https://github.com/apache/hudi/pull/17668#discussion_r2642304334
##########
hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java:
##########
@@ -44,24 +51,48 @@ public EventTimeAvroPayload(Option<GenericRecord> record) {
this(record.isPresent() ? record.get() : null, 0); // natural order
}
+ @Override
+ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
+ if ((recordBytes.length == 0 || isDeletedRecord) && orderingVal.equals(0)){
+ //use natural for delete record
+ return this;
+ }
+ Comparable oldValueOrderingVal = oldValue.orderingVal;
+ Comparable thisOrderingVal = orderingVal;
+ if (thisOrderingVal instanceof Utf8 && oldValueOrderingVal instanceof String){
+ thisOrderingVal = thisOrderingVal.toString();
+ }
+ if (thisOrderingVal instanceof GenericData.Fixed && oldValueOrderingVal instanceof BigDecimal){
+ Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
+ thisOrderingVal = conversion.fromFixed((GenericData.Fixed) thisOrderingVal,((GenericData.Fixed) thisOrderingVal).getSchema(),((GenericData.Fixed) thisOrderingVal).getSchema().getLogicalType());
+ }
+ if (oldValueOrderingVal.compareTo(thisOrderingVal)>0){
+ return oldValue;
+ }else {
+ return this;
+ }
+ }
+
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
/*
* Check if the incoming record is a delete record.
*/
- if (recordBytes.length == 0 || isDeletedRecord) {
- return Option.empty();
- }
-
- GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+// if (recordBytes.length == 0 || isDeletedRecord) {
Review Comment:
This commented-out code can be removed.
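For reference, the selection rule that the new `preCombine` implements can be reduced to a small dependency-free sketch. It assumes both ordering values are already normalized to the same `Comparable` type (the `Utf8` and `Fixed` handling is left out), and `PreCombineSketch`, `incomingWins`, and `NATURAL_ORDER` are hypothetical names, not Hudi APIs.

```java
// Hypothetical sketch of the preCombine selection rule; not Hudi's implementation.
// Ordering values are assumed to be normalized to the same Comparable type already.
final class PreCombineSketch {
  // Payloads created without an explicit ordering value use 0 ("natural order").
  static final Integer NATURAL_ORDER = 0;

  // Returns true if the incoming ("this") payload should be kept instead of the old one.
  @SuppressWarnings({"rawtypes", "unchecked"})
  static boolean incomingWins(boolean incomingIsDelete, Comparable incomingOrdering, Comparable oldOrdering) {
    // A delete with natural ordering always wins, like the early `return this`.
    if (incomingIsDelete && NATURAL_ORDER.equals(incomingOrdering)) {
      return true;
    }
    // The old payload is kept only if it is strictly newer; ties go to the incoming one.
    return oldOrdering.compareTo(incomingOrdering) <= 0;
  }
}
```

Because ties return `true`, two records with equal ordering values resolve to the incoming one, matching the `else { return this; }` branch in the hunk.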
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -480,7 +480,259 @@ void testStreamReadWithDeletes() throws Exception {
final String expected = "[+I(+I[Danny, 24]), +I(+I[Stephen, 34])]";
assertRowsEquals(result, expected, true);
}
+ //scene01 compaction off
+ @Test
Review Comment:
Let's use `@ParameterizedTest` to merge these tests. It seems we only need two: string ordering fields with compaction off/on, and decimal ordering fields with compaction off/on.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java:
##########
@@ -77,4 +108,39 @@ public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties
public Option<Map<String, String>> getMetadata() {
return Option.empty();
}
+
+
+ protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
+ Option<IndexedRecord> incomingRecord, Properties properties) {
+ /*
+ * Combining strategy here returns currentValue on disk if incoming record is older.
+ * The incoming record can be either a delete (sent as an upsert with _hoodie_is_deleted set to true)
+ * or an insert/update record. In any case, if it is older than the record in disk, the currentValue
+ * in disk is returned (to be rewritten with new commit time).
+ *
+ * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation type do not hit this code path
+ * and need to be dealt with separately.
+ */
+ String orderField = ConfigUtils.getOrderingField(properties);
+ if (orderField == null) {
+ return true;
+ }
+ boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty(
+ KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+ KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
+ Object persistedOrderingVal = HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
+ orderField,
+ true, consistentLogicalTimestampEnabled);
+ Comparable incomingOrderingVal = incomingRecord.map(record-> (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) record,
+ orderField,
+ true, consistentLogicalTimestampEnabled)).orElse(orderingVal);
+ if (persistedOrderingVal instanceof Utf8){
+ persistedOrderingVal=persistedOrderingVal.toString();
+ }
+ if (persistedOrderingVal instanceof GenericData.Fixed){
Review Comment:
For the decimal type, it seems we can use `HoodieAvroUtils#convertValueForSpecificDataTypes` directly.
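Whichever helper is used, the decimal case boils down to one conversion. Per the Avro specification, a `decimal` backed by `fixed` stores the unscaled value as a big-endian two's-complement integer, with the scale taken from the logical type. The sketch below is a hypothetical, dependency-free illustration of that decoding (it is not the implementation of `HoodieAvroUtils#convertValueForSpecificDataTypes`); until the bytes are decoded like this, a `Fixed` cannot be compared against a `BigDecimal`.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Hypothetical illustration of decoding an Avro decimal stored as fixed bytes.
final class FixedDecimalSketch {
  // The bytes are a big-endian two's-complement unscaled value; the scale comes from the schema.
  static BigDecimal decode(byte[] fixedBytes, int scale) {
    return new BigDecimal(new BigInteger(fixedBytes), scale);
  }
}
```

For example, the bytes `0x04 0xD2` with scale 2 decode to `12.34`. Leading `0x00` or `0xFF` sign-extension bytes, as in a wider `fixed`, do not change the decoded value.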
##########
hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java:
##########
@@ -44,24 +51,48 @@ public EventTimeAvroPayload(Option<GenericRecord> record) {
this(record.isPresent() ? record.get() : null, 0); // natural order
}
+ @Override
+ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
+ if ((recordBytes.length == 0 || isDeletedRecord) && orderingVal.equals(0)){
+ //use natural for delete record
+ return this;
+ }
+ Comparable oldValueOrderingVal = oldValue.orderingVal;
+ Comparable thisOrderingVal = orderingVal;
+ if (thisOrderingVal instanceof Utf8 && oldValueOrderingVal instanceof String){
+ thisOrderingVal = thisOrderingVal.toString();
+ }
+ if (thisOrderingVal instanceof GenericData.Fixed && oldValueOrderingVal instanceof BigDecimal){
+ Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
+ thisOrderingVal = conversion.fromFixed((GenericData.Fixed) thisOrderingVal,((GenericData.Fixed) thisOrderingVal).getSchema(),((GenericData.Fixed) thisOrderingVal).getSchema().getLogicalType());
+ }
+ if (oldValueOrderingVal.compareTo(thisOrderingVal)>0){
+ return oldValue;
+ }else {
+ return this;
+ }
+ }
+
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
/*
* Check if the incoming record is a delete record.
*/
- if (recordBytes.length == 0 || isDeletedRecord) {
- return Option.empty();
- }
-
- GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+// if (recordBytes.length == 0 || isDeletedRecord) {
+// return Option.empty();
+// }
+//
+// GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+ Option<IndexedRecord> incomingRecord = recordBytes.length==0 || isDeletedRecord ? Option.empty() : Option.of(bytesToAvro(recordBytes,schema));
// Null check is needed here to support schema evolution. The record in storage may be from old schema where
// the new ordering column might not be present and hence returns null.
if (!needUpdatingPersistedRecord(currentValue, incomingRecord, properties)) {
return Option.of(currentValue);
}
- return Option.of(incomingRecord);
+// return Option.of(incomingRecord);
Review Comment:
This commented-out line can be removed.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java:
##########
@@ -77,4 +108,39 @@ public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties
public Option<Map<String, String>> getMetadata() {
return Option.empty();
}
+
+
+ protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
+ Option<IndexedRecord> incomingRecord, Properties properties) {
+ /*
+ * Combining strategy here returns currentValue on disk if incoming record is older.
+ * The incoming record can be either a delete (sent as an upsert with _hoodie_is_deleted set to true)
+ * or an insert/update record. In any case, if it is older than the record in disk, the currentValue
+ * in disk is returned (to be rewritten with new commit time).
+ *
+ * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation type do not hit this code path
+ * and need to be dealt with separately.
+ */
+ String orderField = ConfigUtils.getOrderingField(properties);
+ if (orderField == null) {
+ return true;
+ }
+ boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty(
+ KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+ KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
+ Object persistedOrderingVal = HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
+ orderField,
+ true, consistentLogicalTimestampEnabled);
+ Comparable incomingOrderingVal = incomingRecord.map(record-> (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) record,
+ orderField,
+ true, consistentLogicalTimestampEnabled)).orElse(orderingVal);
+ if (persistedOrderingVal instanceof Utf8){
Review Comment:
This logic appears in three places in this PR; maybe we can provide a common util function to handle it, like `Pair<Comparable, Comparable> canonicalizeOrderingValue(Comparable oldOrder, Comparable incomingOrder)`.
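A minimal sketch of such a helper, under loud assumptions: to stay dependency-free, `CharSequence` stands in for Avro's `Utf8` (which implements `CharSequence`), a hypothetical `FixedDecimal` class stands in for a `GenericData.Fixed` carrying a `decimal` logical type, and `AbstractMap.SimpleImmutableEntry` stands in for Hudi's `Pair`. Unlike the per-site code in this PR, it normalizes both sides, so callers no longer depend on which side happens to hold the Avro type.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical sketch of the suggested `canonicalizeOrderingValue` helper.
// Avro types are replaced by stand-ins so the example has no dependencies.
final class OrderingCanonicalizer {

  // Stand-in for a GenericData.Fixed whose schema carries a decimal logical type.
  static final class FixedDecimal implements Comparable<FixedDecimal> {
    final byte[] unscaledBytes; // big-endian two's-complement unscaled value
    final int scale;            // scale taken from the decimal logical type

    FixedDecimal(byte[] unscaledBytes, int scale) {
      this.unscaledBytes = unscaledBytes;
      this.scale = scale;
    }

    BigDecimal toBigDecimal() {
      return new BigDecimal(new BigInteger(unscaledBytes), scale);
    }

    // Implemented only so the stand-in fits the Comparable parameters below.
    @Override
    public int compareTo(FixedDecimal other) {
      return toBigDecimal().compareTo(other.toBigDecimal());
    }
  }

  // Maps one ordering value to a canonical Java type; null passes through unchanged.
  @SuppressWarnings("rawtypes")
  static Comparable canonicalize(Comparable value) {
    if (value instanceof CharSequence && !(value instanceof String)) {
      return value.toString(); // e.g. Avro Utf8 -> String
    }
    if (value instanceof FixedDecimal) {
      return ((FixedDecimal) value).toBigDecimal(); // decimal-as-fixed -> BigDecimal
    }
    return value;
  }

  // Normalizes both sides so that old.compareTo(incoming) compares like with like.
  @SuppressWarnings("rawtypes")
  static Map.Entry<Comparable, Comparable> canonicalizeOrderingValue(Comparable oldOrder, Comparable incomingOrder) {
    return new AbstractMap.SimpleImmutableEntry<>(canonicalize(oldOrder), canonicalize(incomingOrder));
  }
}
```

Each call site (`preCombine` and `needUpdatingPersistedRecord`) would then compare `getKey()` against `getValue()` without any type checks of its own.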
##########
hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java:
##########
@@ -77,4 +108,39 @@ public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties
public Option<Map<String, String>> getMetadata() {
return Option.empty();
}
+
+
+ protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
Review Comment:
We can modify `needUpdatingPersistedRecord` in `DefaultHoodieRecordPayload` directly.
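If the logic moves into `DefaultHoodieRecordPayload`, the decision itself stays small. The sketch below assumes ordering values are already extracted and normalized; `UpdateDecisionSketch` and `needUpdating` are hypothetical names, and `java.util.Optional` stands in for Hudi's `Option`. It follows the hunk above (no ordering field means update; a delete without record bytes falls back to the payload's own `orderingVal`), and it assumes a null-tolerant `<= 0` comparison for the persisted value, in line with the schema-evolution comment in `combineAndGetUpdateValue`.

```java
import java.util.Optional;

// Hypothetical sketch of the needUpdatingPersistedRecord decision; not Hudi code.
final class UpdateDecisionSketch {

  @SuppressWarnings({"rawtypes", "unchecked"})
  static boolean needUpdating(String orderField,
                              Comparable persistedOrdering,
                              Optional<Comparable> incomingRecordOrdering,
                              Comparable payloadOrderingVal) {
    // Without a configured ordering field, the incoming record always wins.
    if (orderField == null) {
      return true;
    }
    // A delete carries no record, so fall back to the payload's own ordering value.
    Comparable incomingOrdering = incomingRecordOrdering.orElse(payloadOrderingVal);
    // Assumed rule: a missing persisted value (older schema) never blocks the update,
    // and ties go to the incoming record.
    return persistedOrdering == null || persistedOrdering.compareTo(incomingOrdering) <= 0;
  }
}
```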