danny0405 commented on code in PR #7842:
URL: https://github.com/apache/hudi/pull/7842#discussion_r1098201307
##########
hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java:
##########
@@ -196,30 +196,40 @@ protected Option<IndexedRecord>
mergeDisorderRecordsWithMetadata(
/**
* Returns whether the given record is newer than the record of this payload.
*
- * @param orderingVal
- * @param record The record
- * @param prop The payload properties
- *
+ * @param schema The schema
+ * @param record The record
+ * @param prop The payload properties
* @return true if the given record is newer
*/
- private static boolean isRecordNewer(Comparable orderingVal, IndexedRecord
record, Properties prop) {
+ private boolean isRecordNewer(Schema schema, IndexedRecord record,
Properties prop) throws IOException {
String orderingField =
prop.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY);
+ String preCombineField =
prop.getProperty("hoodie.datasource.write.precombine.field");
+
if (!StringUtils.isNullOrEmpty(orderingField)) {
boolean consistentLogicalTimestampEnabled =
Boolean.parseBoolean(prop.getProperty(
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
+ Comparable incomingOrderingVal;
+ if (orderingField.equals(preCombineField)) {
+ incomingOrderingVal = orderingVal;
Review Comment:
Not sure what are we fixing here, do you mean the record payload ordering
field and preCombine field are actually the same field? In Flink, we force the
ordering field as the preCombine field, see `SteamerUtil.getPayloadConfig` for
details.
And in general, the ordering field is passed as a param for the payload
constructor, so how and when in-consistency happens?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]