nsivabalan commented on code in PR #13115:
URL: https://github.com/apache/hudi/pull/13115#discussion_r2038316020
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java:
##########
@@ -223,13 +223,13 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
     totalCorruptBlocks = new AtomicLong(0);
     totalLogBlocks = new AtomicLong(0);
     totalLogRecords = new AtomicLong(0);
-    HoodieLogFormatReverseReader logFormatReaderWrapper = null;
+    HoodieLogFormatReader logFormatReaderWrapper = null;
     HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline();
     HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
     HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
     try {
       // Iterate over the paths
-      logFormatReaderWrapper = new HoodieLogFormatReverseReader(storage,
+      logFormatReaderWrapper = new HoodieLogFormatReader(storage,
Review Comment:
   Did we already have an inconsistency between scanInternalV1 and scanInternalV2? :(
   It looks like V2 was already reading in ascending order.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -249,14 +250,12 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
       // TODO(HUDI-7843): decouple the merging logic from the merger
       // and use the record merge mode to control how to merge partial updates
       // Merge and store the combined record
-      // Note that the incoming `record` is from an older commit, so it should be put as
-      // the `older` in the merge API
       Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.get().partialMerge(
-          readerContext.constructHoodieRecord(Option.of(record), metadata),
-          readerContext.getSchemaFromMetadata(metadata),
           readerContext.constructHoodieRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
           readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()),
+          readerContext.constructHoodieRecord(Option.of(record), metadata),
+          readerContext.getSchemaFromMetadata(metadata),
           readerSchema,
           props);
       if (!combinedRecordAndSchemaOpt.isPresent()) {
Review Comment:
   I still have a tough time understanding the expected value here.
   For example, after we call recordMerger.get().partialMerge(), what is the expected return value if the record is deleted? And if it is deleted, don't we need to return the deletion to the caller so that the hash map gets updated?
   But at L262, we are returning Option.empty.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -249,14 +250,12 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
       // TODO(HUDI-7843): decouple the merging logic from the merger
      // and use the record merge mode to control how to merge partial updates
      // Merge and store the combined record
-      // Note that the incoming `record` is from an older commit, so it should be put as
-      // the `older` in the merge API
       Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.get().partialMerge(
-          readerContext.constructHoodieRecord(Option.of(record), metadata),
-          readerContext.getSchemaFromMetadata(metadata),
           readerContext.constructHoodieRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
           readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()),
+          readerContext.constructHoodieRecord(Option.of(record), metadata),
+          readerContext.getSchemaFromMetadata(metadata),
           readerSchema,
           props);
       if (!combinedRecordAndSchemaOpt.isPresent()) {
Review Comment:
   Also, can we introduce a simple POJO to represent a record in this class?
   As of now, we are using Pair<Option<T>, Map<String, Object>>, where the key is the record key or position, and the Map contains metadata like the ordering value and schema version id.
   Thinking about how it might look if we had something like:
   FileGroupRecord {
     Option<T> record;
     Map<String, Object> recordMetadata;
     boolean isDeleted;
   }
   With this, whenever we call doProcessNextDataRecord(newRecord, existingRecord), we can make sure it returns an entry if the existing record is not null. Returning an entry means it could refer to a valid record or a deleted record, but essentially the return value will update the hash map of records we maintain blindly.
   For a deleted record, FileGroupRecord.record might be Option.empty and FileGroupRecord.isDeleted will be true, and optionally there could be an ordering value in the map.
   Unless existingRecord is null when we call doProcessNextDataRecord, we should always return something from doProcessNextDataRecord. I feel this might help us keep the code maintainable.
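   A minimal, self-contained sketch of what such a POJO could look like (hypothetical; the class, field, and factory names are assumptions, and java.util.Optional stands in for Hudi's Option just to keep the sketch compilable):

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

// Hypothetical POJO replacing Pair<Option<T>, Map<String, Object>>.
// A deleted record carries no payload but can still carry metadata
// (e.g. an ordering value), so the caller can update the records map blindly.
final class FileGroupRecord<T> {
  private final Optional<T> record;
  private final Map<String, Object> recordMetadata;
  private final boolean isDeleted;

  private FileGroupRecord(Optional<T> record, Map<String, Object> recordMetadata, boolean isDeleted) {
    this.record = record;
    this.recordMetadata = recordMetadata;
    this.isDeleted = isDeleted;
  }

  // A live record: payload present, not deleted.
  static <T> FileGroupRecord<T> of(T record, Map<String, Object> metadata) {
    return new FileGroupRecord<>(Optional.of(record), metadata, false);
  }

  // A deleted record: no payload, but metadata (ordering value) may survive.
  static <T> FileGroupRecord<T> deleted(Map<String, Object> metadata) {
    return new FileGroupRecord<>(Optional.empty(), metadata, true);
  }

  Optional<T> getRecord() {
    return record;
  }

  Map<String, Object> getRecordMetadata() {
    return recordMetadata;
  }

  boolean isDeleted() {
    return isDeleted;
  }
}
```

   With this shape, a delete from a delete block and a delete from a merge result look identical to the caller, which is what lets the map update stay unconditional.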
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -249,14 +250,12 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
       // TODO(HUDI-7843): decouple the merging logic from the merger
       // and use the record merge mode to control how to merge partial updates
       // Merge and store the combined record
-      // Note that the incoming `record` is from an older commit, so it should be put as
-      // the `older` in the merge API
       Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.get().partialMerge(
-          readerContext.constructHoodieRecord(Option.of(record), metadata),
-          readerContext.getSchemaFromMetadata(metadata),
           readerContext.constructHoodieRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
           readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()),
+          readerContext.constructHoodieRecord(Option.of(record), metadata),
+          readerContext.getSchemaFromMetadata(metadata),
           readerSchema,
           props);
       if (!combinedRecordAndSchemaOpt.isPresent()) {
Review Comment:
   I guess we are trying to optimize by not returning from doProcessNextDataRecord when we do not want to update the map (for example, when the new record has a lower ordering value than the existing record). I feel those are probably < 5% of cases. Let's keep it simple and make it foolproof: it is ok to always return a value from doProcessNextDataRecord and update the map with the same record as the previous version.
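   One way to read this suggestion, as a hypothetical sketch rather than Hudi's actual API (names and the ordering-value-only "record" are stand-ins for illustration):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification: a record is reduced to its ordering value.
// The point is the control flow, not the merge logic: processNext always
// returns a value to write back, even when the existing record wins, so the
// caller never needs a special "skip the map update" branch.
final class AlwaysReturnSketch {
  static long processNext(long newOrdering, Long existingOrdering) {
    if (existingOrdering == null) {
      // First sighting of this key: the new record wins by default.
      return newOrdering;
    }
    // Existing record wins on ties, mimicking "keep the previous version".
    return newOrdering > existingOrdering ? newOrdering : existingOrdering;
  }

  static void upsert(Map<String, Long> records, String key, long newOrdering) {
    // The caller updates the map blindly with whatever processNext returns.
    records.put(key, processNext(newOrdering, records.get(key)));
  }
}
```

   The cost is an occasional redundant put of the same value; the benefit is that there is exactly one code path for updating the cached records map.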
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -249,14 +250,12 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
       // TODO(HUDI-7843): decouple the merging logic from the merger
       // and use the record merge mode to control how to merge partial updates
       // Merge and store the combined record
-      // Note that the incoming `record` is from an older commit, so it should be put as
-      // the `older` in the merge API
       Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.get().partialMerge(
-          readerContext.constructHoodieRecord(Option.of(record), metadata),
-          readerContext.getSchemaFromMetadata(metadata),
           readerContext.constructHoodieRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
           readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()),
+          readerContext.constructHoodieRecord(Option.of(record), metadata),
+          readerContext.getSchemaFromMetadata(metadata),
           readerSchema,
           props);
       if (!combinedRecordAndSchemaOpt.isPresent()) {
Review Comment:
   Also, thinking about whether we should standardize how a deleted record is represented in the FG reader.
   A deleted record can come from a delete block, HoodieEmptyRecordPayload, etc. And if we have the above abstraction in place (FileGroupRecord), we can also see whether we can avoid using DeleteRecord.
   It is again causing some confusion, since doProcessNextDeletedRecord returns Option<DeleteRecord>, but the caller processes it and updates the `records` map, which eventually results in the format Pair<Option, Map<String, Object>>.
   So, in the end, every record is represented in the same format in the cached map of `records`.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -249,14 +250,12 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
       // TODO(HUDI-7843): decouple the merging logic from the merger
       // and use the record merge mode to control how to merge partial updates
       // Merge and store the combined record
-      // Note that the incoming `record` is from an older commit, so it should be put as
-      // the `older` in the merge API
       Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.get().partialMerge(
-          readerContext.constructHoodieRecord(Option.of(record), metadata),
-          readerContext.getSchemaFromMetadata(metadata),
           readerContext.constructHoodieRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
           readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()),
+          readerContext.constructHoodieRecord(Option.of(record), metadata),
+          readerContext.getSchemaFromMetadata(metadata),
           readerSchema,
           props);
       if (!combinedRecordAndSchemaOpt.isPresent()) {
Review Comment:
   Anyways, not blocking the PR, but we should add more documentation to make this easier to understand.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]