This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5b741b6b7da2 refactor: migrate to ScanV2Internal API and remove ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN config (#17520)
5b741b6b7da2 is described below
commit 5b741b6b7da2ff63e6d62abceba49b0108215186
Author: Surya Prasanna <[email protected]>
AuthorDate: Tue Feb 3 12:18:29 2026 -0800
refactor: migrate to ScanV2Internal API and remove ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN config (#17520)
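Caller-facing effect, sketched for reviewers (this note and the snippet are illustrative only and are not part of the patch; the method and variable names are taken from the ClusteringExecutionStrategy#getFileGroupReader hunk further below): with the V1 scan path and its toggle removed, scanV2 is the only scan implementation, so file group readers are built without the optimized-scan flag:

    // post-change builder chain, mirroring the ClusteringExecutionStrategy hunk in this patch
    HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
    return HoodieFileGroupReader.<R>newBuilder()
        .withReaderContext(readerContext)
        .withHoodieTableMetaClient(metaClient)
        .withLatestCommitTime(instantTime)
        .withFileSlice(fileSlice)
        .withDataSchema(readerSchema)
        .withRequestedSchema(readerSchema)
        .withInternalSchema(internalSchemaOption)
        .withShouldUseRecordPosition(usePosition)
        // withEnableOptimizedLogBlockScan(...) and the ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN config are removed by this commit
        .withProps(properties)
        .build();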
---
.../cli/commands/TestHoodieLogFileCommand.java | 1 -
.../apache/hudi/config/HoodieCompactionConfig.java | 7 -
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 -
.../org/apache/hudi/index/HoodieIndexUtils.java | 1 -
.../hudi/io/FileGroupReaderBasedAppendHandle.java | 2 +-
.../metadata/HoodieBackedTableMetadataWriter.java | 3 +-
.../hudi/metadata/HoodieMetadataWriteUtils.java | 5 +-
.../strategy/ClusteringExecutionStrategy.java | 7 +-
.../GenericRecordValidationTestUtils.java | 2 -
.../hudi/client/TestJavaHoodieBackedMetadata.java | 1 -
.../MultipleSparkJobExecutionStrategy.java | 3 +-
.../client/utils/SparkMetadataWriterUtils.java | 1 -
.../client/TestSparkRDDMetadataWriteClient.java | 1 -
.../functional/TestHoodieBackedTableMetadata.java | 1 -
.../hudi/common/config/HoodieReaderConfig.java | 8 -
.../table/log/AbstractHoodieLogRecordScanner.java | 168 +------------------
.../table/log/BaseHoodieLogRecordReader.java | 186 +--------------------
.../table/log/HoodieLogBlockMetadataScanner.java | 8 +-
.../table/log/HoodieMergedLogRecordReader.java | 12 +-
.../table/log/HoodieMergedLogRecordScanner.java | 13 +-
.../table/log/HoodieUnMergedLogRecordScanner.java | 14 +-
.../common/table/read/HoodieFileGroupReader.java | 12 --
.../hudi/common/table/read/ReaderParameters.java | 16 +-
.../read/buffer/LogScanningRecordBufferLoader.java | 1 -
.../hudi/metadata/HoodieBackedTableMetadata.java | 1 -
.../hudi/metadata/HoodieTableMetadataUtil.java | 24 +--
.../org/apache/hudi/table/format/FormatUtils.java | 1 -
.../common/functional/TestHoodieLogFormat.java | 124 +++++---------
.../HoodieTimestampAwareParquetInputFormat.java | 5 +-
.../realtime/RealtimeCompactedRecordReader.java | 3 -
.../apache/hudi/hadoop/TestHoodieHiveRecord.java | 12 +-
.../reader/DFSHoodieDatasetInputReader.java | 2 -
.../org/apache/hudi/cdc/CDCFileGroupIterator.scala | 3 -
.../procedures/PartitionBucketIndexManager.scala | 7 +-
.../TestHoodieClientOnMergeOnReadStorage.java | 1 -
.../TestMetadataUtilRLIandSIRecordGeneration.java | 5 +-
.../hudi/functional/TestHoodieBackedMetadata.java | 1 -
37 files changed, 90 insertions(+), 576 deletions(-)
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index 64f0bcd41a6f..51bd2c2843f3 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -249,7 +249,6 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness {
.withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
- .withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
.build();
Iterator<HoodieRecord> records = scanner.iterator();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 79b9c962ebda..5a60ebbcbc76 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -35,8 +35,6 @@ import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
-import static org.apache.hudi.common.config.HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
-
/**
* Compaction related config.
*/
@@ -476,11 +474,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
return this;
}
- public Builder withEnableOptimizedLogBlocksScan(String enableOptimizedLogBlocksScan) {
- compactionConfig.setValue(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN, enableOptimizedLogBlocksScan);
- return this;
- }
-
public Builder withCompactionSpecifyPartitionPathRegex(String partitionPathRegex) {
compactionConfig.setValue(COMPACTION_SPECIFY_PARTITION_PATH_REGEX, partitionPathRegex);
return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index beda2612b275..4789af764bb1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1714,10 +1714,6 @@ public class HoodieWriteConfig extends HoodieConfig {
return getInt(HoodieCompactionConfig.LOG_COMPACTION_BLOCKS_THRESHOLD);
}
- public boolean enableOptimizedLogBlocksScan() {
- return getBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN);
- }
-
public HoodieCleaningPolicy getCleanerPolicy() {
return HoodieCleaningPolicy.valueOf(getString(CLEANER_POLICY));
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 39e7369fd312..c9965a8c20ed 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -361,7 +361,6 @@ public class HoodieIndexUtils {
.withRequestedSchema(dataSchema)
.withInternalSchema(internalSchemaOption)
.withProps(metaClient.getTableConfig().getProps())
- .withEnableOptimizedLogBlockScan(config.enableOptimizedLogBlocksScan())
.build();
try {
final HoodieRecordLocation currentLocation = new HoodieRecordLocation(fileSlice.getBaseInstantTime(), fileSlice.getFileId());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java
index ac8cff79e18c..9f65a5d2f477 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java
@@ -88,7 +88,7 @@ public class FileGroupReaderBasedAppendHandle<T, I, K, O> extends HoodieAppendHa
.withRequestedSchema(writeSchemaWithMetaFields).withInternalSchema(internalSchemaOption).withProps(props).withEmitDelete(true)
.withShouldUseRecordPosition(usePosition).withSortOutput(hoodieTable.requireSortedRecords())
// instead of using config.enableOptimizedLogBlocksScan(), we set to true as log compaction blocks only supported in scanV2
- .withEnableOptimizedLogBlockScan(true).build()) {
+ .build()) {
recordItr = new CloseableMappingIterator<>(fileGroupReader.getLogRecordsOnly(), record -> {
HoodieRecord<T> hoodieRecord = readerContext.getRecordContext().constructHoodieRecord(record);
hoodieRecord.setCurrentLocation(newRecordLocation);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 98145b660a86..a78fd998866e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -886,7 +886,6 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
.withInternalSchema(internalSchemaOption)
.withShouldUseRecordPosition(false)
.withProps(metaClient.getTableConfig().getProps())
- .withEnableOptimizedLogBlockScan(dataWriteConfig.enableOptimizedLogBlocksScan())
.build();
String baseFileInstantTime = fileSlice.getBaseInstantTime();
return new CloseableMappingIterator<>(fileGroupReader.getClosableIterator(), record -> {
@@ -1497,7 +1496,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
dataWriteConfig.getMetadataConfig(),
partitionsToUpdate, dataWriteConfig.getBloomFilterType(),
dataWriteConfig.getBloomIndexParallelism(),
dataWriteConfig.getWritesFileIdEncoding(), getEngineType(),
- Option.of(dataWriteConfig.getRecordMerger().getRecordType()), dataWriteConfig.enableOptimizedLogBlocksScan());
+ Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
// Updates for record index are created by parsing the WriteStatus which is a hudi-client object. Hence, we cannot yet move this code
// to the HoodieTableMetadataUtil class in hudi-common.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index b3910def8585..777049a62c46 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -217,7 +217,6 @@ public class HoodieMetadataWriteUtils {
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(writeConfig.getMetadataCompactDeltaCommitMax())
- .withEnableOptimizedLogBlocksScan(String.valueOf(writeConfig.enableOptimizedLogBlocksScan()))
// Compaction on metadata table is used as a barrier for archiving on main dataset and for validating the
// deltacommits having corresponding completed commits. Therefore, we need to compact all fileslices of all
// partitions together requiring UnBoundedCompactionStrategy.
@@ -381,7 +380,7 @@ public class HoodieMetadataWriteUtils {
String instantTime, HoodieTableMetaClient dataMetaClient, HoodieTableMetadata tableMetadata,
HoodieMetadataConfig metadataConfig, Set<String> enabledPartitionTypes, String bloomFilterType,
int bloomIndexParallelism, int writesFileIdEncoding, EngineType engineType,
- Option<HoodieRecord.HoodieRecordType> recordTypeOpt, boolean enableOptimizeLogBlocksScan) {
+ Option<HoodieRecord.HoodieRecordType> recordTypeOpt) {
final Map<String, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
final HoodieData<HoodieRecord> filesPartitionRecordsRDD = context.parallelize(
convertMetadataToFilesPartitionRecords(commitMetadata, instantTime), 1);
@@ -409,7 +408,7 @@ public class HoodieMetadataWriteUtils {
}
if (enabledPartitionTypes.contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath())) {
partitionToRecordsMap.put(MetadataPartitionType.RECORD_INDEX.getPartitionPath(), convertMetadataToRecordIndexRecords(context, commitMetadata, metadataConfig,
- dataMetaClient, writesFileIdEncoding, instantTime, engineType, enableOptimizeLogBlocksScan));
+ dataMetaClient, writesFileIdEncoding, instantTime, engineType));
}
return partitionToRecordsMap;
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
index bc7fec18a564..d34a85a97bd4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
@@ -96,11 +96,10 @@ public abstract class ClusteringExecutionStrategy<T, I, K, O> implements Seriali
FileSlice fileSlice = clusteringOperationToFileSlice(table.getMetaClient().getBasePath().toString(), operation);
final boolean usePosition = getWriteConfig().getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
- final boolean enableLogBlocksScan = getWriteConfig().enableOptimizedLogBlocksScan();
Option<InternalSchema> internalSchema = SerDeHelper.fromJson(getWriteConfig().getInternalSchema());
try {
return getFileGroupReader(table.getMetaClient(), fileSlice, readerSchemaWithMetaFields, internalSchema,
- readerContextFactory, instantTime, props, usePosition, enableLogBlocksScan).getClosableHoodieRecordIterator();
+ readerContextFactory, instantTime, props, usePosition).getClosableHoodieRecordIterator();
} catch (IOException e) {
throw new HoodieClusteringException("Error reading file slices", e);
}
@@ -141,11 +140,11 @@ public abstract class ClusteringExecutionStrategy<T, I, K, O> implements Seriali
protected static <R> HoodieFileGroupReader<R> getFileGroupReader(HoodieTableMetaClient metaClient, FileSlice fileSlice,
HoodieSchema readerSchema, Option<InternalSchema> internalSchemaOption, ReaderContextFactory<R> readerContextFactory, String instantTime,
- TypedProperties properties, boolean usePosition, boolean enableLogBlocksScan) {
+ TypedProperties properties, boolean usePosition) {
HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
return HoodieFileGroupReader.<R>newBuilder()
.withReaderContext(readerContext).withHoodieTableMetaClient(metaClient).withLatestCommitTime(instantTime)
.withFileSlice(fileSlice).withDataSchema(readerSchema).withRequestedSchema(readerSchema).withInternalSchema(internalSchemaOption)
- .withShouldUseRecordPosition(usePosition).withEnableOptimizedLogBlockScan(enableLogBlocksScan).withProps(properties).build();
+ .withShouldUseRecordPosition(usePosition).withProps(properties).build();
}
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
index f880c35594fd..77b146d38506 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
@@ -18,7 +18,6 @@
package org.apache.hudi.testutils;
-import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
@@ -113,7 +112,6 @@ public class GenericRecordValidationTestUtils {
.collect(Collectors.toList());
jobConf.set(String.format(HOODIE_CONSUME_COMMIT, config.getTableName()), instant1);
- jobConf.set(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(), "true");
List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
storageConf, fullPartitionPaths, config.getBasePath(), jobConf, true);
Map<String, GenericRecord> prevRecordsMap = records.stream()
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 3525995fb540..7d174414214a 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -955,7 +955,6 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
.withRequestedSchema(schema)
.withDataSchema(schema)
.withProps(new TypedProperties())
- .withEnableOptimizedLogBlockScan(writeConfig.getMetadataConfig().isOptimizedLogBlocksScanEnabled())
.build();
try (ClosableIterator<HoodieRecord<IndexedRecord>> iter = fileGroupReader.getClosableHoodieRecordIterator()) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index be6d516a303f..9976793199d8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -300,7 +300,6 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(getEngineContext().getTaskContextSupplier(), writeConfig);
TypedProperties readerProperties = getReaderProperties(maxMemoryPerCompaction);
final boolean usePosition = getWriteConfig().getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
- final boolean enableLogBlocksScan = getWriteConfig().enableOptimizedLogBlocksScan();
String internalSchemaStr = getWriteConfig().getInternalSchema();
// broadcast reader context.
@@ -317,7 +316,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
// instantiate FG reader
HoodieFileGroupReader<InternalRow> fileGroupReader = getFileGroupReader(metaClient, fileSlice, tableSchemaWithMetaFields, internalSchemaOption,
- readerContextFactory, instantTime, readerProperties, usePosition, enableLogBlocksScan);
+ readerContextFactory, instantTime, readerProperties, usePosition);
// read records from the FG reader
return CloseableIteratorListener.addListener(fileGroupReader.getClosableIterator());
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 351512ed0940..47155899e204 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -346,7 +346,6 @@ public class SparkMetadataWriterUtils {
.withBaseFileOption(baseFileOption)
.withLogFiles(logFileStream)
.withPartitionPath(partition)
- .withEnableOptimizedLogBlockScan(dataWriteConfig.enableOptimizedLogBlocksScan())
.build();
try {
ClosableIterator<InternalRow> rowsForFilePath = fileGroupReader.getClosableIterator();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
index e2cb75524f58..8a88dd813588 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
@@ -226,7 +226,6 @@ public class TestSparkRDDMetadataWriteClient extends HoodieClientTestBase {
.withRequestedSchema(metadataSchema)
.withDataSchema(schema)
.withProps(new TypedProperties())
- .withEnableOptimizedLogBlockScan(hoodieWriteConfig.getMetadataConfig().isOptimizedLogBlocksScanEnabled())
.build();
try (ClosableIterator<HoodieRecord<IndexedRecord>> records = fileGroupReader.getClosableHoodieRecordIterator()) {
Map<String, HoodieRecord<HoodieMetadataPayload>> actualMdtRecordMap = new HashMap<>();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 353d94904ceb..6bb71f07de61 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -570,7 +570,6 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
.withRequestedSchema(schema)
.withDataSchema(schema)
.withProps(new TypedProperties())
- .withEnableOptimizedLogBlockScan(metadataTableWriteConfig.getMetadataConfig().isOptimizedLogBlocksScanEnabled())
.build();
try (ClosableIterator<HoodieRecord<IndexedRecord>> iter = fileGroupReader.getClosableHoodieRecordIterator()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
index 702309874261..1d53286f3747 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -45,14 +45,6 @@ public class HoodieReaderConfig extends HoodieConfig {
.withDocumentation("HoodieLogFormatReader reads a logfile in the forward direction starting from pos=0 to pos=file_length. "
+ "If this config is set to true, the reader reads the logfile in reverse direction, from pos=file_length to pos=0");
- public static final ConfigProperty<String> ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN = ConfigProperty
- .key("hoodie" + HoodieMetadataConfig.OPTIMIZED_LOG_BLOCKS_SCAN)
- .defaultValue("false")
- .markAdvanced()
- .sinceVersion("0.13.0")
- .withDocumentation("New optimized scan for log blocks that handles all multi-writer use-cases while appending to log files. "
- + "It also differentiates original blocks written by ingestion writers and compacted blocks written log compaction.");
-
public static final ConfigProperty<Boolean> FILE_GROUP_READER_ENABLED = ConfigProperty
.key("hoodie.file.group.reader.enabled")
.defaultValue(true)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 4956f0473631..b1eb9e0e2c98 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -143,8 +143,6 @@ public abstract class AbstractHoodieLogRecordScanner {
protected final HoodieRecordType recordType;
// Collect all the block instants after scanning all the log files.
private final List<String> validBlockInstants = new ArrayList<>();
- // Use scanV2 method.
- private final boolean enableOptimizedLogBlocksScan;
// table version for compatibility
private final HoodieTableVersion tableVersion;
// Allows to consider inflight instants while merging log records
@@ -161,7 +159,6 @@ public abstract class AbstractHoodieLogRecordScanner {
Option<String> partitionNameOverride,
InternalSchema internalSchema,
Option<String> keyFieldOverride,
- boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger,
Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
this.readerSchema = readerSchema;
@@ -192,7 +189,6 @@ public abstract class AbstractHoodieLogRecordScanner {
this.withOperationField = withOperationField;
this.forceFullScan = forceFullScan;
this.internalSchema = internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
- this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
if (keyFieldOverride.isPresent()) {
// NOTE: This branch specifically is leveraged handling Metadata Table
@@ -243,163 +239,7 @@ public abstract class AbstractHoodieLogRecordScanner {
* @param keySpecOpt specifies target set of keys to be scanned
* @param skipProcessingBlocks controls, whether (delta) blocks have to actually be processed
*/
- protected final void scanInternal(Option<KeySpec> keySpecOpt, boolean skipProcessingBlocks) {
- synchronized (this) {
- if (enableOptimizedLogBlocksScan) {
- scanInternalV2(keySpecOpt, skipProcessingBlocks);
- } else {
- scanInternalV1(keySpecOpt);
- }
- }
- }
-
- private void scanInternalV1(Option<KeySpec> keySpecOpt) {
- currentInstantLogBlocks = new ArrayDeque<>();
-
- progress = 0.0f;
- totalLogFiles = new AtomicLong(0);
- totalRollbacks = new AtomicLong(0);
- totalCorruptBlocks = new AtomicLong(0);
- totalLogBlocks = new AtomicLong(0);
- totalLogRecords = new AtomicLong(0);
- HoodieLogFormatReader logFormatReaderWrapper = null;
- try {
- // Iterate over the paths
- logFormatReaderWrapper = new HoodieLogFormatReader(storage,
- logFilePaths.stream()
- .map(filePath -> new HoodieLogFile(new StoragePath(filePath)))
- .collect(Collectors.toList()),
- readerSchema, reverseReader, bufferSize, shouldLookupRecords(),
recordKeyField, internalSchema);
-
- Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
- while (logFormatReaderWrapper.hasNext()) {
- HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
- LOG.info("Scanning log file {}", logFile);
- scannedLogFiles.add(logFile);
- totalLogFiles.set(scannedLogFiles.size());
- // Use the HoodieLogFileReader to iterate through the blocks in the
log file
- HoodieLogBlock logBlock = logFormatReaderWrapper.next();
- final String instantTime =
logBlock.getLogBlockHeader().get(INSTANT_TIME);
- totalLogBlocks.incrementAndGet();
- if (logBlock.isDataOrDeleteBlock()) {
- if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT) &&
!allowInflightInstants) {
- if
(!getOrCreateCompletedInstantsTimeline().containsOrBeforeTimelineStarts(instantTime)
- ||
getOrCreateInflightInstantsTimeline().containsInstant(instantTime)) {
- // hit an uncommitted block possibly from a failed write, move
to the next one and skip processing this one
- continue;
- }
- }
- if
(compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME),
GREATER_THAN, this.latestInstantTime)) {
- // Skip processing a data or delete block with the instant time
greater than the latest instant time used by this log record reader
- continue;
- }
- if (instantRange.isPresent() &&
!instantRange.get().isInRange(instantTime)) {
- // filter the log block by instant range
- continue;
- }
- }
- switch (logBlock.getBlockType()) {
- case HFILE_DATA_BLOCK:
- case AVRO_DATA_BLOCK:
- case PARQUET_DATA_BLOCK:
- LOG.info("Reading a data block from file {} at instant {}",
logFile.getPath(), instantTime);
- // store the current block
- currentInstantLogBlocks.push(logBlock);
- break;
- case DELETE_BLOCK:
- LOG.info("Reading a delete block from file {}", logFile.getPath());
- // store deletes so can be rolled back
- currentInstantLogBlocks.push(logBlock);
- break;
- case COMMAND_BLOCK:
- // Consider the following scenario
- // (Time 0, C1, Task T1) -> Running
- // (Time 1, C1, Task T1) -> Failed (Wrote either a corrupt block
or a correct
- // DataBlock (B1) with commitTime C1
- // (Time 2, C1, Task T1.2) -> Running (Task T1 was retried and the
attempt number is 2)
- // (Time 3, C1, Task T1.2) -> Finished (Wrote a correct DataBlock
B2)
- // Now a logFile L1 can have 2 correct Datablocks (B1 and B2)
which are the same.
- // Say, commit C1 eventually failed and a rollback is triggered.
- // Rollback will write only 1 rollback block (R1) since it assumes
one block is
- // written per ingestion batch for a file but in reality we need
to rollback (B1 & B2)
- // The following code ensures the same rollback block (R1) is used
to rollback
- // both B1 & B2
- // This is a command block - take appropriate action based on the
command
- HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
- String targetInstantForCommandBlock =
-
logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
- LOG.info("Reading a command block {} with targetInstantTime {}
from file {}", commandBlock.getType(), targetInstantForCommandBlock,
logFile.getPath());
- switch (commandBlock.getType()) { // there can be different types
of command blocks
- case ROLLBACK_BLOCK:
- // Rollback older read log block(s)
- // Get commit time from older record blocks, compare with
targetCommitTime,
- // rollback only if equal, this is required in scenarios of
invalid/extra
- // rollback blocks written due to failures during the rollback
operation itself
- // and ensures the same rollback block (R1) is used to
rollback both B1 & B2 with
- // same instant_time.
- final int instantLogBlockSizeBeforeRollback =
currentInstantLogBlocks.size();
- currentInstantLogBlocks.removeIf(block -> {
- // handle corrupt blocks separately since they may not have
metadata
- if (block.getBlockType() == CORRUPT_BLOCK) {
- LOG.info("Rolling back the last corrupted log block read
in {}", logFile.getPath());
- return true;
- }
- if
(targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME)))
{
- // rollback older data block or delete block
- LOG.info("Rolling back an older log block read from {}
with instantTime {}", logFile.getPath(), targetInstantForCommandBlock);
- return true;
- }
- return false;
- });
- final int numBlocksRolledBack =
instantLogBlockSizeBeforeRollback - currentInstantLogBlocks.size();
- totalRollbacks.addAndGet(numBlocksRolledBack);
- LOG.info("Number of applied rollback blocks {}",
numBlocksRolledBack);
- if (numBlocksRolledBack == 0) {
- LOG.warn("TargetInstantTime {} invalid or extra rollback
command block in {}", targetInstantForCommandBlock, logFile.getPath());
- }
- break;
- default:
- throw new UnsupportedOperationException("Command type not yet
supported.");
- }
- break;
- case CORRUPT_BLOCK:
- LOG.info("Found a corrupt block in {}", logFile.getPath());
- totalCorruptBlocks.incrementAndGet();
- // If there is a corrupt block - we will assume that this was the
next data block
- currentInstantLogBlocks.push(logBlock);
- break;
- default:
- throw new UnsupportedOperationException("Block type not supported
yet");
- }
- }
- // merge the last read block when all the blocks are done reading
- if (!currentInstantLogBlocks.isEmpty()) {
- // if there are no dups, we can take currentInstantLogBlocks as is.
- LOG.info("Merging the final data blocks");
- processQueuedBlocksForInstant(currentInstantLogBlocks,
scannedLogFiles.size(), keySpecOpt);
- }
-
- // Done
- progress = 1.0f;
- } catch (IOException e) {
- LOG.error("Got IOException when reading log file", e);
- throw new HoodieIOException("IOException when reading log file ", e);
- } catch (Exception e) {
- LOG.error("Got exception when reading log file", e);
- throw new HoodieException("Exception when reading log file ", e);
- } finally {
- try {
- if (null != logFormatReaderWrapper) {
- logFormatReaderWrapper.close();
- }
- } catch (IOException ioe) {
- // Eat exception as we do not want to mask the original exception that
can happen
- LOG.error("Unable to close log format reader", ioe);
- }
- }
- }
-
- private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
+ protected final synchronized void scanInternal(Option<KeySpec> keySpecOpt, boolean skipProcessingBlocks) {
currentInstantLogBlocks = new ArrayDeque<>();
progress = 0.0f;
totalLogFiles = new AtomicLong(0);
@@ -593,7 +433,7 @@ public abstract class AbstractHoodieLogRecordScanner {
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty() && !skipProcessingBlocks) {
LOG.info("Merging the final data blocks");
- processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOption);
+ processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
}
// Done
progress = 1.0f;
@@ -893,10 +733,6 @@ public abstract class AbstractHoodieLogRecordScanner {
throw new UnsupportedOperationException();
}
- public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
- throw new UnsupportedOperationException();
- }
-
public Builder withKeyFieldOverride(String keyFieldOverride) {
throw new UnsupportedOperationException();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 3b721c481a00..e242fb2bef50 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -127,8 +127,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
// Record type read from log block
// Collect all the block instants after scanning all the log files.
private final List<String> validBlockInstants = new ArrayList<>();
- // Use scanV2 method.
- private final boolean enableOptimizedLogBlocksScan;
protected HoodieFileGroupRecordBuffer<T> recordBuffer;
// Allows to consider inflight instants while merging log records
protected boolean allowInflightInstants;
@@ -139,7 +137,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
List<HoodieLogFile> logFiles,
boolean reverseReader, int bufferSize,
Option<InstantRange> instantRange,
boolean withOperationField, boolean forceFullScan, Option<String> partitionNameOverride,
- Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, HoodieFileGroupRecordBuffer<T> recordBuffer,
+ Option<String> keyFieldOverride, HoodieFileGroupRecordBuffer<T> recordBuffer,
boolean allowInflightInstants) {
this.readerContext = readerContext;
this.readerSchema = readerContext.getSchemaHandler() != null ? readerContext.getSchemaHandler().getRequiredSchema() : null;
@@ -164,7 +162,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
this.withOperationField = withOperationField;
this.forceFullScan = forceFullScan;
this.internalSchema = readerContext.getSchemaHandler() != null ? readerContext.getSchemaHandler().getInternalSchema() : null;
- this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
if (keyFieldOverride.isPresent()) {
// NOTE: This branch specifically is leveraged handling Metadata Table
@@ -192,180 +189,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
* @param keySpecOpt specifies target set of keys to be scanned
* @param skipProcessingBlocks controls, whether (delta) blocks have to actually be processed
*/
- protected final void scanInternal(Option<KeySpec> keySpecOpt, boolean skipProcessingBlocks) {
- synchronized (this) {
- if (enableOptimizedLogBlocksScan) {
- scanInternalV2(keySpecOpt, skipProcessingBlocks);
- } else {
- scanInternalV1(keySpecOpt);
- }
- }
- }
-
- private void scanInternalV1(Option<KeySpec> keySpecOpt) {
- currentInstantLogBlocks = new ArrayDeque<>();
-
- progress = 0.0f;
- totalLogFiles = new AtomicLong(0);
- totalRollbacks = new AtomicLong(0);
- totalCorruptBlocks = new AtomicLong(0);
- totalLogBlocks = new AtomicLong(0);
- totalLogRecords = new AtomicLong(0);
- HoodieLogFormatReader logFormatReaderWrapper = null;
- try {
- // Iterate over the paths
- logFormatReaderWrapper = new HoodieLogFormatReader(storage, logFiles,
- readerSchema, reverseReader, bufferSize, shouldLookupRecords(),
recordKeyField, internalSchema);
-
- Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
- while (logFormatReaderWrapper.hasNext()) {
- HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
- LOG.debug("Scanning log file {}", logFile);
- scannedLogFiles.add(logFile);
- totalLogFiles.set(scannedLogFiles.size());
- // Use the HoodieLogFileReader to iterate through the blocks in the
log file
- HoodieLogBlock logBlock = logFormatReaderWrapper.next();
- final String instantTime =
logBlock.getLogBlockHeader().get(INSTANT_TIME);
- totalLogBlocks.incrementAndGet();
- if (logBlock.isDataOrDeleteBlock()) {
- if (this.tableVersion.lesserThan(HoodieTableVersion.EIGHT) &&
!allowInflightInstants) {
- HoodieTimeline commitsTimeline =
this.hoodieTableMetaClient.getCommitsTimeline();
- if (commitsTimeline.filterInflights().containsInstant(instantTime)
- ||
!commitsTimeline.filterCompletedInstants().containsOrBeforeTimelineStarts(instantTime))
{
- // hit an uncommitted block possibly from a failed write, move
to the next one and skip processing this one
- continue;
- }
- }
- if
(compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME),
GREATER_THAN, this.latestInstantTime)) {
- // Skip processing a data or delete block with the instant time
greater than the latest instant time used by this log record reader
- continue;
- }
- if (instantRange.isPresent() &&
!instantRange.get().isInRange(instantTime)) {
- // filter the log block by instant range
- continue;
- }
- }
-
- switch (logBlock.getBlockType()) {
- case HFILE_DATA_BLOCK:
- case AVRO_DATA_BLOCK:
- case PARQUET_DATA_BLOCK:
- LOG.debug("Reading a data block from file {} at instant {}",
logFile.getPath(), instantTime);
- // store the current block
- currentInstantLogBlocks.push(logBlock);
- break;
- case DELETE_BLOCK:
- LOG.debug("Reading a delete block from file {}",
logFile.getPath());
- // store deletes so can be rolled back
- currentInstantLogBlocks.push(logBlock);
- break;
- case COMMAND_BLOCK:
- // Consider the following scenario
- // (Time 0, C1, Task T1) -> Running
- // (Time 1, C1, Task T1) -> Failed (Wrote either a corrupt block
or a correct
- // DataBlock (B1) with commitTime C1
- // (Time 2, C1, Task T1.2) -> Running (Task T1 was retried and the
attempt number is 2)
- // (Time 3, C1, Task T1.2) -> Finished (Wrote a correct DataBlock
B2)
- // Now a logFile L1 can have 2 correct Datablocks (B1 and B2)
which are the same.
- // Say, commit C1 eventually failed and a rollback is triggered.
- // Rollback will write only 1 rollback block (R1) since it assumes
one block is
- // written per ingestion batch for a file but in reality we need
to rollback (B1 & B2)
- // The following code ensures the same rollback block (R1) is used
to rollback
- // both B1 & B2
- // This is a command block - take appropriate action based on the
command
- HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
- String targetInstantForCommandBlock =
-
logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
- LOG.debug("Reading a command block {} with targetInstantTime {}
from file {}", commandBlock.getType(), targetInstantForCommandBlock,
- logFile.getPath());
- switch (commandBlock.getType()) { // there can be different types
of command blocks
- case ROLLBACK_BLOCK:
- // Rollback older read log block(s)
- // Get commit time from older record blocks, compare with
targetCommitTime,
- // rollback only if equal, this is required in scenarios of
invalid/extra
- // rollback blocks written due to failures during the rollback
operation itself
- // and ensures the same rollback block (R1) is used to
rollback both B1 & B2 with
- // same instant_time.
- final int instantLogBlockSizeBeforeRollback =
currentInstantLogBlocks.size();
- currentInstantLogBlocks.removeIf(block -> {
- // handle corrupt blocks separately since they may not have
metadata
- if (block.getBlockType() == CORRUPT_BLOCK) {
- LOG.debug("Rolling back the last corrupted log block read
in {}", logFile.getPath());
- return true;
- }
- if
(targetInstantForCommandBlock.contentEquals(block.getLogBlockHeader().get(INSTANT_TIME)))
{
- // rollback older data block or delete block
- LOG.debug("Rolling back an older log block read from {}
with instantTime {}",
- logFile.getPath(), targetInstantForCommandBlock);
- return true;
- }
- return false;
- });
-
- final int numBlocksRolledBack =
instantLogBlockSizeBeforeRollback - currentInstantLogBlocks.size();
- totalRollbacks.addAndGet(numBlocksRolledBack);
- LOG.debug("Number of applied rollback blocks {}",
numBlocksRolledBack);
- if (numBlocksRolledBack == 0) {
- LOG.warn("TargetInstantTime {} invalid or extra rollback
command block in {}",
- targetInstantForCommandBlock, logFile.getPath());
- }
- break;
- default:
- throw new UnsupportedOperationException("Command type not yet
supported.");
- }
- break;
- case CORRUPT_BLOCK:
- LOG.debug("Found a corrupt block in {}", logFile.getPath());
- totalCorruptBlocks.incrementAndGet();
- // If there is a corrupt block - we will assume that this was the
next data block
- currentInstantLogBlocks.push(logBlock);
- break;
- default:
- throw new UnsupportedOperationException("Block type not supported
yet");
- }
- }
- // merge the last read block when all the blocks are done reading
- if (!currentInstantLogBlocks.isEmpty()) {
- LOG.debug("Merging the final data blocks");
- processQueuedBlocksForInstant(currentInstantLogBlocks,
scannedLogFiles.size(), keySpecOpt);
- }
-
- // Done
- progress = 1.0f;
- totalLogRecords.set(recordBuffer.getTotalLogRecords());
- } catch (IOException e) {
- LOG.error("Got IOException when reading log file", e);
- throw new HoodieIOException("IOException when reading log file ", e);
- } catch (Exception e) {
- LOG.error("Got exception when reading log file", e);
- throw new HoodieException("Exception when reading log file ", e);
- } finally {
- try {
- if (null != logFormatReaderWrapper) {
- logFormatReaderWrapper.close();
- }
- } catch (IOException ioe) {
- // Eat exception as we do not want to mask the original exception that
can happen
- LOG.error("Unable to close log format reader", ioe);
- }
- if (!logFiles.isEmpty()) {
- try {
- StoragePath path = logFiles.get(0).getPath();
- LOG.info("Finished scanning log files. FileId: {},
LogFileInstantTime: {}, "
- + "Total log files: {}, Total log blocks: {}, Total
rollbacks: {}, Total corrupt blocks: {}",
- FSUtils.getFileIdFromLogPath(path),
FSUtils.getDeltaCommitTimeFromLogPath(path),
- totalLogFiles.get(), totalLogBlocks.get(), totalRollbacks.get(),
totalCorruptBlocks.get());
- } catch (Exception e) {
- LOG.warn("Could not extract fileId from log path", e);
- LOG.info("Finished scanning log files. "
- + "Total log files: {}, Total log blocks: {}, Total
rollbacks: {}, Total corrupt blocks: {}",
- totalLogFiles.get(), totalLogBlocks.get(), totalRollbacks.get(),
totalCorruptBlocks.get());
- }
- }
- }
- }
-
- private void scanInternalV2(Option<KeySpec> keySpecOption, boolean skipProcessingBlocks) {
+ protected final synchronized void scanInternal(Option<KeySpec> keySpecOpt, boolean skipProcessingBlocks) {
currentInstantLogBlocks = new ArrayDeque<>();
progress = 0.0f;
totalLogFiles = new AtomicLong(0);
@@ -558,7 +382,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
// merge the last read block when all the blocks are done reading
if (!currentInstantLogBlocks.isEmpty() && !skipProcessingBlocks) {
LOG.debug("Merging the final data blocks");
- processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOption);
+ processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
}
// Done
progress = 1.0f;
@@ -709,10 +533,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
throw new UnsupportedOperationException();
}
- public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
- throw new UnsupportedOperationException();
- }
-
public abstract BaseHoodieLogRecordReader build();
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
index 23d64eed88cb..bfc387be1d24 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
@@ -33,9 +33,11 @@ import java.util.List;
*/
public class HoodieLogBlockMetadataScanner extends BaseHoodieLogRecordReader<IndexedRecord> {
- public HoodieLogBlockMetadataScanner(HoodieTableMetaClient metaClient, List<HoodieLogFile> logFiles, int bufferSize, String maxInstantTime, Option<InstantRange> instantRange) {
- super(getReaderContext(metaClient, maxInstantTime), metaClient, metaClient.getStorage(), logFiles, false, bufferSize, instantRange, false, false, Option.empty(), Option.empty(), true,
- null, false);
+ public HoodieLogBlockMetadataScanner(HoodieTableMetaClient metaClient, List<HoodieLogFile> logFiles,
+ int bufferSize, String maxInstantTime, Option<InstantRange> instantRange) {
+ super(getReaderContext(metaClient, maxInstantTime), metaClient, metaClient.getStorage(), logFiles,
+ false, bufferSize, instantRange, false, false, Option.empty(),
+ Option.empty(), null, false);
scanInternal(Option.empty(), true);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index 4ca30827fb8b..b3f1eeaa988e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -66,10 +66,10 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient metaClient, HoodieStorage storage,
List<HoodieLogFile> logFiles, boolean reverseReader,
int bufferSize, Option<InstantRange> instantRange, boolean withOperationField, boolean forceFullScan,
- Option<String> partitionName, Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan,
+ Option<String> partitionName, Option<String> keyFieldOverride,
HoodieFileGroupRecordBuffer<T> recordBuffer, boolean allowInflightInstants) {
super(readerContext, metaClient, storage, logFiles, reverseReader, bufferSize, instantRange, withOperationField,
- forceFullScan, partitionName, keyFieldOverride, enableOptimizedLogBlocksScan, recordBuffer, allowInflightInstants);
+ forceFullScan, partitionName, keyFieldOverride, recordBuffer, allowInflightInstants);
if (forceFullScan) {
performScan();
@@ -228,12 +228,6 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
return this;
}
- @Override
- public Builder<T> withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
- this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
- return this;
- }
-
public Builder<T> withKeyFieldOverride(String keyFieldOverride) {
this.keyFieldOverride = Objects.requireNonNull(keyFieldOverride);
return this;
@@ -274,7 +268,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
withOperationField, forceFullScan,
Option.ofNullable(partitionName),
Option.ofNullable(keyFieldOverride),
- enableOptimizedLogBlocksScan, recordBuffer,
+ recordBuffer,
allowInflightInstants);
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index f6b2a7b51854..151ae92c0ed4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -110,11 +110,11 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
Option<String> partitionName,
InternalSchema internalSchema,
Option<String> keyFieldOverride,
- boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
+ HoodieRecordMerger recordMerger,
Option<HoodieTableMetaClient> hoodieTableMetaClientOption,
boolean allowInflightInstants) {
super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize,
- instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger,
+ instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, recordMerger,
hoodieTableMetaClientOption);
try {
this.maxMemorySizeInBytes = maxMemorySizeInBytes;
@@ -359,7 +359,6 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
private String keyFieldOverride;
// By default, we're doing a full-scan
private boolean forceFullScan = true;
- private boolean enableOptimizedLogBlocksScan = false;
protected boolean allowInflightInstants = false;
private HoodieRecordMerger recordMerger = new HoodiePreCombineAvroRecordMerger();
protected HoodieTableMetaClient hoodieTableMetaClient;
@@ -457,12 +456,6 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
return this;
}
- @Override
- public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
- this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
- return this;
- }
-
@Override
public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
this.recordMerger = HoodieRecordUtils.mergerToPreCombineMode(recordMerger);
@@ -502,7 +495,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
latestInstantTime, maxMemorySizeInBytes, reverseReader, bufferSize, spillableMapBasePath, instantRange,
diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, forceFullScan,
- Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger,
+ Option.ofNullable(partitionName), internalSchema, Option.ofNullable(keyFieldOverride), recordMerger,
Option.ofNullable(hoodieTableMetaClient), allowInflightInstants);
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 941d11ab94b2..ddf451f09873 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -48,10 +48,9 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
String latestInstantTime, boolean reverseReader, int bufferSize,
LogRecordScannerCallback callback, RecordDeletionCallback recordDeletionCallback,
Option<InstantRange> instantRange, InternalSchema internalSchema,
- boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger,
- Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
+ HoodieRecordMerger recordMerger, Option<HoodieTableMetaClient> hoodieTableMetaClientOption) {
super(storage, basePath, logFilePaths, readerSchema, latestInstantTime, reverseReader, bufferSize, instantRange,
- false, true, Option.empty(), internalSchema, Option.empty(), enableOptimizedLogBlocksScan, recordMerger,
+ false, true, Option.empty(), internalSchema, Option.empty(), recordMerger,
hoodieTableMetaClientOption);
this.callback = callback;
this.recordDeletionCallback = recordDeletionCallback;
@@ -126,7 +125,6 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
// specific configurations
private LogRecordScannerCallback callback;
private RecordDeletionCallback recordDeletionCallback;
- private boolean enableOptimizedLogBlocksScan;
private HoodieRecordMerger recordMerger = new HoodiePreCombineAvroRecordMerger();
private HoodieTableMetaClient hoodieTableMetaClient;
@@ -194,12 +192,6 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
return this;
}
- @Override
- public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
- this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
- return this;
- }
-
@Override
public Builder withRecordMerger(HoodieRecordMerger recordMerger) {
this.recordMerger = HoodieRecordUtils.mergerToPreCombineMode(recordMerger);
@@ -219,7 +211,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
return new HoodieUnMergedLogRecordScanner(storage, basePath, logFilePaths, readerSchema,
latestInstantTime, reverseReader, bufferSize, callback, recordDeletionCallback, instantRange,
- internalSchema, enableOptimizedLogBlocksScan, recordMerger, Option.ofNullable(hoodieTableMetaClient));
+ internalSchema, recordMerger, Option.ofNullable(hoodieTableMetaClient));
}
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index d579f3120744..2b35eaff4f30 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -380,7 +380,6 @@ public final class HoodieFileGroupReader<T> implements Closeable {
private boolean allowInflightInstants = false;
private boolean emitDelete;
private boolean sortOutput = false;
- private Boolean enableOptimizedLogBlockScan = false;
private Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback = Option.empty();
private FileGroupRecordBufferLoader<T> recordBufferLoader;
@@ -478,11 +477,6 @@ public final class HoodieFileGroupReader<T> implements Closeable {
return this;
}
- public Builder<T> withEnableOptimizedLogBlockScan(boolean enableOptimizedLogBlockScan) {
- this.enableOptimizedLogBlockScan = enableOptimizedLogBlockScan;
- return this;
- }
-
/**
* If true, the output of the merge will be sorted instead of appending log records to end of the iterator if they do not have matching keys in the base file.
* This assumes that the base file is already sorted by key.
@@ -513,11 +507,6 @@ public final class HoodieFileGroupReader<T> implements Closeable {
ValidationUtils.checkArgument(props != null, "Props is required");
ValidationUtils.checkArgument(baseFileOption != null, "Base file option is required");
ValidationUtils.checkArgument(partitionPath != null, "Partition path is required");
- if (enableOptimizedLogBlockScan == null) {
- // check to see if props contains this key if not explicitly set
- // otherwise use the default value from the config itself
- enableOptimizedLogBlockScan = Boolean.valueOf(ConfigUtils.getRawValueWithAltKeys(props, HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN, true));
- }
if (recordBufferLoader == null) {
recordBufferLoader = FileGroupRecordBufferLoader.createDefault();
@@ -528,7 +517,6 @@ public final class HoodieFileGroupReader<T> implements Closeable {
.emitDeletes(emitDelete)
.sortOutputs(sortOutput)
.allowInflightInstants(allowInflightInstants)
- .enableOptimizedLogBlockScan(enableOptimizedLogBlockScan)
.build();
InputSplit inputSplit = new InputSplit(baseFileOption, recordIterator != null ? Either.right(recordIterator) : Either.left(logFiles == null ? Stream.empty() : logFiles),
partitionPath, start, length);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/ReaderParameters.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/ReaderParameters.java
index a12e8b3c28fd..bfb791fe5fa7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/ReaderParameters.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/ReaderParameters.java
@@ -35,14 +35,12 @@ public class ReaderParameters {
// the allowInflightInstants flag would need to be set to true. This would ensure the HoodieMergedLogRecordReader
// considers the log records which are inflight.
private final boolean allowInflightInstants;
- private final boolean enableOptimizedLogBlockScan;
- private ReaderParameters(boolean useRecordPosition, boolean emitDelete, boolean sortOutput, boolean allowInflightInstants, boolean enableOptimizedLogBlockScan) {
+ private ReaderParameters(boolean useRecordPosition, boolean emitDelete, boolean sortOutput, boolean allowInflightInstants) {
this.useRecordPosition = useRecordPosition;
this.emitDelete = emitDelete;
this.sortOutput = sortOutput;
this.allowInflightInstants = allowInflightInstants;
- this.enableOptimizedLogBlockScan = enableOptimizedLogBlockScan;
}
public boolean useRecordPosition() {
@@ -61,10 +59,6 @@ public class ReaderParameters {
return allowInflightInstants;
}
- public boolean enableOptimizedLogBlockScan() {
- return enableOptimizedLogBlockScan;
- }
-
static Builder builder() {
return new Builder();
}
@@ -74,7 +68,6 @@ public class ReaderParameters {
private boolean emitDelete = false;
private boolean sortOutput = false;
private boolean allowInflightInstants = false;
- private boolean enableOptimizedLogBlockScan = false;
public Builder shouldUseRecordPosition(boolean shouldUseRecordPosition) {
this.shouldUseRecordPosition = shouldUseRecordPosition;
@@ -96,13 +89,8 @@ public class ReaderParameters {
return this;
}
- public Builder enableOptimizedLogBlockScan(boolean enableOptimizedLogBlockScan) {
- this.enableOptimizedLogBlockScan = enableOptimizedLogBlockScan;
- return this;
- }
-
public ReaderParameters build() {
- return new ReaderParameters(shouldUseRecordPosition, emitDelete,
sortOutput, allowInflightInstants, enableOptimizedLogBlockScan);
+ return new ReaderParameters(shouldUseRecordPosition, emitDelete,
sortOutput, allowInflightInstants);
}
}
}
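
Inside the read path, ReaderParameters shrinks accordingly; a hedged sketch of building it after this change (the builder is package-private, so this shape only applies within org.apache.hudi.common.table.read, and the builder method names are the ones visible elsewhere in this diff):

    // Hedged sketch; mirrors the builder calls seen in the HoodieFileGroupReader hunk above.
    ReaderParameters readerParameters = ReaderParameters.builder()
        .shouldUseRecordPosition(false)
        .emitDeletes(false)
        .sortOutputs(false)
        .allowInflightInstants(true)
        .build();
    // readerParameters.enableOptimizedLogBlockScan() no longer exists; callers
    // that previously branched on it can drop that branch entirely.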
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/LogScanningRecordBufferLoader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/LogScanningRecordBufferLoader.java
index 4614b25c2049..6cbaf189ce3b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/LogScanningRecordBufferLoader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/LogScanningRecordBufferLoader.java
@@ -49,7 +49,6 @@ abstract class LogScanningRecordBufferLoader {
         .withRecordBuffer(recordBuffer)
         .withAllowInflightInstants(readerParameters.allowInflightInstants())
         .withMetaClient(hoodieTableMetaClient)
-        .withOptimizedLogBlocksScan(readerParameters.enableOptimizedLogBlockScan())
         .build()) {
       readStats.setTotalLogReadTimeMs(logRecordReader.getTotalTimeTakenToReadAndMergeBlocks());
       readStats.setTotalUpdatedRecordsCompacted(logRecordReader.getNumMergedRecordsInLog());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 61c34915dcc2..4f0fe0bc7e40 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -548,7 +548,6 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
         .withRequestedSchema(SCHEMA)
         .withProps(fileGroupReaderProps)
         .withRecordBufferLoader(recordBufferLoader)
-        .withEnableOptimizedLogBlockScan(metadataConfig.isOptimizedLogBlocksScanEnabled())
         .build();
     return fileGroupReader.getClosableIterator();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index f4f2ad9cbf6b..a6b267d19c4c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -868,8 +868,7 @@ public class HoodieTableMetadataUtil {
       HoodieTableMetaClient dataTableMetaClient,
       int writesFileIdEncoding,
       String instantTime,
-      EngineType engineType,
-      boolean enableOptimizeLogBlocksScan) {
+      EngineType engineType) {
     List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
         .flatMap(Collection::stream).collect(Collectors.toList());
     // Return early if there are no write stats, or if the operation is a compaction.
@@ -934,7 +933,7 @@ public class HoodieTableMetadataUtil {
         .collect(Collectors.toList());
     // Extract revived and deleted keys
     Pair<Set<String>, Set<String>> revivedAndDeletedKeys = getRevivedAndDeletedKeysFromMergedLogs(dataTableMetaClient, instantTime, allLogFilePaths, finalWriterSchemaOpt,
-        currentLogFilePaths, partitionPath, readerContextFactory.getContext(), enableOptimizeLogBlocksScan);
+        currentLogFilePaths, partitionPath, readerContextFactory.getContext());
     Set<String> revivedKeys = revivedAndDeletedKeys.getLeft();
     Set<String> deletedKeys = revivedAndDeletedKeys.getRight();
     // Process revived keys to create updates
@@ -989,7 +988,6 @@ public class HoodieTableMetadataUtil {
    * @param logFilePaths list of log file paths including current and previous file slices
    * @param finalWriterSchemaOpt records schema
    * @param currentLogFilePaths list of log file paths for the current instant
-   * @param enableOptimizedLogBlocksScan - flag used to enable scanInternalV2 for log blocks in data table
    * @return pair of revived and deleted keys
*/
@VisibleForTesting
@@ -999,8 +997,7 @@ public class HoodieTableMetadataUtil {
       Option<HoodieSchema> finalWriterSchemaOpt,
       List<String> currentLogFilePaths,
       String partitionPath,
-      HoodieReaderContext<T> readerContext,
-      boolean enableOptimizedLogBlocksScan) {
+      HoodieReaderContext<T> readerContext) {
     // Separate out the current log files
     List<String> logFilePathsWithoutCurrentLogFiles = logFilePaths.stream()
         .filter(logFilePath -> !currentLogFilePaths.contains(logFilePath))
@@ -1008,7 +1005,7 @@ public class HoodieTableMetadataUtil {
     if (logFilePathsWithoutCurrentLogFiles.isEmpty()) {
       // Only current log file is present, so we can directly get the deleted record keys from it and return the RLI records.
       try (ClosableIterator<BufferedRecord<T>> currentLogRecords =
-               getLogRecords(currentLogFilePaths, dataTableMetaClient, finalWriterSchemaOpt, instantTime, partitionPath, readerContext, enableOptimizedLogBlocksScan)) {
+               getLogRecords(currentLogFilePaths, dataTableMetaClient, finalWriterSchemaOpt, instantTime, partitionPath, readerContext)) {
         Set<String> deletedKeys = new HashSet<>();
         currentLogRecords.forEachRemaining(record -> {
           if (record.isDelete()) {
@@ -1019,7 +1016,7 @@ public class HoodieTableMetadataUtil {
       }
     }
     return getRevivedAndDeletedKeys(dataTableMetaClient, instantTime, partitionPath, readerContext,
-        logFilePaths, finalWriterSchemaOpt, logFilePathsWithoutCurrentLogFiles, enableOptimizedLogBlocksScan);
+        logFilePaths, finalWriterSchemaOpt, logFilePathsWithoutCurrentLogFiles);
}
   private static <T> Pair<Set<String>, Set<String>> getRevivedAndDeletedKeys(HoodieTableMetaClient dataTableMetaClient,
@@ -1028,14 +1025,13 @@ public class HoodieTableMetadataUtil {
       HoodieReaderContext<T> readerContext,
       List<String> logFilePaths,
       Option<HoodieSchema> finalWriterSchemaOpt,
-      List<String> logFilePathsWithoutCurrentLogFiles,
-      boolean enableOptimizedLogBlocksScan) {
+      List<String> logFilePathsWithoutCurrentLogFiles) {
     // Partition valid (non-deleted) and deleted keys from all log files, including current, in a single pass
     Set<String> validKeysForAllLogs = new HashSet<>();
     Set<String> deletedKeysForAllLogs = new HashSet<>();
     // Fetch log records for all log files
     try (ClosableIterator<BufferedRecord<T>> allLogRecords =
-             getLogRecords(logFilePaths, dataTableMetaClient, finalWriterSchemaOpt, instantTime, partitionPath, readerContext, enableOptimizedLogBlocksScan)) {
+             getLogRecords(logFilePaths, dataTableMetaClient, finalWriterSchemaOpt, instantTime, partitionPath, readerContext)) {
allLogRecords.forEachRemaining(record -> {
if (record.isDelete()) {
deletedKeysForAllLogs.add(record.getRecordKey());
@@ -1050,7 +1046,7 @@ public class HoodieTableMetadataUtil {
     Set<String> deletedKeysForPreviousLogs = new HashSet<>();
     // Fetch log records for previous log files (excluding the current log files)
     try (ClosableIterator<BufferedRecord<T>> previousLogRecords =
-             getLogRecords(logFilePathsWithoutCurrentLogFiles, dataTableMetaClient, finalWriterSchemaOpt, instantTime, partitionPath, readerContext, enableOptimizedLogBlocksScan)) {
+             getLogRecords(logFilePathsWithoutCurrentLogFiles, dataTableMetaClient, finalWriterSchemaOpt, instantTime, partitionPath, readerContext)) {
previousLogRecords.forEachRemaining(record -> {
if (record.isDelete()) {
deletedKeysForPreviousLogs.add(record.getRecordKey());
@@ -1068,8 +1064,7 @@ public class HoodieTableMetadataUtil {
       Option<HoodieSchema> writerSchemaOpt,
       String latestCommitTimestamp,
       String partitionPath,
-      HoodieReaderContext<T> readerContext,
-      boolean enableOptimizedLogBlocksScan) {
+      HoodieReaderContext<T> readerContext) {
     if (writerSchemaOpt.isPresent() && !logFilePaths.isEmpty()) {
       List<HoodieLogFile> logFiles = logFilePaths.stream().map(HoodieLogFile::new).collect(Collectors.toList());
       FileSlice fileSlice = new FileSlice(partitionPath, logFiles.get(0).getFileId(), logFiles.get(0).getDeltaCommitTime());
@@ -1100,7 +1095,6 @@ public class HoodieTableMetadataUtil {
.withMetaClient(datasetMetaClient)
.withAllowInflightInstants(true)
.withRecordBuffer(recordBuffer)
- .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.build()) {
// initializes the record buffer with the log records
return recordBuffer.getLogRecordIterator();
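
The net effect on the metadata utilities is that callers no longer thread the scan flag through; a hedged sketch of the post-change call shape for the @VisibleForTesting helper (argument names are assumptions borrowed from this diff, and the class qualification assumes the method is static, as its usage in the test changes further below suggests):

    // Hedged sketch; dataTableMetaClient, instantTime, allLogFilePaths,
    // writerSchemaOpt, currentLogFilePaths, partitionPath and readerContext
    // are assumed to be prepared by the caller.
    Pair<Set<String>, Set<String>> revivedAndDeletedKeys =
        HoodieTableMetadataUtil.getRevivedAndDeletedKeysFromMergedLogs(dataTableMetaClient, instantTime, allLogFilePaths, writerSchemaOpt,
            currentLogFilePaths, partitionPath, readerContext);
    Set<String> revivedKeys = revivedAndDeletedKeys.getLeft();
    Set<String> deletedKeys = revivedAndDeletedKeys.getRight();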
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 8896945e21f5..797591426d6c 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -142,7 +142,6 @@ public class FormatUtils {
         .withProps(typedProps)
         .withShouldUseRecordPosition(false)
         .withEmitDelete(emitDelete)
-        .withEnableOptimizedLogBlockScan(writeConfig.enableOptimizedLogBlocksScan())
         .build();
   }
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 753f07350230..cde861f8c5f2 100755
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -696,27 +696,24 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     reader.close();
   }
-  @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testFilteringUncommittedLogBlocksPreTableVersion8(boolean enableOptimizedLogBlocksScan) throws Exception {
+  @Test
+  public void testFilteringUncommittedLogBlocksPreTableVersion8() throws Exception {
     HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(basePath);
     metaClient.getTableConfig().setTableVersion(HoodieTableVersion.SIX);
     HoodieTableConfig.update(metaClient.getStorage(), metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
-    testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, enableOptimizedLogBlocksScan, true, true);
+    testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true, true);
   }
  @ParameterizedTest
  @MethodSource("testArguments")
  public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType diskMapType,
-                                                 boolean isCompressionEnabled,
-                                                 boolean enableOptimizedLogBlocksScan)
+                                                 boolean isCompressionEnabled)
      throws IOException, URISyntaxException, InterruptedException {
-    testBasicAppendAndScanMultipleFiles(diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan, false, false);
+    testBasicAppendAndScanMultipleFiles(diskMapType, isCompressionEnabled, false, false);
  }
  private void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType diskMapType,
                                                   boolean isCompressionEnabled,
-                                                  boolean enableOptimizedLogBlocksScan,
                                                   boolean produceUncommittedLogBlocks,
                                                   boolean preTableVersion8)
      throws IOException, URISyntaxException, InterruptedException {
@@ -750,7 +747,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.build();
List<IndexedRecord> scannedRecords = new ArrayList<>();
@@ -773,8 +769,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
  @ParameterizedTest
  @MethodSource("testArguments")
  public void testBasicAppendAndPartialScanning(ExternalSpillableMap.DiskMapType diskMapType,
-                                               boolean isCompressionEnabled,
-                                               boolean enableOptimizedLogBlocksScan)
+                                               boolean isCompressionEnabled)
      throws IOException, URISyntaxException, InterruptedException {
    // Generate 3 delta-log files w/ random records
    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
@@ -800,7 +795,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.withForceFullScan(false)
.build();
@@ -861,8 +855,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
  @ParameterizedTest
  @MethodSource("testArguments")
  public void testBasicAppendAndPartialScanningByKeyPrefixes(ExternalSpillableMap.DiskMapType diskMapType,
-                                                            boolean isCompressionEnabled,
-                                                            boolean enableOptimizedLogBlocksScan)
+                                                            boolean isCompressionEnabled)
      throws IOException, URISyntaxException, InterruptedException {
    // Generate 3 delta-log files w/ random records
    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
@@ -888,7 +881,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.withForceFullScan(false)
.build();
@@ -1154,8 +1146,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@ParameterizedTest
@MethodSource("testArguments")
public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType
diskMapType,
- boolean isCompressionEnabled,
- boolean
enableOptimizedLogBlocksScan)
+ boolean isCompressionEnabled)
throws IOException, URISyntaxException, InterruptedException {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
@@ -1192,15 +1183,14 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
Set<String> originalKeys =
copyOfRecords1.stream().map(s -> ((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
.collect(Collectors.toSet());
- checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled,
enableOptimizedLogBlocksScan,
+ checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled,
200, 200, Option.of(originalKeys));
}
@ParameterizedTest
@MethodSource("testArguments")
public void
testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.DiskMapType
diskMapType,
- boolean
isCompressionEnabled,
- boolean
enableOptimizedLogBlocksScan)
+ boolean
isCompressionEnabled)
throws IOException, URISyntaxException, InterruptedException {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
@@ -1256,15 +1246,14 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
Set<String> originalKeys =
copyOfRecords1.stream().map(s -> ((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
.collect(Collectors.toSet());
- checkLogBlocksAndKeys("102", schema, diskMapType, isCompressionEnabled,
enableOptimizedLogBlocksScan,
+ checkLogBlocksAndKeys("102", schema, diskMapType, isCompressionEnabled,
200, 200, Option.of(originalKeys));
}
@ParameterizedTest
@MethodSource("testArguments")
public void
testAvroLogRecordReaderWithFailedPartialBlock(ExternalSpillableMap.DiskMapType
diskMapType,
- boolean
isCompressionEnabled,
- boolean
enableOptimizedLogBlocksScan)
+ boolean
isCompressionEnabled)
throws IOException, URISyntaxException, InterruptedException {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
@@ -1327,15 +1316,14 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
Set<String> originalKeys =
copyOfRecords1.stream().map(s -> ((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
.collect(Collectors.toSet());
- checkLogBlocksAndKeys("103", schema, diskMapType, isCompressionEnabled,
enableOptimizedLogBlocksScan,
+ checkLogBlocksAndKeys("103", schema, diskMapType, isCompressionEnabled,
200, 200, Option.of(originalKeys));
}
@ParameterizedTest
@MethodSource("testArguments")
public void
testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.DiskMapType
diskMapType,
- boolean
isCompressionEnabled,
- boolean
enableOptimizedLogBlocksScan)
+ boolean
isCompressionEnabled)
throws IOException, URISyntaxException, InterruptedException {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
@@ -1402,7 +1390,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.build();
assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200
records");
@@ -1449,7 +1436,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.build();
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
final List<Boolean> newEmptyPayloads = new ArrayList<>();
@@ -1477,8 +1463,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@ParameterizedTest
@MethodSource("testArguments")
public void
testAvroLogRecordReaderWithCommitBeforeAndAfterRollback(ExternalSpillableMap.DiskMapType
diskMapType,
- boolean
isCompressionEnabled,
- boolean
enableOptimizedLogBlocksScan)
+ boolean
isCompressionEnabled)
throws IOException, URISyntaxException, InterruptedException {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
@@ -1561,7 +1546,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.build();
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
final List<Boolean> newEmptyPayloads = new ArrayList<>();
@@ -1719,8 +1703,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@ParameterizedTest
@MethodSource("testArguments")
public void
testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.DiskMapType
diskMapType,
- boolean
isCompressionEnabled,
- boolean
enableOptimizedLogBlocksScan)
+ boolean
isCompressionEnabled)
throws IOException, URISyntaxException, InterruptedException {
// Write a Data block and Delete block with same InstantTime (written in
same batch)
@@ -1781,7 +1764,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
writer.appendBlock(commandBlock);
writer.close();
- checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled,
enableOptimizedLogBlocksScan,
+ checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled,
0, 0, Option.empty());
FileCreateUtilsLegacy.deleteDeltaCommit(basePath, "100", storage);
}
@@ -1789,8 +1772,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@ParameterizedTest
@MethodSource("testArguments")
public void
testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillableMap.DiskMapType
diskMapType,
- boolean
isCompressionEnabled,
- boolean
enableOptimizedLogBlocksScan)
+ boolean
isCompressionEnabled)
throws IOException, URISyntaxException, InterruptedException {
// Write a Data block and Delete block with same InstantTime (written in
same batch)
@@ -1836,7 +1818,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
writer.appendBlock(commandBlock);
writer.close();
- checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled,
enableOptimizedLogBlocksScan,
+ checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled,
0, 0, Option.empty());
FileCreateUtilsLegacy.deleteDeltaCommit(basePath, "100", storage);
}
@@ -1844,8 +1826,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@ParameterizedTest
@MethodSource("testArguments")
public void
testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.DiskMapType
diskMapType,
- boolean
isCompressionEnabled,
- boolean
enableOptimizedLogBlocksScan)
+ boolean
isCompressionEnabled)
throws IOException, URISyntaxException, InterruptedException {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
@@ -1873,15 +1854,14 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
writer.appendBlock(commandBlock);
writer.close();
- checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled,
enableOptimizedLogBlocksScan,
+ checkLogBlocksAndKeys("100", schema, diskMapType, isCompressionEnabled,
100, 100, Option.empty());
}
@ParameterizedTest
@MethodSource("testArguments")
public void
testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillableMap.DiskMapType
diskMapType,
- boolean
isCompressionEnabled,
- boolean
enableOptimizedLogBlocksScan)
+ boolean
isCompressionEnabled)
throws IOException, URISyntaxException, InterruptedException {
// Write a 3 Data blocs with same InstantTime (written in same batch)
@@ -1930,15 +1910,14 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
writer.appendBlock(commandBlock);
writer.close();
- checkLogBlocksAndKeys("101", schema, diskMapType, isCompressionEnabled,
enableOptimizedLogBlocksScan,
+ checkLogBlocksAndKeys("101", schema, diskMapType, isCompressionEnabled,
0, 0, Option.empty());
}
@ParameterizedTest
@MethodSource("testArguments")
void
testLogReaderWithDifferentVersionsOfDeleteBlocks(ExternalSpillableMap.DiskMapType
diskMapType,
- boolean
isCompressionEnabled,
- boolean
enableOptimizedLogBlocksScan)
+ boolean
isCompressionEnabled)
throws IOException, URISyntaxException, InterruptedException {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
@@ -2028,7 +2007,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.build()) {
assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200
records");
final List<String> readKeys = new ArrayList<>(200);
@@ -2091,7 +2069,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
// Should be able to read all 110 records
checkLogBlocksAndKeys("101", schema,
ExternalSpillableMap.DiskMapType.BITCASK, false,
- false, 110, 110, Option.empty());
+ 110, 110, Option.empty());
// Write a rollback for commit 100 which is not the latest commit
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
@@ -2102,7 +2080,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
// Should only be able to read 10 records from commit 101
checkLogBlocksAndKeys("101", schema,
ExternalSpillableMap.DiskMapType.BITCASK, false,
- false, 10, 10, Option.empty());
+ 10, 10, Option.empty());
// Write a rollback for commit 101 which is the latest commit
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "101");
@@ -2113,15 +2091,14 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
writer.close();
// Should not read any records as both commits are rolled back
- checkLogBlocksAndKeys("101", schema,
ExternalSpillableMap.DiskMapType.BITCASK, false,
+ checkLogBlocksAndKeys("101", schema,
ExternalSpillableMap.DiskMapType.BITCASK,
false, 0, 0, Option.empty());
}
@ParameterizedTest
@MethodSource("testArguments")
public void
testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalSpillableMap.DiskMapType
diskMapType,
-
boolean isCompressionEnabled,
-
boolean enableOptimizedLogBlocksScan)
+
boolean isCompressionEnabled)
throws IOException, URISyntaxException, InterruptedException {
// Write a 3 Data blocs with same InstantTime (written in same batch)
@@ -2205,12 +2182,12 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
    writer.close();
    checkLogBlocksAndKeys("101", schema, ExternalSpillableMap.DiskMapType.BITCASK, false,
-        false, 0, 0, Option.empty());
+        0, 0, Option.empty());
    FileCreateUtilsLegacy.deleteDeltaCommit(basePath, "100", storage);
  }
  @ParameterizedTest
-  @MethodSource("testArgumentsWithoutOptimizedScanArg")
+  @MethodSource("testArguments")
  public void testAvroLogRecordReaderWithMixedInsertsCorruptsRollbackAndMergedLogBlock(ExternalSpillableMap.DiskMapType diskMapType,
                                                                                       boolean isCompressionEnabled)
      throws IOException, URISyntaxException, InterruptedException {
@@ -2384,7 +2361,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withOptimizedLogBlocksScan(true)
.build();
assertEquals(600, scanner.getTotalLogRecords(), "We would read 600 records
from scanner");
final List<String> readKeys = new ArrayList<>();
@@ -2417,8 +2393,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   */
  private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1, int numRecordsInLog2,
                                                              ExternalSpillableMap.DiskMapType diskMapType,
-                                                             boolean isCompressionEnabled,
-                                                             boolean enableOptimizedLogBlocksScan) {
+                                                             boolean isCompressionEnabled) {
    try {
      // Write one Data block with same InstantTime (written in same batch)
      HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
@@ -2471,7 +2446,6 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.withSpillableMapBasePath(spillableBasePath)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
- .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
.build();
assertEquals(Math.max(numRecordsInLog1, numRecordsInLog2),
scanner.getNumMergedRecordsInLog(),
@@ -2485,40 +2459,37 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
  @ParameterizedTest
  @MethodSource("testArguments")
  public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt(ExternalSpillableMap.DiskMapType diskMapType,
-                                                                      boolean isCompressionEnabled,
-                                                                      boolean enableOptimizedLogBlocksScan) {
+                                                                      boolean isCompressionEnabled) {
    /*
     * FIRST_ATTEMPT_FAILED:
     * Original task from the stage attempt failed, but subsequent stage retry succeeded.
     */
    testAvroLogRecordReaderMergingMultipleLogFiles(77, 100,
-        diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan);
+        diskMapType, isCompressionEnabled);
}
@ParameterizedTest
@MethodSource("testArguments")
public void
testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt(ExternalSpillableMap.DiskMapType
diskMapType,
-
boolean isCompressionEnabled,
-
boolean enableOptimizedLogBlocksScan) {
+
boolean isCompressionEnabled) {
/*
* SECOND_ATTEMPT_FAILED:
* Original task from stage attempt succeeded, but subsequent retry
attempt failed.
*/
testAvroLogRecordReaderMergingMultipleLogFiles(100, 66,
- diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan);
+ diskMapType, isCompressionEnabled);
}
@ParameterizedTest
@MethodSource("testArguments")
public void
testAvroLogRecordReaderTasksSucceededInBothStageAttempts(ExternalSpillableMap.DiskMapType
diskMapType,
- boolean
isCompressionEnabled,
- boolean
enableOptimizedLogBlocksScan) {
+ boolean
isCompressionEnabled) {
/*
* BOTH_ATTEMPTS_SUCCEEDED:
* Original task from the stage attempt and duplicate task from the stage
retry succeeded.
*/
testAvroLogRecordReaderMergingMultipleLogFiles(100, 100,
- diskMapType, isCompressionEnabled, enableOptimizedLogBlocksScan);
+ diskMapType, isCompressionEnabled);
}
@Test
@@ -2860,25 +2831,11 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
  }
  private static Stream<Arguments> testArguments() {
-    // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: enableOptimizedLogBlocksScan
- return Stream.of(
- arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false),
- arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false),
- arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, false),
- arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, false),
- arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, true),
- arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, true),
- arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, true),
- arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true)
- );
- }
-
- private static Stream<Arguments> testArgumentsWithoutOptimizedScanArg() {
// Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled
return Stream.of(
arguments(ExternalSpillableMap.DiskMapType.BITCASK, false),
- arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false),
arguments(ExternalSpillableMap.DiskMapType.BITCASK, true),
+ arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false),
arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true)
);
}
@@ -2941,7 +2898,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
  }
  private void checkLogBlocksAndKeys(String latestInstantTime, HoodieSchema schema, ExternalSpillableMap.DiskMapType diskMapType,
-                                    boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan, int expectedTotalRecords,
+                                    boolean isCompressionEnabled, int expectedTotalRecords,
                                     int expectedTotalKeys,
                                     Option<Set<String>> expectedKeys) throws IOException {
    List<String> allLogFiles = FSUtils.getAllLogFiles(storage, partitionPath, "test-fileid1",
            HoodieLogFile.DELTA_EXTENSION, "100")
@@ -2958,8 +2915,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
        .withBufferSize(BUFFER_SIZE)
        .withSpillableMapBasePath(spillableBasePath)
        .withDiskMapType(diskMapType)
-        .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-        .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan);
+        .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled);
try (HoodieMergedLogRecordScanner scanner = builder.build()) {
assertEquals(expectedTotalRecords, scanner.getTotalLogRecords(), "There
should be " + expectedTotalRecords + " records");
final Set<String> readKeys = new HashSet<>();
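
With the flag gone, the TestHoodieLogFormat matrix drops from three parameters to two and every @MethodSource points at the single two-argument testArguments provider; a hedged sketch of the resulting test shape (JUnit 5 imports assumed, names taken from this diff):

    // Hedged sketch of the reduced parameterization: only the spillable disk
    // map type and the compression toggle remain as test arguments.
    @ParameterizedTest
    @MethodSource("testArguments")
    public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMapType,
                                             boolean isCompressionEnabled)
        throws IOException, URISyntaxException, InterruptedException {
      // body unchanged; enableOptimizedLogBlocksScan is no longer threaded through
    }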
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieTimestampAwareParquetInputFormat.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieTimestampAwareParquetInputFormat.java
index 4c08b346eed6..83d6b15c136b 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieTimestampAwareParquetInputFormat.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieTimestampAwareParquetInputFormat.java
@@ -18,14 +18,15 @@
package org.apache.hudi.hadoop.avro;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.InternalSchema;
+
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.util.ContextUtil;
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 4d64f85e6bf0..8d0d1765eb5e 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -22,7 +22,6 @@ import org.apache.hudi.avro.AvroRecordContext;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMemoryConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.engine.RecordContext;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
@@ -114,8 +113,6 @@ public class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
        .withDiskMapType(jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
        .withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-        .withOptimizedLogBlocksScan(jobConf.getBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(),
-            Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue())))
        .withInternalSchema(schemaEvolutionContext.internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
        .build();
}
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
index e2cab948111c..e5f2a7d573ae 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieHiveRecord.java
@@ -18,18 +18,18 @@
package org.apache.hudi.hadoop;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.hadoop.utils.HiveAvroSerializer;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockitoAnnotations;
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 93d8161b42a9..daac532af401 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -23,7 +23,6 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -297,7 +296,6 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
        .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
        .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
        .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-        .withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
        .withRecordMerger(HoodieRecordUtils.loadRecordMerger(HoodieAvroRecordMerger.class.getName()))
        .build();
// readAvro log files
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index eb4f5828fa6a..7a97367bb3c5 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -124,9 +124,6 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
    readerProps.setProperty(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key,
      configuration.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
        HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()).toString)
-    readerProps.setProperty(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key,
-      configuration.get(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(),
-        HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
    readerProps
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
index ffc38ee64dfe..6eaade123a64 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
@@ -113,8 +113,7 @@ class PartitionBucketIndexManager extends BaseProcedure
      config = config ++ Map(OPERATION.key -> BULK_INSERT_OPERATION_OPT_VAL,
        HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE.key -> WriteOperationType.BUCKET_RESCALE.value(),
-        ENABLE_ROW_WRITER.key() -> "true",
-        HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key -> writeClient.getConfig.enableOptimizedLogBlocksScan.toString)
+        ENABLE_ROW_WRITER.key() -> "true")
// Determine which operation to perform
if (showConfig) {
@@ -225,9 +224,6 @@ class PartitionBucketIndexManager extends BaseProcedure
      spark.sparkContext.parallelize(allFileSlice, allFileSlice.size).flatMap(fileSlice => {
        // instantiate other supporting cast
        val internalSchemaOption: Option[InternalSchema] = Option.empty()
-        // get this value from config, which has obtained this from write client
-        val enableOptimizedLogBlockScan = config.getOrElse(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(),
-          HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()).toBoolean
        // instantiate FG reader
val fileGroupReader = HoodieFileGroupReader.newBuilder()
.withReaderContext(readerContextFactory.getContext)
@@ -239,7 +235,6 @@ class PartitionBucketIndexManager extends BaseProcedure
.withInternalSchema(internalSchemaOption) // not support evolution
of schema for now
.withProps(metaClient.getTableConfig.getProps)
.withShouldUseRecordPosition(false)
- .withEnableOptimizedLogBlockScan(enableOptimizedLogBlockScan)
.build()
val iterator = fileGroupReader.getClosableIterator
CloseableIteratorListener.addListener(iterator)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index 4ee8c34c935d..f24ac473e1d0 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -242,7 +242,6 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
  @Test
  public void testLogCompactionOnMORTableWithoutBaseFile() throws Exception {
    HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
-        .withEnableOptimizedLogBlocksScan("true")
        .withMaxNumDeltaCommitsBeforeCompaction(1)
        .withLogCompactionBlocksThreshold(1)
        .build();
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
index 90720d57d8a1..b35da0450cfd 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
@@ -298,7 +298,7 @@ public class TestMetadataUtilRLIandSIRecordGeneration extends HoodieClientTestBa
      HoodieCommitMetadata compactionCommitMetadata = (HoodieCommitMetadata) compactionWriteMetadata.getCommitMetadata().get();
      // no RLI records should be generated for compaction operation.
      assertTrue(convertMetadataToRecordIndexRecords(context, compactionCommitMetadata, writeConfig.getMetadataConfig(),
-          metaClient, writeConfig.getWritesFileIdEncoding(), compactionInstantOpt.get(), EngineType.SPARK, writeConfig.enableOptimizedLogBlocksScan()).isEmpty());
+          metaClient, writeConfig.getWritesFileIdEncoding(), compactionInstantOpt.get(), EngineType.SPARK).isEmpty());
}
}
@@ -533,7 +533,7 @@ public class TestMetadataUtilRLIandSIRecordGeneration extends HoodieClientTestBa
      // used for RLI
      HoodieReaderContext<?> readerContext = context.getReaderContextFactory(metaClient).getContext();
      finalActualDeletes.addAll(getRevivedAndDeletedKeysFromMergedLogs(metaClient, latestCommitTimestamp, Collections.singletonList(fullFilePath.toString()), writerSchemaOpt,
-          Collections.singletonList(fullFilePath.toString()), writeStat.getPartitionPath(), readerContext, writeConfig.enableOptimizedLogBlocksScan()).getValue());
+          Collections.singletonList(fullFilePath.toString()), writeStat.getPartitionPath(), readerContext).getValue());
      // used in SI flow
      actualUpdatesAndDeletes.addAll(getRecordKeys(writeStat.getPartitionPath(), writeStat.getPrevCommit(), writeStat.getFileId(),
@@ -724,7 +724,6 @@ public class TestMetadataUtilRLIandSIRecordGeneration extends HoodieClientTestBa
          .withHoodieTableMetaClient(datasetMetaClient)
          .withProps(properties)
          .withEmitDelete(true)
-          .withEnableOptimizedLogBlockScan(writeConfig.enableOptimizedLogBlocksScan())
          .build();
      Set<String> allRecordKeys = new HashSet<>();
      try (ClosableIterator<String> keysIterator = reader.getClosableKeyIterator()) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index 56c1afcd3bdc..5e88aeb8d5e1 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -1529,7 +1529,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
        .withRequestedSchema(schema)
        .withDataSchema(schema)
        .withProps(new TypedProperties())
-        .withEnableOptimizedLogBlockScan(writeConfig.getMetadataConfig().isOptimizedLogBlocksScanEnabled())
        .build();
    try (ClosableIterator<HoodieRecord<IndexedRecord>> iter = fileGroupReader.getClosableHoodieRecordIterator()) {