nsivabalan commented on code in PR #13115:
URL: https://github.com/apache/hudi/pull/13115#discussion_r2036318344
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -233,11 +233,12 @@ public void close() {
/**
* Merge two log data records if needed.
*
- * @param record
- * @param metadata
- * @param existingRecordMetadataPair
- * @return
- * @throws IOException
+ * @param record The record
+ * @param metadata The metadata
+ * @param existingRecordMetadataPair The existing record metadata pair
+ *
+ * @return The pair of the record that needs to be updated with and its metadata,
+ * returns empty to skip the update.
*/
protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T record,
Review Comment:
Can we rename the first argument to `newRecord` to avoid confusion?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -233,11 +233,12 @@ public void close() {
/**
* Merge two log data records if needed.
*
- * @param record
- * @param metadata
- * @param existingRecordMetadataPair
- * @return
- * @throws IOException
+ * @param record The record
+ * @param metadata The metadata
+ * @param existingRecordMetadataPair The existing record metadata pair
+ *
+ * @return The pair of the record that needs to be updated with and its metadata,
+ * returns empty to skip the update.
Review Comment:
Can we enhance the Javadoc of this method?
I would like to see the expected behavior in all the different cases.
For example, for event-time-based merging:

| old | new | expected behavior |
|-----|-----|-------------------|
| valid rec w/ valid ord value | valid rec w/ same ord value | new |
| valid rec w/ valid ord value | valid rec w/ higher ord value | new |
| valid rec w/ valid ord value | valid rec w/ lower ord value | old |
| valid rec w/ valid ord value | valid rec w/ missing ord value | new |
| valid rec w/ valid ord value | deleted w/ same ord value | deleted, but what is the expected return value from this method? |
| valid rec w/ valid ord value | deleted w/ higher ord value | deleted, but what is the expected return value from this method? |
| valid rec w/ valid ord value | deleted w/ lower ord value | old |
| valid rec w/ valid ord value | deleted w/ missing ord value | deleted, but what is the expected return value from this method? |
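To make the ask concrete, here is a minimal sketch of the table above as a decision function. It is not Hudi code: `Rec` and `EventTimeMergeTable` are hypothetical, the ordering value is modeled as a nullable `Integer` (null meaning missing), and the old record is assumed to always carry a valid ordering value, as in every row. The return convention follows the new Javadoc: the incoming record to update with, or empty to skip the update.

```java
import java.util.Optional;

/** Hypothetical, simplified log record (not a Hudi class): ordering value (null = missing) and a delete flag. */
final class Rec {
  final String id;
  final Integer orderingValue;
  final boolean isDelete;

  Rec(String id, Integer orderingValue, boolean isDelete) {
    this.id = id;
    this.orderingValue = orderingValue;
    this.isDelete = isDelete;
  }
}

final class EventTimeMergeTable {
  /**
   * Encodes the review table: returns the incoming record if it should replace the
   * existing one, or empty to keep the existing record (skip the update).
   * Assumes existing.orderingValue is non-null, as in every row of the table.
   */
  static Optional<Rec> merge(Rec existing, Rec incoming) {
    if (incoming.orderingValue == null) {
      // "missing ord value" rows: the incoming record (data or delete) wins.
      return Optional.of(incoming);
    }
    if (incoming.orderingValue.compareTo(existing.orderingValue) >= 0) {
      // "same" and "higher" rows: the incoming record wins.
      return Optional.of(incoming);
    }
    // "lower" rows: keep the existing record.
    return Optional.empty();
  }
}
```

Whether a winning delete should be surfaced through the same return value is exactly the open question in the delete rows; the sketch simply assumes it is.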
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -275,43 +274,47 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
} else {
switch (recordMergeMode) {
case COMMIT_TIME_ORDERING:
- return Option.empty();
+ return Option.of(Pair.of(Option.ofNullable(record), metadata));
case EVENT_TIME_ORDERING:
- Comparable existingOrderingValue = readerContext.getOrderingValue(
- existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(),
- readerSchema, orderingFieldName);
- if (isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(), existingOrderingValue)) {
- return Option.empty();
- }
- Comparable incomingOrderingValue = readerContext.getOrderingValue(
- Option.of(record), metadata, readerSchema, orderingFieldName);
- if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) {
+ if (shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), Option.ofNullable(record), metadata)) {
return Option.of(Pair.of(Option.of(record), metadata));
}
return Option.empty();
Review Comment:
The logic in L278-282 and L481-488 differs slightly, specifically in
the `else` block. Is that intended?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -275,43 +274,47 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
} else {
switch (recordMergeMode) {
case COMMIT_TIME_ORDERING:
- return Option.empty();
+ return Option.of(Pair.of(Option.ofNullable(record), metadata));
Review Comment:
Can we add tests for this case, please?
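One possible shape for such a test, sketched in plain Java so it runs without the Hudi test harness. `CommitTimeBufferSketch` is a hypothetical stand-in for the record buffer: it keeps one entry per key and applies the same rule as the new `COMMIT_TIME_ORDERING` branch, returning the incoming record unconditionally. A real test would drive `FileGroupRecordBuffer` through the file group reader instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical model of the record buffer under commit-time ordering; not Hudi code. */
final class CommitTimeBufferSketch {
  static final class Entry {
    final String value;
    final int orderingValue; // carried along, but deliberately ignored by the merge

    Entry(String value, int orderingValue) {
      this.value = value;
      this.orderingValue = orderingValue;
    }
  }

  private final Map<String, Entry> buffer = new HashMap<>();

  /** Returns the entry to store, or empty to skip the update. */
  static Optional<Entry> mergeCommitTime(Entry existing, Entry incoming) {
    // Mirrors the new branch: the record read later always replaces the existing one.
    return Optional.of(incoming);
  }

  void processNextDataRecord(String key, String value, int orderingValue) {
    Entry incoming = new Entry(value, orderingValue);
    Entry existing = buffer.get(key);
    if (existing == null) {
      buffer.put(key, incoming);
      return;
    }
    mergeCommitTime(existing, incoming).ifPresent(e -> buffer.put(key, e));
  }

  String valueOf(String key) {
    Entry e = buffer.get(key);
    return e == null ? null : e.value;
  }
}
```

The interesting input is a later record with a *lower* ordering value: under commit-time ordering it must still win. If empty means "skip the update", as the new Javadoc says, the removed `return Option.empty();` would have kept `v1` in that case, so a test like this would catch a regression.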
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -341,17 +344,18 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
/**
* Merge a delete record with another record (data, or delete).
*
- * @param deleteRecord
- * @param existingRecordMetadataPair
- * @return
+ * @param deleteRecord The delete record
+ * @param existingRecordMetadataPair The existing record metadata pair
+ *
+ * @return The option of new delete record that needs to be updated with.
*/
protected Option<DeleteRecord> doProcessNextDeletedRecord(DeleteRecord deleteRecord,
Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) {
totalLogRecords++;
if (existingRecordMetadataPair != null) {
switch (recordMergeMode) {
case COMMIT_TIME_ORDERING:
- return Option.empty();
+ return Option.of(deleteRecord);
Review Comment:
Do we have tests for this case?
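A matching sketch for the delete path, again hypothetical and self-contained: `CommitTimeDeleteSketch` models a buffered delete as an entry flagged `isDelete`, and its merge mirrors the new `return Option.of(deleteRecord);` branch, so a later delete replaces whatever is buffered for the key. It does not use Hudi's `DeleteRecord` class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical model of delete handling under commit-time ordering; not Hudi code. */
final class CommitTimeDeleteSketch {
  static final class Entry {
    final String value;     // null for a delete
    final boolean isDelete;

    Entry(String value, boolean isDelete) {
      this.value = value;
      this.isDelete = isDelete;
    }
  }

  private final Map<String, Entry> buffer = new HashMap<>();

  /** Mirrors the new branch: the later delete is always returned as the update. */
  static Optional<Entry> mergeDeleteCommitTime(Entry existing, Entry delete) {
    return Optional.of(delete);
  }

  void processNextDataRecord(String key, String value) {
    // Commit-time ordering: a later data record always replaces the buffered entry.
    buffer.put(key, new Entry(value, false));
  }

  void processNextDeletedRecord(String key) {
    Entry delete = new Entry(null, true);
    Entry existing = buffer.get(key);
    if (existing == null) {
      buffer.put(key, delete);
      return;
    }
    mergeDeleteCommitTime(existing, delete).ifPresent(e -> buffer.put(key, e));
  }

  boolean isDeleted(String key) {
    Entry e = buffer.get(key);
    return e != null && e.isDelete;
  }
}
```

Covering both orders (data then delete, delete then data) checks that "later wins" holds in each direction.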
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -504,6 +529,19 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
}
}
+ /**
+ * Decides whether to keep the incoming record with ordering value comparison.
+ */
+ private boolean shouldKeepNewerRecord(Option<T> oldVal, Map<String, Object> oldMetadata, Option<T> newVal, Map<String, Object> newMetadata) {
+ Comparable newOrderingVal = readerContext.getOrderingValue(newVal, newMetadata, readerSchema, orderingFieldName);
+ if (isDeleteRecordWithNaturalOrder(newVal, newOrderingVal)) {
+ // handle records coming from DELETE statements(the orderingVal is constant 0)
+ return true;
+ }
+ Comparable oldOrderingVal = readerContext.getOrderingValue(oldVal, oldMetadata, readerSchema, orderingFieldName);
+ return newOrderingVal.compareTo(oldOrderingVal) >= 0;
Review Comment:
This could be a very corner case, and we may not have a solution for it, but I
wanted to bring it up.
What if the old record was missing an ordering value, so we treat it as 0,
and the new one has a negative ordering value?
With L542, we will favor the old record, even though the old one did not have an
ordering value and the new one has a valid one.
Not looking to fix this in this patch; I was just curious whether this is a known
limitation.
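The corner case can be reproduced in a few lines. This is a hypothetical model, not the PR's code: ordering values are nullable `Integer`s, a missing value falls back to 0 as described above, and the delete short-circuit at the top of `shouldKeepNewerRecord` is left out. Only the final comparison, `newOrderingVal.compareTo(oldOrderingVal) >= 0`, is carried over.

```java
/** Hypothetical reproduction of the missing-ordering-value corner case; not Hudi code. */
final class MissingOrderingValueCornerCase {
  // Assumption from the review comment: a missing ordering value is treated as 0.
  static final int DEFAULT_ORDERING_VALUE = 0;

  static int orderingValueOrDefault(Integer raw) {
    return raw == null ? DEFAULT_ORDERING_VALUE : raw;
  }

  /** Same final comparison as shouldKeepNewerRecord: the newer record wins on ties. */
  static boolean shouldKeepNewerRecord(Integer oldRaw, Integer newRaw) {
    Integer oldVal = orderingValueOrDefault(oldRaw);
    Integer newVal = orderingValueOrDefault(newRaw);
    return newVal.compareTo(oldVal) >= 0;
  }
}
```

`shouldKeepNewerRecord(null, -5)` returns `false`: the old record's missing value becomes 0, and `-5` compares lower, so the old record wins even though only the new one carried a real ordering value.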
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -233,11 +233,12 @@ public void close() {
/**
* Merge two log data records if needed.
*
- * @param record
- * @param metadata
- * @param existingRecordMetadataPair
- * @return
- * @throws IOException
+ * @param record The record
+ * @param metadata The metadata
+ * @param existingRecordMetadataPair The existing record metadata pair
+ *
+ * @return The pair of the record that needs to be updated with and its metadata,
+ * returns empty to skip the update.
Review Comment:
Similar to this, we need to prepare a table and document it for all 3 merge
modes, so that anyone touching this code is well aware of what is expected
from this method.
We have made at least 4 to 5 bug fixes around this so far.
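As a sketch of what such documentation could look like if it also doubled as a test, the expectations can be kept as data and checked against a reference rule. `MergeModeSpec`, its `Mode` and `Winner` enums, and the rows are illustrative only: the mode names mirror the three merge modes, the `EVENT_TIME_ORDERING` rows follow the event-time table above (old record always has a valid ordering value, deletes omitted for brevity), and `CUSTOM` is left to whatever merger is configured.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical table-driven spec of expected merge outcomes per merge mode; not Hudi code. */
final class MergeModeSpec {
  enum Mode { COMMIT_TIME_ORDERING, EVENT_TIME_ORDERING, CUSTOM }

  enum Winner { OLD, NEW }

  /** One documented row: old/new ordering values (null = missing) and the expected winner. */
  static final class Row {
    final Mode mode;
    final Integer oldOrd;
    final Integer newOrd;
    final Winner expected;

    Row(Mode mode, Integer oldOrd, Integer newOrd, Winner expected) {
      this.mode = mode;
      this.oldOrd = oldOrd;
      this.newOrd = newOrd;
      this.expected = expected;
    }
  }

  /** Reference rule; assumes the old record always has an ordering value. */
  static Winner decide(Mode mode, Integer oldOrd, Integer newOrd) {
    switch (mode) {
      case COMMIT_TIME_ORDERING:
        // The record read later always wins; ordering values are ignored.
        return Winner.NEW;
      case EVENT_TIME_ORDERING:
        // Missing, equal, or higher incoming ordering value: the incoming record wins.
        return (newOrd == null || newOrd >= oldOrd) ? Winner.NEW : Winner.OLD;
      case CUSTOM:
        // Outcome depends on the configured merger, so no fixed rule here.
        throw new UnsupportedOperationException("CUSTOM defers to the configured merger");
      default:
        throw new IllegalArgumentException("Unknown mode: " + mode);
    }
  }

  static final List<Row> ROWS = Arrays.asList(
      new Row(Mode.COMMIT_TIME_ORDERING, 10, 5, Winner.NEW),
      new Row(Mode.COMMIT_TIME_ORDERING, 10, null, Winner.NEW),
      new Row(Mode.EVENT_TIME_ORDERING, 10, 10, Winner.NEW),
      new Row(Mode.EVENT_TIME_ORDERING, 10, 9, Winner.OLD),
      new Row(Mode.EVENT_TIME_ORDERING, 10, null, Winner.NEW));

  /** Number of documented rows whose expectation disagrees with decide(). */
  static int mismatches() {
    int count = 0;
    for (Row r : ROWS) {
      if (decide(r.mode, r.oldOrd, r.newOrd) != r.expected) {
        count++;
      }
    }
    return count;
  }
}
```

Keeping the rows as data means a behavior change has to update the documented table and the code together, which is the kind of drift the bug fixes above point to.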
--
This is an automated message from the Apache Git Service.