suryaprasanna commented on code in PR #5341:
URL: https://github.com/apache/hudi/pull/5341#discussion_r871581733
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -232,7 +251,7 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
         && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime)) {
       // hit a block with instant time greater than should be processed, stop processing further
-      break;
+      continue;
Review Comment:
Thanks for catching this. It is a mistake; I am removing it.
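For context, here is a minimal sketch of the guard being restored (the class and method names are hypothetical, not Hudi's API): Hudi instant times are timestamp strings that compare lexicographically, so assuming log blocks are appended in instant order, the first block newer than `latestInstantTime` means nothing later is relevant and the scan can `break` rather than `continue`.

```java
// Hypothetical sketch of the instant-time guard; not the actual Hudi code.
public class InstantGuardSketch {
  // Hudi instant times are timestamp strings, so lexicographic comparison
  // matches chronological order.
  static boolean shouldStop(String blockInstant, String latestInstant) {
    return blockInstant.compareTo(latestInstant) > 0;
  }

  public static void main(String[] args) {
    System.out.println(shouldStop("20220501000000", "20220430000000")); // prints true: block is newer, stop scanning
    System.out.println(shouldStop("20220429000000", "20220430000000")); // prints false: block is in scope
  }
}
```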
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -93,35 +89,26 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
                              boolean readBlockLazily, boolean reverseReader) throws IOException {
-    this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false,
-        HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    this(fs, logFile, readerSchema, bufferSize, false, HoodieRecord.RECORD_KEY_METADATA_FIELD);
   }

   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
-                             boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups,
-                             String keyField) throws IOException {
-    this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema());
+                             boolean enableRecordLookups, String keyField) throws IOException {
+    this(fs, logFile, readerSchema, bufferSize, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema());
   }

   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
-                             boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups,
-                             String keyField, InternalSchema internalSchema) throws IOException {
+                             boolean enableRecordLookups, String keyField, InternalSchema internalSchema) throws IOException {
     this.hadoopConf = fs.getConf();
     // NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path
     //       is prefixed with an appropriate scheme given that we're not propagating the FS
     //       further
     this.logFile = new HoodieLogFile(FSUtils.makeQualified(fs, logFile.getPath()), logFile.getFileSize());
     this.inputStream = getFSDataInputStream(fs, this.logFile, bufferSize);
     this.readerSchema = readerSchema;
-    this.readBlockLazily = readBlockLazily;
-    this.reverseReader = reverseReader;
     this.enableRecordLookups = enableRecordLookups;
     this.keyField = keyField;
     this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
-    if (this.reverseReader) {
Review Comment:
Please let me know what you think.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java:
##########
@@ -210,7 +210,7 @@ protected Map<HoodieLogBlock.HeaderMetadataType, String> generateHeader(String c
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, metaClient.getActiveTimeline().lastInstant().get().getTimestamp());
     header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, commit);
     header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
-        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_PREVIOUS_BLOCK.ordinal()));
+        String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
Review Comment:
Yes, this is backward compatible; we store the ordinal int value on disk.
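To illustrate why the rename stays backward compatible (the enum below is a stand-in for illustration, not Hudi's actual `HoodieCommandBlockTypeEnum`): since the ordinal int is what is persisted, renaming a constant is safe as long as its position is unchanged and any new constants are only appended at the end.

```java
// Stand-in enum; the constant's position (ordinal 0) is assumed for illustration.
enum CommandBlockType {
  ROLLBACK_BLOCK // ordinal 0 -- same slot the old ROLLBACK_PREVIOUS_BLOCK name occupied
}

public class OrdinalCompatDemo {
  public static void main(String[] args) {
    // Writer side: the ordinal int is what lands on disk.
    int onDisk = CommandBlockType.ROLLBACK_BLOCK.ordinal();
    // Reader side: old files carrying ordinal 0 still decode to the same constant,
    // regardless of the Java-level rename.
    CommandBlockType decoded = CommandBlockType.values()[onDisk];
    System.out.println(onDisk + " -> " + decoded); // prints 0 -> ROLLBACK_BLOCK
  }
}
```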
##########
hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java:
##########
@@ -414,7 +411,7 @@ public void testHugeLogFileWrite() throws IOException, URISyntaxException, Inter
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
     byte[] dataBlockContentBytes = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header).getContentBytes();
     HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc = new HoodieLogBlock.HoodieLogBlockContentLocation(new Configuration(), null, 0, dataBlockContentBytes.length, 0);
-    HoodieDataBlock reusableDataBlock = new HoodieAvroDataBlock(null, Option.ofNullable(dataBlockContentBytes), false,
+    HoodieDataBlock reusableDataBlock = new HoodieAvroDataBlock(null, Option.ofNullable(dataBlockContentBytes),
Review Comment:
Sure, I will add them.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -93,35 +89,26 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
                              boolean readBlockLazily, boolean reverseReader) throws IOException {
-    this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false,
-        HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    this(fs, logFile, readerSchema, bufferSize, false, HoodieRecord.RECORD_KEY_METADATA_FIELD);
   }

   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
-                             boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups,
-                             String keyField) throws IOException {
-    this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema());
+                             boolean enableRecordLookups, String keyField) throws IOException {
+    this(fs, logFile, readerSchema, bufferSize, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema());
   }

   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
-                             boolean readBlockLazily, boolean reverseReader, boolean enableRecordLookups,
-                             String keyField, InternalSchema internalSchema) throws IOException {
+                             boolean enableRecordLookups, String keyField, InternalSchema internalSchema) throws IOException {
     this.hadoopConf = fs.getConf();
     // NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path
     //       is prefixed with an appropriate scheme given that we're not propagating the FS
     //       further
     this.logFile = new HoodieLogFile(FSUtils.makeQualified(fs, logFile.getPath()), logFile.getFileSize());
     this.inputStream = getFSDataInputStream(fs, this.logFile, bufferSize);
     this.readerSchema = readerSchema;
-    this.readBlockLazily = readBlockLazily;
-    this.reverseReader = reverseReader;
     this.enableRecordLookups = enableRecordLookups;
     this.keyField = keyField;
     this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
-    if (this.reverseReader) {
Review Comment:
My understanding is that iterating in reverse order has a problem when we encounter a corrupt block. We cannot jump across the corrupt block, since corrupt blocks don't have the block size stored at the end, so we end up ignoring all the blocks older than the corrupt block.
That is the reason for removing the reverseReader lookup: this case cannot be handled.
It becomes more complicated when introducing log compaction, where we need to move the compacted blocks to a different slot, so the traversal is no longer straightforward. Removing this logic reduces the complexity involved.
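The corrupt-block limitation described above can be sketched with a toy model (the names and structure are illustrative, not Hudi's actual log reader): a reverse scan seeks backwards using each block's trailing size footer, so one block without a trustworthy footer makes every older block unreachable, whereas a forward scan can still resync past the corruption.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the reverse-traversal problem; not Hudi's actual log reader.
public class ReverseScanSketch {
  // hasFooter = false models a corrupt block with no trailing size record.
  record Block(String name, boolean hasFooter) {}

  static List<String> reverseScan(List<Block> log) {
    List<String> visited = new ArrayList<>();
    for (int i = log.size() - 1; i >= 0; i--) {
      Block b = log.get(i);
      if (!b.hasFooter) {
        // Without a footer we cannot locate the previous block's start,
        // so every older block is silently skipped.
        break;
      }
      visited.add(b.name);
    }
    return visited;
  }

  public static void main(String[] args) {
    List<Block> log = List.of(
        new Block("b1", true),
        new Block("corrupt", false),
        new Block("b3", true));
    System.out.println(reverseScan(log)); // prints [b3] -- b1 is lost behind the corrupt block
  }
}
```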
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]