This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 0d7d4c4624b6d0eae6f23326b787e76f4b15dd35
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat Jan 28 16:37:22 2023 -0800

    [HUDI-5628] Fixing log record reader scan V2 config name (#7764)

    We introduced a new way to scan log blocks in LogRecordReader and named it
    "hoodie.log.record.reader.use.scanV2". Fixing the config name to a more
    elegant one: "hoodie.optimized.log.blocks.scan.enable". Fixing the
    corresponding metadata config as well.
---
 .../hudi/cli/commands/HoodieLogFileCommand.java    |  2 +-
 .../cli/commands/TestHoodieLogFileCommand.java     |  2 +-
 .../apache/hudi/config/HoodieCompactionConfig.java | 11 ++--
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |  2 +-
 .../action/compact/CompactionExecutionHelper.java  |  4 +-
 .../hudi/table/action/compact/HoodieCompactor.java |  2 +-
 .../compact/LogCompactionExecutionHelper.java      |  2 +-
 .../HoodieLogCompactionPlanGenerator.java          |  2 +-
 .../MultipleSparkJobExecutionStrategy.java         |  2 +-
 .../TestHoodieClientOnMergeOnReadStorage.java      |  6 +-
 .../hudi/common/config/HoodieMetadataConfig.java   | 17 ++---
 .../table/log/AbstractHoodieLogRecordReader.java   | 10 +--
 .../table/log/HoodieMergedLogRecordScanner.java    | 12 ++--
 .../table/log/HoodieUnMergedLogRecordScanner.java  | 12 ++--
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  2 +-
 .../metadata/HoodieMetadataLogRecordReader.java    |  4 +-
 .../common/functional/TestHoodieLogFormat.java     | 74 +++++++++++-----------
 .../realtime/RealtimeCompactedRecordReader.java    |  2 +-
 .../reader/DFSHoodieDatasetInputReader.java        |  2 +-
 20 files changed, 88 insertions(+), 86 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 9a0e485fc9f..075b809e05c 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -222,7 +222,7 @@ public class HoodieLogFileCommand {
             .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
             .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
             .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
-            .withUseScanV2(Boolean.parseBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2.defaultValue()))
+            .withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
             .build();
     for (HoodieRecord hoodieRecord : scanner) {
       Option<HoodieAvroIndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index aff12422f6a..261002c9327 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -233,7 +233,7 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness {
         .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
         .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
-        .withUseScanV2(Boolean.parseBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2.defaultValue()))
+        .withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
         .build();

     Iterator<HoodieRecord> records = scanner.iterator();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index e22bf1e43d1..e37ff3c46bf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.ConfigClassProperty;
 import org.apache.hudi.common.config.ConfigGroups;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
@@ -188,11 +189,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("Log compaction can be scheduled if the no. of log blocks crosses this threshold value. "
           + "This is effective only when log compaction is enabled via " + INLINE_LOG_COMPACT.key());

-  public static final ConfigProperty<String> USE_LOG_RECORD_READER_SCAN_V2 = ConfigProperty
-      .key("hoodie.log.record.reader.use.scanV2")
+  public static final ConfigProperty<String> ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN = ConfigProperty
+      .key("hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN)
       .defaultValue("false")
       .sinceVersion("0.13.0")
-      .withDocumentation("ScanV2 logic address all the multiwriter challenges while appending to log files. "
+      .withDocumentation("New optimized scan for log blocks that handles all multi-writer use-cases while appending to log files. "
           + "It also differentiates original blocks written by ingestion writers and compacted blocks written log compaction.");

   /** @deprecated Use {@link #INLINE_COMPACT} and its methods instead */
@@ -432,8 +433,8 @@ public class HoodieCompactionConfig extends HoodieConfig {
       return this;
     }

-    public Builder withLogRecordReaderScanV2(String useLogRecordReaderScanV2) {
-      compactionConfig.setValue(USE_LOG_RECORD_READER_SCAN_V2, useLogRecordReaderScanV2);
+    public Builder withEnableOptimizedLogBlocksScan(String enableOptimizedLogBlocksScan) {
+      compactionConfig.setValue(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN, enableOptimizedLogBlocksScan);
       return this;
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 228cc60c249..8890ddfdeee 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1306,8 +1306,8 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getInt(HoodieCompactionConfig.LOG_COMPACTION_BLOCKS_THRESHOLD);
   }

-  public boolean useScanV2ForLogRecordReader() {
-    return getBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2);
+  public boolean enableOptimizedLogBlocksScan() {
+    return getBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN);
   }

   public HoodieCleaningPolicy getCleanerPolicy() {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index bbfa4460af4..a8356ff9c71 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -283,7 +283,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
             // by default, the HFile does not keep the metadata fields, set up as false
             // to always use the metadata of the new record.
             .withPreserveCommitMetadata(false)
-            .withLogRecordReaderScanV2(String.valueOf(writeConfig.useScanV2ForLogRecordReader()))
+            .withEnableOptimizedLogBlocksScan(String.valueOf(writeConfig.enableOptimizedLogBlocksScan()))
             .build())
         .withParallelism(parallelism, parallelism)
         .withDeleteParallelism(parallelism)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionExecutionHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionExecutionHelper.java
index f402a673598..bdb83616196 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionExecutionHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/CompactionExecutionHelper.java
@@ -71,8 +71,8 @@ public class CompactionExecutionHelper<T extends HoodieRecordPayload, I, K, O> i
     return result;
   }

-  protected boolean useScanV2(HoodieWriteConfig writeConfig) {
-    return writeConfig.useScanV2ForLogRecordReader();
+  protected boolean enableOptimizedLogBlockScan(HoodieWriteConfig writeConfig) {
+    return writeConfig.enableOptimizedLogBlocksScan();
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index c6a20436c03..0d18a68cbad 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -199,7 +199,7 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
         .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .withOperationField(config.allowOperationMetadataField())
         .withPartition(operation.getPartitionPath())
-        .withUseScanV2(executionHelper.useScanV2(config))
+        .withOptimizedLogBlocksScan(executionHelper.enableOptimizedLogBlockScan(config))
         .withRecordMerger(config.getRecordMerger())
         .build();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionHelper.java
index 0e49267507c..8d2b054d09f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/LogCompactionExecutionHelper.java
@@ -80,7 +80,7 @@ public class LogCompactionExecutionHelper<T extends HoodieRecordPayload, I, K, O
   }

   @Override
-  protected boolean useScanV2(HoodieWriteConfig writeConfig) {
+  protected boolean enableOptimizedLogBlockScan(HoodieWriteConfig writeConfig) {
     return true;
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
index 6a5f160f6b0..e7a77002cc5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
@@ -90,7 +90,7 @@ public class HoodieLogCompactionPlanGenerator<T extends HoodieRecordPayload, I,
         .collect(Collectors.toList()))
         .withLatestInstantTime(maxInstantTime)
         .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
-        .withUseScanV2(true)
+        .withOptimizedLogBlocksScan(true)
         .withRecordMerger(writeConfig.getRecordMerger())
         .build();
     scanner.scan(true);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 3c5c4152112..6e981d2823f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -293,7 +293,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
             .withBufferSize(config.getMaxDFSStreamBufferSize())
             .withSpillableMapBasePath(config.getSpillableMapBasePath())
             .withPartition(clusteringOp.getPartitionPath())
-            .withUseScanV2(config.useScanV2ForLogRecordReader())
+            .withOptimizedLogBlocksScan(config.enableOptimizedLogBlocksScan())
             .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
             .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
             .withRecordMerger(config.getRecordMerger())
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index 6e2257dcd91..be6a71426b6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -200,7 +200,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
   public void testLogCompactionOnMORTableWithoutBaseFile() throws Exception {
     HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
         .withLogCompactionBlocksThreshold("1")
-        .withLogRecordReaderScanV2("true")
+        .withEnableOptimizedLogBlocksScan("true")
         .build();
     HoodieWriteConfig config = getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA,
         HoodieIndex.IndexType.INMEMORY).withAutoCommit(true).withCompactionConfig(compactionConfig).build();
@@ -447,7 +447,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
         .collect(Collectors.toList()))
         .withLatestInstantTime(instant)
         .withBufferSize(config.getMaxDFSStreamBufferSize())
-        .withUseScanV2(true)
+        .withOptimizedLogBlocksScan(true)
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
     scanner.scan(true);
@@ -461,7 +461,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
         .collect(Collectors.toList()))
         .withLatestInstantTime(currentInstant)
         .withBufferSize(config.getMaxDFSStreamBufferSize())
-        .withUseScanV2(true)
+        .withOptimizedLogBlocksScan(true)
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
     scanner2.scan(true);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 81f2c1daeff..9b84466090d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -42,6 +42,7 @@ import java.util.Properties;
 public final class HoodieMetadataConfig extends HoodieConfig {

   public static final String METADATA_PREFIX = "hoodie.metadata";
+  public static final String OPTIMIZED_LOG_BLOCKS_SCAN = ".optimized.log.blocks.scan.enable";

   // Enable the internal Metadata Table which saves file listings
   public static final ConfigProperty<Boolean> ENABLE = ConfigProperty
@@ -237,12 +238,12 @@ public final class HoodieMetadataConfig extends HoodieConfig {
           + "metadata table which are never added before. This config determines how to handle "
           + "such spurious deletes");

-  public static final ConfigProperty<Boolean> USE_LOG_RECORD_READER_SCAN_V2 = ConfigProperty
-      .key(METADATA_PREFIX + ".log.record.reader.use.scanV2")
+  public static final ConfigProperty<Boolean> ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN = ConfigProperty
+      .key(METADATA_PREFIX + OPTIMIZED_LOG_BLOCKS_SCAN)
       .defaultValue(false)
       .sinceVersion("0.13.0")
-      .withDocumentation("ScanV2 logic address all the multiwriter challenges while appending to log files. "
-          + "It also differentiates original blocks written by ingestion writers and compacted blocks written log compaction.");
+      .withDocumentation("Optimized log blocks scanner that addresses all the multiwriter use-cases while appending to log files. "
+          + "It also differentiates original blocks written by ingestion writers and compacted blocks written by log compaction.");

   private HoodieMetadataConfig() {
     super();
@@ -328,8 +329,8 @@ public final class HoodieMetadataConfig extends HoodieConfig {
     return getBoolean(IGNORE_SPURIOUS_DELETES);
   }

-  public boolean getUseLogRecordReaderScanV2() {
-    return getBoolean(USE_LOG_RECORD_READER_SCAN_V2);
+  public boolean doEnableOptimizedLogBlocksScan() {
+    return getBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN);
   }

   /**
@@ -478,8 +479,8 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       return this;
     }

-    public Builder withLogRecordReaderScanV2(boolean useLogRecordReaderScanV2) {
-      metadataConfig.setValue(USE_LOG_RECORD_READER_SCAN_V2, String.valueOf(useLogRecordReaderScanV2));
+    public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
+      metadataConfig.setValue(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN, String.valueOf(enableOptimizedLogBlocksScan));
       return this;
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 83172ecb7ae..42babc775b9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -149,7 +149,7 @@ public abstract class AbstractHoodieLogRecordReader {
   // Collect all the block instants after scanning all the log files.
   private final List<String> validBlockInstants = new ArrayList<>();
   // Use scanV2 method.
-  private final boolean useScanV2;
+  private final boolean enableOptimizedLogBlocksScan;

   protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths,
                                           Schema readerSchema, String latestInstantTime, boolean readBlocksLazily,
@@ -158,7 +158,7 @@ public abstract class AbstractHoodieLogRecordReader {
                                           Option<String> partitionNameOverride,
                                           InternalSchema internalSchema,
                                           Option<String> keyFieldOverride,
-                                          boolean useScanV2,
+                                          boolean enableOptimizedLogBlocksScan,
                                           HoodieRecordMerger recordMerger) {
     this.readerSchema = readerSchema;
     this.latestInstantTime = latestInstantTime;
@@ -184,7 +184,7 @@ public abstract class AbstractHoodieLogRecordReader {
     this.withOperationField = withOperationField;
     this.forceFullScan = forceFullScan;
     this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
-    this.useScanV2 = useScanV2;
+    this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;

     if (keyFieldOverride.isPresent()) {
       // NOTE: This branch specifically is leveraged handling Metadata Table
@@ -217,7 +217,7 @@ public abstract class AbstractHoodieLogRecordReader {
    */
  protected final void scanInternal(Option<KeySpec> keySpecOpt, boolean skipProcessingBlocks) {
    synchronized (this) {
-      if (useScanV2) {
+      if (enableOptimizedLogBlocksScan) {
        scanInternalV2(keySpecOpt, skipProcessingBlocks);
      } else {
        scanInternalV1(keySpecOpt);
@@ -894,7 +894,7 @@ public abstract class AbstractHoodieLogRecordReader {
       throw new UnsupportedOperationException();
     }

-    public Builder withUseScanV2(boolean useScanV2) {
+    public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
       throw new UnsupportedOperationException();
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index c41d78b2808..e5ce343eb39 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -98,9 +98,9 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
                                        Option<String> partitionName, InternalSchema internalSchema,
                                        Option<String> keyFieldOverride,
-                                       boolean useScanV2, HoodieRecordMerger recordMerger) {
+                                       boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger) {
     super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
-        instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, useScanV2, recordMerger);
+        instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger);
     try {
       this.maxMemorySizeInBytes = maxMemorySizeInBytes;
       // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
@@ -333,7 +333,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
     // By default, we're doing a full-scan
     private boolean forceFullScan = true;
     // Use scanV2 method.
-    private boolean useScanV2 = false;
+    private boolean enableOptimizedLogBlocksScan = false;
     private HoodieRecordMerger recordMerger;

     @Override
@@ -430,8 +430,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
     }

     @Override
-    public Builder withUseScanV2(boolean useScanV2) {
-      this.useScanV2 = useScanV2;
+    public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
+      this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
       return this;
     }
@@ -462,7 +462,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
           latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
           bufferSize, spillableMapBasePath, instantRange,
           diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan,
-          Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), useScanV2, recordMerger);
+          Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger);
     }
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 2a7c91641e1..726172e5ee0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -42,9 +42,9 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade
   private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
                                          String latestInstantTime, boolean readBlocksLazily, boolean reverseReader, int bufferSize,
                                          LogRecordScannerCallback callback, Option<InstantRange> instantRange, InternalSchema internalSchema,
-                                         boolean useScanV2, HoodieRecordMerger recordMerger) {
+                                         boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger) {
     super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange,
-        false, true, Option.empty(), internalSchema, Option.empty(), useScanV2, recordMerger);
+        false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger);
     this.callback = callback;
   }
@@ -105,7 +105,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade
     private Option<InstantRange> instantRange = Option.empty();
     // specific configurations
     private LogRecordScannerCallback callback;
-    private boolean useScanV2;
+    private boolean enableOptimizedLogBlocksScan;
     private HoodieRecordMerger recordMerger;

     public Builder withFileSystem(FileSystem fs) {
@@ -167,8 +167,8 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade
     }

     @Override
-    public Builder withUseScanV2(boolean useScanV2) {
-      this.useScanV2 = useScanV2;
+    public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
+      this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
       return this;
     }
@@ -184,7 +184,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReade
       return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
           latestInstantTime, readBlocksLazily, reverseReader, bufferSize, callback, instantRange,
-          internalSchema, useScanV2, recordMerger);
+          internalSchema, enableOptimizedLogBlocksScan, recordMerger);
     }
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index ecb0da8792d..0ab11d65e8b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -531,7 +531,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
             .withLogBlockTimestamps(validInstantTimestamps)
             .enableFullScan(allowFullScan)
             .withPartition(partitionName)
-            .withUseScanV2(metadataConfig.getUseLogRecordReaderScanV2())
+            .withEnableOptimizedLogBlocksScan(metadataConfig.doEnableOptimizedLogBlocksScan())
             .build();
     Long logScannerOpenMs = timer.endTimer();
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
index 48b9d66f89b..fe92758945f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
@@ -209,8 +209,8 @@ public class HoodieMetadataLogRecordReader implements Closeable {
       return this;
     }

-    public Builder withUseScanV2(boolean useScanV2) {
-      scannerBuilder.withUseScanV2(useScanV2);
+    public Builder withEnableOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
+      scannerBuilder.withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan);
       return this;
     }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 9250429b377..af7216d3ce2 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -625,7 +625,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType diskMapType,
                                                   boolean isCompressionEnabled,
                                                   boolean readBlocksLazily,
-                                                  boolean useScanv2)
+                                                  boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {

     // Generate 4 delta-log files w/ random records
@@ -652,7 +652,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withUseScanV2(useScanv2)
+        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
@@ -672,7 +672,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   public void testBasicAppendAndPartialScanning(ExternalSpillableMap.DiskMapType diskMapType,
                                                 boolean isCompressionEnabled,
                                                 boolean readBlocksLazily,
-                                                boolean useScanV2)
+                                                boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     // Generate 3 delta-log files w/ random records
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
@@ -699,7 +699,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withUseScanV2(useScanV2)
+        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .withForceFullScan(false)
         .build();
@@ -763,7 +763,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   public void testBasicAppendAndPartialScanningByKeyPrefixes(ExternalSpillableMap.DiskMapType diskMapType,
                                                              boolean isCompressionEnabled,
                                                              boolean readBlocksLazily,
-                                                             boolean useScanV2)
+                                                             boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     // Generate 3 delta-log files w/ random records
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
@@ -790,7 +790,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withUseScanV2(useScanV2)
+        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .withForceFullScan(false)
         .build();
@@ -1049,7 +1049,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMapType,
                                            boolean isCompressionEnabled,
                                            boolean readBlocksLazily,
-                                           boolean useScanv2)
+                                           boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
@@ -1097,7 +1097,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withUseScanV2(useScanv2)
+        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
     assertEquals(200, scanner.getTotalLogRecords());
@@ -1117,7 +1117,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.DiskMapType diskMapType,
                                                            boolean isCompressionEnabled,
                                                            boolean readBlocksLazily,
-                                                           boolean useScanv2)
+                                                           boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
@@ -1184,7 +1184,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withUseScanV2(useScanv2)
+        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
     assertEquals(200, scanner.getTotalLogRecords(), "We read 200 records from 2 write batches");
@@ -1203,7 +1203,7 @@
   @MethodSource("testArguments")
   public void testAvroLogRecordReaderWithFailedPartialBlock(ExternalSpillableMap.DiskMapType diskMapType,
                                                             boolean isCompressionEnabled,
-                                                            boolean useScanv2)
+                                                            boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
@@ -1276,7 +1276,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withUseScanV2(useScanv2)
+        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
     assertEquals(200, scanner.getTotalLogRecords(), "We would read 200 records");
@@ -1296,7 +1296,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType,
                                                            boolean isCompressionEnabled,
                                                            boolean readBlocksLazily,
-                                                           boolean useScanv2)
+                                                           boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
@@ -1359,7 +1359,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withUseScanV2(useScanv2)
+        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
@@ -1560,7 +1560,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.DiskMapType diskMapType,
                                                          boolean isCompressionEnabled,
                                                          boolean readBlocksLazily,
-                                                         boolean useScanv2)
+                                                         boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {

     // Write a Data block and Delete block with same InstantTime (written in same batch)
@@ -1633,7 +1633,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withUseScanV2(useScanv2)
+        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would have scanned 0 records because of rollback");
@@ -1651,7 +1651,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   public void testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType,
                                                                  boolean isCompressionEnabled,
                                                                  boolean readBlocksLazily,
-                                                                 boolean useScanv2)
+                                                                 boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {

     // Write a Data block and Delete block with same InstantTime (written in same batch)
@@ -1707,7 +1707,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withUseScanV2(useScanv2)
+        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
@@ -1721,7 +1721,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   public void testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.DiskMapType diskMapType,
                                                          boolean isCompressionEnabled,
                                                          boolean readBlocksLazily,
-                                                         boolean useScanv2)
+                                                         boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
@@ -1764,7 +1764,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withUseScanV2(useScanv2)
+        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
     assertEquals(100, scanner.getTotalLogRecords(), "We still would read 100 records");
@@ -1780,7 +1780,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType,
                                                                   boolean isCompressionEnabled,
                                                                   boolean readBlocksLazily,
-                                                                  boolean useScanv2)
+                                                                  boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {

     // Write a 3 Data blocs with same InstantTime (written in same batch)
@@ -1840,7 +1840,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withUseScanV2(useScanv2)
+        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
@@ -1853,7 +1853,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalSpillableMap.DiskMapType diskMapType,
                                                                          boolean isCompressionEnabled,
                                                                          boolean readBlocksLazily,
-                                                                         boolean useScanv2)
+                                                                         boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {

     // Write a 3 Data blocs with same InstantTime (written in same batch)
@@ -1952,7 +1952,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withUseScanV2(useScanv2)
+        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
@@ -1961,7 +1961,7 @@
   }

   @ParameterizedTest
-  @MethodSource("testArgumentsWithoutScanV2Arg")
+  @MethodSource("testArgumentsWithoutOptimizedScanArg")
   public void testAvroLogRecordReaderWithMixedInsertsCorruptsRollbackAndMergedLogBlock(ExternalSpillableMap.DiskMapType diskMapType,
                                                                                        boolean isCompressionEnabled,
                                                                                        boolean readBlocksLazily)
@@ -2134,7 +2134,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withSpillableMapBasePath(spillableBasePath)
         .withDiskMapType(diskMapType)
         .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withUseScanV2(true)
+        .withOptimizedLogBlocksScan(true)
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
     assertEquals(600, scanner.getTotalLogRecords(), "We would read 600 records from scanner");
@@ -2170,7 +2170,7 @@
                                                      ExternalSpillableMap.DiskMapType diskMapType,
                                                      boolean isCompressionEnabled,
                                                      boolean readBlocksLazily,
-                                                     boolean useScanv2) {
+                                                     boolean enableOptimizedLogBlocksScan) {
     try {
       // Write one Data block with same InstantTime (written in same batch)
       Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
@@ -2224,7 +2224,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
           .withSpillableMapBasePath(spillableBasePath)
           .withDiskMapType(diskMapType)
           .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-          .withUseScanV2(useScanv2)
+          .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
           .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
           .build();
@@ -2241,13 +2241,13 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt(ExternalSpillableMap.DiskMapType diskMapType,
                                                                        boolean isCompressionEnabled,
                                                                        boolean readBlocksLazily,
-                                                                       boolean useScanv2) {
+                                                                       boolean enableOptimizedLogBlocksScan) {
     /*
      * FIRST_ATTEMPT_FAILED:
      * Original task from the stage attempt failed, but subsequent stage retry succeeded.
      */
     testAvroLogRecordReaderMergingMultipleLogFiles(77, 100,
-        diskMapType, isCompressionEnabled, readBlocksLazily, useScanv2);
+        diskMapType, isCompressionEnabled, readBlocksLazily, enableOptimizedLogBlocksScan);
   }

   @ParameterizedTest
@@ -2255,13 +2255,13 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt(ExternalSpillableMap.DiskMapType diskMapType,
                                                                         boolean isCompressionEnabled,
                                                                         boolean readBlocksLazily,
-                                                                        boolean useScanv2) {
+                                                                        boolean enableOptimizedLogBlocksScan) {
     /*
      * SECOND_ATTEMPT_FAILED:
      * Original task from stage attempt succeeded, but subsequent retry attempt failed.
      */
     testAvroLogRecordReaderMergingMultipleLogFiles(100, 66,
-        diskMapType, isCompressionEnabled, readBlocksLazily, useScanv2);
+        diskMapType, isCompressionEnabled, readBlocksLazily, enableOptimizedLogBlocksScan);
   }

   @ParameterizedTest
@@ -2269,13 +2269,13 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts(ExternalSpillableMap.DiskMapType diskMapType,
                                                                        boolean isCompressionEnabled,
                                                                        boolean readBlocksLazily,
-                                                                       boolean useScanv2) {
+                                                                       boolean enableOptimizedLogBlocksScan) {
     /*
      * BOTH_ATTEMPTS_SUCCEEDED:
      * Original task from the stage attempt and duplicate task from the stage retry succeeded.
      */
     testAvroLogRecordReaderMergingMultipleLogFiles(100, 100,
-        diskMapType, isCompressionEnabled, readBlocksLazily, useScanv2);
+        diskMapType, isCompressionEnabled, readBlocksLazily, enableOptimizedLogBlocksScan);
   }

   @ParameterizedTest
@@ -2593,7 +2593,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   }

   private static Stream<Arguments> testArguments() {
-    // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: readBlocksLazily, Arg4: useScanv2
+    // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: readBlocksLazily, Arg4: enableOptimizedLogBlocksScan
     return Stream.of(
         arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false, true),
         arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false, true),
@@ -2614,8 +2614,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     );
   }

-  private static Stream<Arguments> testArgumentsWithoutScanV2Arg() {
-    // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: readBlocksLazily, Arg4: useScanv2
+  private static Stream<Arguments> testArgumentsWithoutOptimizedScanArg() {
+    // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: readBlocksLazily, Arg4: enableOptimizedLogBlocksScan
     return Stream.of(
         arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false),
         arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false),
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 56923063478..cf16bf0bd80 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -100,7 +100,7 @@ public class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         .withDiskMapType(jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
         .withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-        .withUseScanV2(jobConf.getBoolean(HoodieRealtimeConfig.USE_LOG_RECORD_READER_SCAN_V2, false))
+        .withOptimizedLogBlocksScan(jobConf.getBoolean(HoodieRealtimeConfig.USE_LOG_RECORD_READER_SCAN_V2, false))
         .withInternalSchema(schemaEvolutionContext.internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 3ed86c32c94..0e3f27e2e3b 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -290,7 +290,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
         .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
         .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
         .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-        .withUseScanV2(Boolean.parseBoolean(HoodieCompactionConfig.USE_LOG_RECORD_READER_SCAN_V2.defaultValue()))
+        .withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
         .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
         .build();
     // readAvro log files
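
For readers following this rename: below is a minimal, hypothetical sketch (not part of the commit) of how a writer job could opt into the renamed configs through the builder methods introduced above. The table path and table name are placeholders, and HoodieWriteConfig.Builder#withPath, #forTable, and #withMetadataConfig are assumed from the broader 0.13.0 API rather than shown in this diff.

    import org.apache.hudi.common.config.HoodieMetadataConfig;
    import org.apache.hudi.config.HoodieCompactionConfig;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class OptimizedScanConfigExample {
      public static void main(String[] args) {
        // Placeholder path/table name; the builder wiring mirrors
        // TestHoodieClientOnMergeOnReadStorage in this diff.
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_trips")   // hypothetical table path
            .forTable("trips")             // hypothetical table name
            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                // sets "hoodie.optimized.log.blocks.scan.enable"
                .withEnableOptimizedLogBlocksScan("true")
                .build())
            .withMetadataConfig(HoodieMetadataConfig.newBuilder()
                // sets "hoodie.metadata.optimized.log.blocks.scan.enable"
                .withOptimizedLogBlocksScan(true)
                .build())
            .build();
        // Accessor renamed in HoodieWriteConfig above; expected to print true.
        System.out.println(writeConfig.enableOptimizedLogBlocksScan());
      }
    }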

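Since both new keys are composed from constants introduced in this commit ("hoodie" + OPTIMIZED_LOG_BLOCKS_SCAN for the compaction config, METADATA_PREFIX + OPTIMIZED_LOG_BLOCKS_SCAN for the metadata config), here is a short hypothetical JUnit check (not part of the commit) of how the composed keys resolve:

    import static org.junit.jupiter.api.Assertions.assertEquals;

    import org.apache.hudi.common.config.HoodieMetadataConfig;
    import org.apache.hudi.config.HoodieCompactionConfig;
    import org.junit.jupiter.api.Test;

    public class OptimizedScanKeyNamesTest {
      @Test
      public void composedKeysResolveAsRenamed() {
        // "hoodie" + ".optimized.log.blocks.scan.enable"
        assertEquals("hoodie.optimized.log.blocks.scan.enable",
            HoodieCompactionConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key());
        // METADATA_PREFIX ("hoodie.metadata") + ".optimized.log.blocks.scan.enable"
        assertEquals("hoodie.metadata.optimized.log.blocks.scan.enable",
            HoodieMetadataConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key());
      }
    }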