This is an automated email from the ASF dual-hosted git repository.
voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7964202a751d refactor: Add Lombok Builders (#17781)
7964202a751d is described below
commit 7964202a751dc24d65fa0bcd1f300278a453e6bd
Author: voonhous <[email protected]>
AuthorDate: Tue Jun 2 19:08:47 2026 +0800
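refactor: Add Lombok Builders (#17781)
- Add Lombok Builder to HoodieFileGroupReader
- Add Lombok Builder to InputSplit
- Add Lombok Builder to ReaderParameters
- Update callers for the resulting HoodieFileGroupReader API changes:
  newBuilder() -> builder(), Builder -> HoodieFileGroupReaderBuilder,
  withInternalSchema() -> withInternalSchemaOpt(), getStats() -> getReadStats();
  withFileSlice() is removed in favor of withBaseFileOption(), withLogFiles()
  and withPartitionPath()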
---
.../hudi/cli/commands/HoodieLogFileCommand.java | 6 +-
.../org/apache/hudi/index/HoodieIndexUtils.java | 8 +-
.../hudi/io/FileGroupReaderBasedAppendHandle.java | 20 +-
.../hudi/io/FileGroupReaderBasedMergeHandle.java | 6 +-
.../metadata/HoodieBackedTableMetadataWriter.java | 8 +-
.../SecondaryIndexRecordGenerationUtils.java | 10 +-
.../strategy/ClusteringExecutionStrategy.java | 17 +-
.../hudi/client/TestJavaHoodieBackedMetadata.java | 2 +-
.../client/utils/SparkMetadataWriterUtils.java | 2 +-
.../client/TestSparkRDDMetadataWriteClient.java | 6 +-
.../functional/TestHoodieBackedTableMetadata.java | 2 +-
.../common/table/read/HoodieFileGroupReader.java | 296 +++++++--------------
.../apache/hudi/common/table/read/InputSplit.java | 64 +++--
.../hudi/common/table/read/ReaderParameters.java | 82 ++----
.../buffer/DefaultFileGroupRecordBufferLoader.java | 6 +-
.../read/buffer/LogScanningRecordBufferLoader.java | 2 +-
.../ReusableFileGroupRecordBufferLoader.java | 2 +-
.../StreamingFileGroupRecordBufferLoader.java | 4 +-
.../hudi/metadata/HoodieBackedTableMetadata.java | 6 +-
.../hudi/metadata/HoodieTableMetadataUtil.java | 8 +-
.../table/read/TestHoodieFileGroupReaderBase.java | 8 +-
.../read/buffer/BaseTestFileGroupRecordBuffer.java | 2 +-
.../buffer/TestFileGroupRecordBufferLoader.java | 4 +-
.../TestSortedKeyBasedFileGroupRecordBuffer.java | 2 +-
.../org/apache/hudi/table/format/FormatUtils.java | 8 +-
.../reader/HoodieFileGroupReaderTestHarness.java | 7 +-
.../HoodieFileGroupReaderBasedRecordReader.java | 6 +-
.../org/apache/hudi/HoodieMergeOnReadRDDV2.scala | 8 +-
.../org/apache/hudi/cdc/CDCFileGroupIterator.scala | 8 +-
.../HoodieFileGroupReaderBasedFileFormat.scala | 8 +-
.../procedures/PartitionBucketIndexManager.scala | 8 +-
.../TestMetadataUtilRLIandSIRecordGeneration.java | 2 +-
.../hudi/functional/TestHoodieBackedMetadata.java | 2 +-
33 files changed, 279 insertions(+), 351 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 6138e1fb3839..d6d2546e2cb1 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -242,10 +242,12 @@ public class HoodieLogFileCommand {
Option.empty(),
Option.empty(),
fileGroupReaderProperties);
- try (HoodieFileGroupReader<IndexedRecord> fileGroupReader =
HoodieFileGroupReader.<IndexedRecord>newBuilder()
+ try (HoodieFileGroupReader<IndexedRecord> fileGroupReader =
HoodieFileGroupReader.<IndexedRecord>builder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(HoodieCLI.getTableMetaClient())
- .withFileSlice(fileSlice)
+ .withBaseFileOption(fileSlice.getBaseFile())
+ .withLogFiles(fileSlice.getLogFiles())
+ .withPartitionPath(fileSlice.getPartitionPath())
.withDataSchema(readerSchema)
.withRequestedSchema(readerSchema)
.withLatestCommitTime(client.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant().map(HoodieInstant::requestedTime).orElse(HoodieInstantTimeGenerator.getCurrentInstantTimeStr()))
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index d9fe1068e421..d2cc530295f2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -319,14 +319,16 @@ public class HoodieIndexUtils {
Option<InternalSchema> internalSchemaOption =
SerDeHelper.fromJson(config.getInternalSchema());
FileSlice fileSlice = fileSliceOption.get();
HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
- HoodieFileGroupReader<R> fileGroupReader =
HoodieFileGroupReader.<R>newBuilder()
+ HoodieFileGroupReader<R> fileGroupReader =
HoodieFileGroupReader.<R>builder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metaClient)
.withLatestCommitTime(instantTime.get())
- .withFileSlice(fileSlice)
+ .withBaseFileOption(fileSlice.getBaseFile())
+ .withLogFiles(fileSlice.getLogFiles())
+ .withPartitionPath(fileSlice.getPartitionPath())
.withDataSchema(dataSchema)
.withRequestedSchema(dataSchema)
- .withInternalSchema(internalSchemaOption)
+ .withInternalSchemaOpt(internalSchemaOption)
.withProps(metaClient.getTableConfig().getProps())
.build();
try {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java
index a081709f6fc2..c40ce0158a3c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java
@@ -82,10 +82,20 @@ public class FileGroupReaderBasedAppendHandle<T, I, K, O>
extends HoodieAppendHa
new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath(
config.getBasePath(), operation.getPartitionPath()),
logFileName)));
// Initializes the record iterator, log compaction requires writing the
deletes into the delete block of the resulting log file.
- try (HoodieFileGroupReader<T> fileGroupReader =
HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
-
.withLatestCommitTime(instantTime).withPartitionPath(partitionPath).withLogFiles(logFiles).withBaseFileOption(Option.empty()).withDataSchema(writeSchemaWithMetaFields)
-
.withRequestedSchema(writeSchemaWithMetaFields).withInternalSchema(internalSchemaOption).withProps(props).withEmitDelete(true)
-
.withShouldUseRecordPosition(usePosition).withSortOutput(hoodieTable.requireSortedRecords())
+ try (HoodieFileGroupReader<T> fileGroupReader =
HoodieFileGroupReader.<T>builder()
+ .withReaderContext(readerContext)
+ .withHoodieTableMetaClient(hoodieTable.getMetaClient())
+ .withLatestCommitTime(instantTime)
+ .withPartitionPath(partitionPath)
+ .withLogFiles(logFiles)
+ .withBaseFileOption(Option.empty())
+ .withDataSchema(writeSchemaWithMetaFields)
+ .withRequestedSchema(writeSchemaWithMetaFields)
+ .withInternalSchemaOpt(internalSchemaOption)
+ .withProps(props)
+ .withEmitDelete(true)
+ .withShouldUseRecordPosition(usePosition)
+ .withSortOutput(hoodieTable.requireSortedRecords())
// instead of using config.enableOptimizedLogBlocksScan(), we set to
true as log compaction blocks only supported in scanV2
.build()) {
recordItr = new
CloseableMappingIterator<>(fileGroupReader.getLogRecordsOnly(), record -> {
@@ -96,7 +106,7 @@ public class FileGroupReaderBasedAppendHandle<T, I, K, O>
extends HoodieAppendHa
header.put(HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES,
StringUtils.join(fileGroupReader.getValidBlockInstants(), ","));
super.doAppend();
- this.readStats = fileGroupReader.getStats();
+ this.readStats = fileGroupReader.getReadStats();
} catch (IOException e) {
throw new HoodieIOException("Failed to initialize file group reader for
" + fileId, e);
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
index d708c15f3384..2893913a0d13 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java
@@ -301,7 +301,7 @@ public class FileGroupReaderBasedMergeHandle<T, I, K, O>
extends HoodieWriteMerg
// The stats of inserts, updates, and deletes are updated once at the
end
// These will be set in the write stat when closing the merge handle
- this.readStats = fileGroupReader.getStats();
+ this.readStats = fileGroupReader.getReadStats();
this.insertRecordsWritten = readStats.getNumInserts();
this.updatedRecordsWritten = readStats.getNumUpdates();
this.recordsDeleted = readStats.getNumDeletes();
@@ -318,10 +318,10 @@ public class FileGroupReaderBasedMergeHandle<T, I, K, O>
extends HoodieWriteMerg
private HoodieFileGroupReader<T> getFileGroupReader(boolean usePosition,
Option<InternalSchema> internalSchemaOption, TypedProperties props,
Option<Stream<HoodieLogFile>> logFileStreamOpt, Iterator<HoodieRecord<T>>
incomingRecordsItr) {
- HoodieFileGroupReader.Builder<T> fileGroupBuilder =
HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
+ HoodieFileGroupReader.HoodieFileGroupReaderBuilder<T> fileGroupBuilder =
HoodieFileGroupReader.<T>builder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
.withLatestCommitTime(maxInstantTime).withPartitionPath(partitionPath).withBaseFileOption(Option.ofNullable(baseFileToMerge))
.withDataSchema(writeSchemaWithMetaFields).withRequestedSchema(writeSchemaWithMetaFields)
- .withInternalSchema(internalSchemaOption).withProps(props)
+ .withInternalSchemaOpt(internalSchemaOption).withProps(props)
.withShouldUseRecordPosition(usePosition).withSortOutput(hoodieTable.requireSortedRecords())
.withFileGroupUpdateCallback(createCallback());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 5cdd20f7eb5c..ce5a4eeb0a2f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -997,14 +997,16 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
HoodieSchema requestedSchema =
metaClient.getTableConfig().populateMetaFields() ? getRecordKeySchema()
: HoodieSchemaUtils.projectSchema(dataSchema,
Arrays.asList(metaClient.getTableConfig().getRecordKeyFields().orElse(new
String[0])));
Option<InternalSchema> internalSchemaOption =
SerDeHelper.fromJson(dataWriteConfig.getInternalSchema());
- HoodieFileGroupReader<T> fileGroupReader =
HoodieFileGroupReader.<T>newBuilder()
+ HoodieFileGroupReader<T> fileGroupReader =
HoodieFileGroupReader.<T>builder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metaClient)
- .withFileSlice(fileSlice)
+ .withBaseFileOption(fileSlice.getBaseFile())
+ .withLogFiles(fileSlice.getLogFiles())
+ .withPartitionPath(fileSlice.getPartitionPath())
.withLatestCommitTime(instantTime.get())
.withDataSchema(dataSchema)
.withRequestedSchema(requestedSchema)
- .withInternalSchema(internalSchemaOption)
+ .withInternalSchemaOpt(internalSchemaOption)
.withShouldUseRecordPosition(false)
.withProps(metaClient.getTableConfig().getProps())
.build();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
index 9cb9b8ca7df5..b86688b81e60 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
@@ -123,7 +123,7 @@ public class SecondaryIndexRecordGenerationUtils {
// validate that for a given fileId, either we have 1 parquet file or N
log files.
AtomicInteger totalParquetFiles = new AtomicInteger();
AtomicInteger totalLogFiles = new AtomicInteger();
- writeStats.stream().forEach(writeStat -> {
+ writeStats.forEach(writeStat -> {
if (FSUtils.isLogFile(new StoragePath(basePath, writeStat.getPath())))
{
totalLogFiles.getAndIncrement();
} else {
@@ -156,7 +156,7 @@ public class SecondaryIndexRecordGenerationUtils {
} else { // log files are added in current commit
// add new log files to existing latest file slice and compute the
secondary index to primary key mapping.
FileSlice latestFileSlice = fileSliceOption.get();
- writeStats.stream().forEach(writeStat -> {
+ writeStats.forEach(writeStat -> {
StoragePathInfo logFile = new StoragePathInfo(new
StoragePath(basePath, writeStat.getPath()), writeStat.getFileSizeInBytes(),
false, (short) 0, 0, 0);
latestFileSlice.addLogFile(new HoodieLogFile(logFile));
});
@@ -286,9 +286,11 @@ public class SecondaryIndexRecordGenerationUtils {
boolean allowInflightInstants) throws IOException {
String secondaryKeyField = indexDefinition.getSourceFieldsKey();
HoodieSchema requestedSchema =
getRequestedSchemaForSecondaryIndex(metaClient, tableSchema, secondaryKeyField);
- HoodieFileGroupReader<T> fileGroupReader =
HoodieFileGroupReader.<T>newBuilder()
+ HoodieFileGroupReader<T> fileGroupReader =
HoodieFileGroupReader.<T>builder()
.withReaderContext(readerContext)
- .withFileSlice(fileSlice)
+ .withBaseFileOption(fileSlice.getBaseFile())
+ .withLogFiles(fileSlice.getLogFiles())
+ .withPartitionPath(fileSlice.getPartitionPath())
.withHoodieTableMetaClient(metaClient)
.withProps(props)
.withLatestCommitTime(instantTime)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
index d34a85a97bd4..cc3c4a02eb22 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java
@@ -142,9 +142,18 @@ public abstract class ClusteringExecutionStrategy<T, I, K,
O> implements Seriali
ReaderContextFactory<R> readerContextFactory, String instantTime,
TypedProperties properties, boolean usePosition) {
HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
- return HoodieFileGroupReader.<R>newBuilder()
-
.withReaderContext(readerContext).withHoodieTableMetaClient(metaClient).withLatestCommitTime(instantTime)
-
.withFileSlice(fileSlice).withDataSchema(readerSchema).withRequestedSchema(readerSchema).withInternalSchema(internalSchemaOption)
-
.withShouldUseRecordPosition(usePosition).withProps(properties).build();
+ return HoodieFileGroupReader.<R>builder()
+ .withReaderContext(readerContext)
+ .withHoodieTableMetaClient(metaClient)
+ .withLatestCommitTime(instantTime)
+ .withBaseFileOption(fileSlice.getBaseFile())
+ .withLogFiles(fileSlice.getLogFiles())
+ .withPartitionPath(fileSlice.getPartitionPath())
+ .withDataSchema(readerSchema)
+ .withRequestedSchema(readerSchema)
+ .withInternalSchemaOpt(internalSchemaOption)
+ .withShouldUseRecordPosition(usePosition)
+ .withProps(properties)
+ .build();
}
}
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 9563439c39eb..feda056472dc 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -946,7 +946,7 @@ public class TestJavaHoodieBackedMetadata extends
TestHoodieMetadataBase {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(HoodieSchema.fromAvroSchema(HoodieMetadataRecord.getClassSchema()));
HoodieAvroReaderContext readerContext = new
HoodieAvroReaderContext(metadataMetaClient.getStorageConf(),
metadataMetaClient.getTableConfig(), Option.empty(), Option.empty(),
new TypedProperties());
- HoodieFileGroupReader<IndexedRecord> fileGroupReader =
HoodieFileGroupReader.<IndexedRecord>newBuilder()
+ HoodieFileGroupReader<IndexedRecord> fileGroupReader =
HoodieFileGroupReader.<IndexedRecord>builder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metadataMetaClient)
.withLogFiles(logFiles.stream())
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 47155899e204..2184c24ad136 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -335,7 +335,7 @@ public class SparkMetadataWriterUtils {
baseFileOption = Option.empty();
logFileStream = Stream.of(new HoodieLogFile(filePath));
}
- HoodieFileGroupReader<InternalRow> fileGroupReader =
HoodieFileGroupReader.<InternalRow>newBuilder()
+ HoodieFileGroupReader<InternalRow> fileGroupReader =
HoodieFileGroupReader.<InternalRow>builder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metaClient)
.withDataSchema(tableSchema)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
index 8a88dd813588..65e45ce96f36 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
@@ -218,10 +218,12 @@ public class TestSparkRDDMetadataWriteClient extends
HoodieClientTestBase {
HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(metadataSchema);
HoodieAvroReaderContext readerContext = new
HoodieAvroReaderContext(metadataMetaClient.getStorageConf(),
metadataMetaClient.getTableConfig(), Option.of(instantRange),
Option.of(predicate));
- HoodieFileGroupReader<IndexedRecord> fileGroupReader =
HoodieFileGroupReader.<IndexedRecord>newBuilder()
+ HoodieFileGroupReader<IndexedRecord> fileGroupReader =
HoodieFileGroupReader.<IndexedRecord>builder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metadataMetaClient)
- .withFileSlice(fileSlice)
+ .withBaseFileOption(fileSlice.getBaseFile())
+ .withLogFiles(fileSlice.getLogFiles())
+ .withPartitionPath(fileSlice.getPartitionPath())
.withLatestCommitTime(validMetadataInstant)
.withRequestedSchema(metadataSchema)
.withDataSchema(schema)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 6bb71f07de61..fcbd689f7447 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -560,7 +560,7 @@ public class TestHoodieBackedTableMetadata extends
TestHoodieMetadataBase {
private void verifyMetadataMergedRecords(HoodieTableMetaClient
metadataMetaClient, List<HoodieLogFile> logFiles, String latestCommitTimestamp,
HoodieWriteConfig metadataTableWriteConfig) {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(HoodieSchema.fromAvroSchema(HoodieMetadataRecord.getClassSchema()));
HoodieAvroReaderContext readerContext = new
HoodieAvroReaderContext(metadataMetaClient.getStorageConf(),
metadataMetaClient.getTableConfig(), Option.empty(), Option.empty());
- HoodieFileGroupReader<IndexedRecord> fileGroupReader =
HoodieFileGroupReader.<IndexedRecord>newBuilder()
+ HoodieFileGroupReader<IndexedRecord> fileGroupReader =
HoodieFileGroupReader.<IndexedRecord>builder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metadataMetaClient)
.withLogFiles(logFiles.stream())
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 2b35eaff4f30..0b536b511db1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.BaseFile;
-import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
@@ -35,7 +34,6 @@ import org.apache.hudi.common.table.PartitionPathParser;
import org.apache.hudi.common.table.read.buffer.FileGroupRecordBufferLoader;
import org.apache.hudi.common.table.read.buffer.HoodieFileGroupRecordBuffer;
import org.apache.hudi.common.util.ConfigUtils;
-import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -49,6 +47,10 @@ import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
@@ -67,7 +69,9 @@ import java.util.stream.Stream;
* @param <T> The type of engine-specific record representation, e.g.,{@code
InternalRow}
* in Spark and {@code RowData} in Flink.
*/
+@AllArgsConstructor
public final class HoodieFileGroupReader<T> implements Closeable {
+
private final HoodieReaderContext<T> readerContext;
private final HoodieTableMetaClient metaClient;
private final InputSplit inputSplit;
@@ -81,25 +85,113 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
private HoodieFileGroupRecordBuffer<T> recordBuffer;
private ClosableIterator<T> baseFileIterator;
private final Option<UnaryOperator<T>> outputConverter;
+ @Getter
private final HoodieReadStats readStats;
// Callback to run custom logic on updates to the base files for the file
group
private final Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback;
// The list of instant times read from the log blocks, this value is used by
the log-compaction to allow optimized log-block scans
+ @Getter
private List<String> validBlockInstants = Collections.emptyList();
private BufferedRecordConverter<T> bufferedRecordConverter;
- private HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
HoodieStorage storage, String tablePath,
- String latestCommitTime, HoodieSchema
dataSchema, HoodieSchema requestedSchema,
- Option<InternalSchema> internalSchemaOpt,
HoodieTableMetaClient hoodieTableMetaClient, TypedProperties props,
- ReaderParameters readerParameters, InputSplit
inputSplit, Option<BaseFileUpdateCallback<T>> updateCallback,
- FileGroupRecordBufferLoader<T>
recordBufferLoader) {
+ @Builder(setterPrefix = "with")
+ private HoodieFileGroupReader(
+ HoodieReaderContext<T> readerContext,
+ String latestCommitTime,
+ HoodieSchema dataSchema,
+ HoodieSchema requestedSchema,
+ Option<InternalSchema> internalSchemaOpt,
+ HoodieTableMetaClient hoodieTableMetaClient,
+ TypedProperties props,
+ Option<HoodieBaseFile> baseFileOption,
+ Stream<HoodieLogFile> logFiles,
+ String partitionPath,
+ Long start,
+ Long length,
+ Iterator<? extends HoodieRecord> recordIterator,
+ Boolean shouldUseRecordPosition,
+ Boolean allowInflightInstants,
+ Boolean emitDelete,
+ Boolean sortOutput,
+ Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback,
+ FileGroupRecordBufferLoader<T> recordBufferLoader) {
+
+ // Validations
+ ValidationUtils.checkArgument(readerContext != null, "Reader context is
required");
+ ValidationUtils.checkArgument(hoodieTableMetaClient != null, "Hoodie table
meta client is required");
+ ValidationUtils.checkArgument(latestCommitTime != null, "Latest commit
time is required");
+ ValidationUtils.checkArgument(dataSchema != null, "Data schema is
required");
+ ValidationUtils.checkArgument(requestedSchema != null, "Requested schema
is required");
+ ValidationUtils.checkArgument(props != null, "Props is required");
+ ValidationUtils.checkArgument(partitionPath != null, "Partition path is
required");
+
+ // Handle defaults
+ if (internalSchemaOpt == null) {
+ internalSchemaOpt = Option.empty();
+ }
+ if (baseFileOption == null) {
+ baseFileOption = Option.empty();
+ }
+ if (start == null) {
+ start = 0L;
+ }
+ if (length == null) {
+ length = Long.MAX_VALUE;
+ }
+ if (shouldUseRecordPosition == null) {
+ shouldUseRecordPosition = false;
+ }
+ if (allowInflightInstants == null) {
+ allowInflightInstants = false;
+ }
+ if (emitDelete == null) {
+ emitDelete = false;
+ }
+ if (sortOutput == null) {
+ sortOutput = false;
+ }
+ if (fileGroupUpdateCallback == null) {
+ fileGroupUpdateCallback = Option.empty();
+ }
+
+ // Derive tablePath
+ String tablePath = hoodieTableMetaClient.getBasePath().toString();
+
+ // Set the storage with the readerContext's storage configuration
+ HoodieStorage storage = hoodieTableMetaClient.getStorage().newInstance(new
StoragePath(tablePath), readerContext.getStorageConfiguration());
+
+ // Handle recordBufferLoader default
+ if (recordBufferLoader == null) {
+ if (recordIterator != null) {
+ recordBufferLoader =
FileGroupRecordBufferLoader.createStreamingRecordsBufferLoader();
+ } else {
+ recordBufferLoader = FileGroupRecordBufferLoader.createDefault();
+ }
+ }
+
+ // Build composite objects using static helpers
+ this.readerParameters = ReaderParameters.builder()
+ .shouldUseRecordPosition(shouldUseRecordPosition)
+ .emitDeletes(emitDelete)
+ .sortOutputs(sortOutput)
+ .inflightInstantsAllowed(allowInflightInstants)
+ .build();
+ this.inputSplit = InputSplit.builder()
+ .baseFileOption(baseFileOption)
+ .logFileStream(logFiles)
+ .recordIterator((Iterator<HoodieRecord>) recordIterator)
+ .partitionPath(partitionPath)
+ .start(start)
+ .length(length)
+ .build();
+
+ // Initialize fields
this.readerContext = readerContext;
this.recordBufferLoader = recordBufferLoader;
- this.fileGroupUpdateCallback = updateCallback;
+ this.fileGroupUpdateCallback = fileGroupUpdateCallback;
this.metaClient = hoodieTableMetaClient;
this.storage = storage;
- this.readerParameters = readerParameters;
- this.inputSplit = inputSplit;
+
readerContext.setHasLogFiles(this.inputSplit.hasLogFiles());
readerContext.getRecordContext().setPartitionPath(inputSplit.getPartitionPath());
if (readerContext.getHasLogFiles() && inputSplit.getStart() != 0) {
@@ -112,7 +204,7 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
readerContext.setTablePath(tablePath);
readerContext.setLatestCommitTime(latestCommitTime);
boolean isSkipMerge = ConfigUtils.getStringWithAltKeys(props,
HoodieReaderConfig.MERGE_TYPE,
true).equalsIgnoreCase(HoodieReaderConfig.REALTIME_SKIP_MERGE);
-
readerContext.setShouldMergeUseRecordPosition(readerParameters.useRecordPosition()
&& !isSkipMerge && readerContext.getHasLogFiles() &&
inputSplit.isParquetBaseFile());
+
readerContext.setShouldMergeUseRecordPosition(readerParameters.shouldUseRecordPosition()
&& !isSkipMerge && readerContext.getHasLogFiles() &&
inputSplit.isParquetBaseFile());
readerContext.setHasBootstrapBaseFile(inputSplit.getBaseFileOption().flatMap(HoodieBaseFile::getBootstrapBaseFile).isPresent());
readerContext.setSchemaHandler(readerContext.getRecordContext().supportsParquetRowIndex()
? new ParquetRowIndexBasedSchemaHandler<>(readerContext, dataSchema,
requestedSchema, internalSchemaOpt, props, metaClient)
@@ -248,13 +340,6 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
}
}
- /**
- * @return statistics of reading a file group.
- */
- public HoodieReadStats getStats() {
- return readStats;
- }
-
/**
* @return The next record after calling {@link #hasNext}.
*/
@@ -266,10 +351,6 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
return nextVal;
}
- public List<String> getValidBlockInstants() {
- return validBlockInstants;
- }
-
/**
* Notifies a write failure with the given record key.
*/
@@ -355,175 +436,4 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
}
}
}
-
- public static <T> Builder<T> newBuilder() {
- return new Builder<>();
- }
-
- public static class Builder<T> {
- private HoodieReaderContext<T> readerContext;
- private HoodieStorage storage;
- private String tablePath;
- private String latestCommitTime;
- private HoodieSchema dataSchema;
- private HoodieSchema requestedSchema;
- private Option<InternalSchema> internalSchemaOpt = Option.empty();
- private HoodieTableMetaClient hoodieTableMetaClient;
- private TypedProperties props;
- private Option<HoodieBaseFile> baseFileOption;
- private Stream<HoodieLogFile> logFiles;
- private String partitionPath;
- private long start = 0;
- private long length = Long.MAX_VALUE;
- private Iterator<HoodieRecord> recordIterator;
- private boolean shouldUseRecordPosition = false;
- private boolean allowInflightInstants = false;
- private boolean emitDelete;
- private boolean sortOutput = false;
- private Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback =
Option.empty();
- private FileGroupRecordBufferLoader<T> recordBufferLoader;
-
- public Builder<T> withReaderContext(HoodieReaderContext<T> readerContext) {
- this.readerContext = readerContext;
- return this;
- }
-
- public Builder<T> withLatestCommitTime(String latestCommitTime) {
- this.latestCommitTime = latestCommitTime;
- return this;
- }
-
- public Builder<T> withFileSlice(FileSlice fileSlice) {
- this.baseFileOption = fileSlice.getBaseFile();
- this.logFiles = fileSlice.getLogFiles();
- this.partitionPath = fileSlice.getPartitionPath();
- return this;
- }
-
- public Builder<T> withBaseFileOption(Option<HoodieBaseFile>
baseFileOption) {
- this.baseFileOption = baseFileOption;
- return this;
- }
-
- public Builder<T> withLogFiles(Stream<HoodieLogFile> logFiles) {
- this.logFiles = logFiles;
- return this;
- }
-
- public Builder<T> withRecordIterator(Iterator<? extends HoodieRecord>
recordIterator) {
- this.recordIterator = (Iterator<HoodieRecord>) recordIterator;
- this.recordBufferLoader =
FileGroupRecordBufferLoader.createStreamingRecordsBufferLoader();
- return this;
- }
-
- public Builder<T> withPartitionPath(String partitionPath) {
- this.partitionPath = partitionPath;
- return this;
- }
-
- public Builder<T> withDataSchema(HoodieSchema dataSchema) {
- this.dataSchema = dataSchema;
- return this;
- }
-
- public Builder<T> withRequestedSchema(HoodieSchema requestedSchema) {
- this.requestedSchema = requestedSchema;
- return this;
- }
-
- public Builder<T> withInternalSchema(Option<InternalSchema>
internalSchemaOpt) {
- this.internalSchemaOpt = internalSchemaOpt;
- return this;
- }
-
- public Builder<T> withHoodieTableMetaClient(HoodieTableMetaClient
hoodieTableMetaClient) {
- this.hoodieTableMetaClient = hoodieTableMetaClient;
- this.tablePath = hoodieTableMetaClient.getBasePath().toString();
- return this;
- }
-
- public Builder<T> withProps(TypedProperties props) {
- this.props = props;
- return this;
- }
-
- public Builder<T> withStart(long start) {
- this.start = start;
- return this;
- }
-
- public Builder<T> withLength(long length) {
- this.length = length;
- return this;
- }
-
- public Builder<T> withShouldUseRecordPosition(boolean
shouldUseRecordPosition) {
- this.shouldUseRecordPosition = shouldUseRecordPosition;
- return this;
- }
-
- public Builder<T> withAllowInflightInstants(boolean allowInflightInstants)
{
- this.allowInflightInstants = allowInflightInstants;
- return this;
- }
-
- public Builder<T> withEmitDelete(boolean emitDelete) {
- this.emitDelete = emitDelete;
- return this;
- }
-
- public Builder<T>
withFileGroupUpdateCallback(Option<BaseFileUpdateCallback<T>>
fileGroupUpdateCallback) {
- this.fileGroupUpdateCallback = fileGroupUpdateCallback;
- return this;
- }
-
- /**
- * If true, the output of the merge will be sorted instead of appending
log records to end of the iterator if they do not have matching keys in the
base file.
- * This assumes that the base file is already sorted by key.
- * @param sortOutput whether to sort the output iterator
- * @return this builder instance
- */
- public Builder<T> withSortOutput(boolean sortOutput) {
- this.sortOutput = sortOutput;
- return this;
- }
-
- public Builder<T> withRecordBufferLoader(FileGroupRecordBufferLoader<T>
recordBufferLoader) {
- this.recordBufferLoader = recordBufferLoader;
- return this;
- }
-
- public HoodieFileGroupReader<T> build() {
- ValidationUtils.checkArgument(readerContext != null, "Reader context is
required");
- ValidationUtils.checkArgument(hoodieTableMetaClient != null, "Hoodie
table meta client is required");
- ValidationUtils.checkArgument(tablePath != null, "Table path is
required");
- // set the storage with the readerContext's storage configuration
- this.storage = hoodieTableMetaClient.getStorage().newInstance(new
StoragePath(tablePath), readerContext.getStorageConfiguration());
-
- ValidationUtils.checkArgument(storage != null, "Storage is required");
- ValidationUtils.checkArgument(latestCommitTime != null, "Latest commit
time is required");
- ValidationUtils.checkArgument(dataSchema != null, "Data schema is
required");
- ValidationUtils.checkArgument(requestedSchema != null, "Requested schema
is required");
- ValidationUtils.checkArgument(props != null, "Props is required");
- ValidationUtils.checkArgument(baseFileOption != null, "Base file option
is required");
- ValidationUtils.checkArgument(partitionPath != null, "Partition path is
required");
-
- if (recordBufferLoader == null) {
- recordBufferLoader = FileGroupRecordBufferLoader.createDefault();
- }
-
- ReaderParameters readerParameters = ReaderParameters.builder()
- .shouldUseRecordPosition(shouldUseRecordPosition)
- .emitDeletes(emitDelete)
- .sortOutputs(sortOutput)
- .allowInflightInstants(allowInflightInstants)
- .build();
- InputSplit inputSplit = new InputSplit(baseFileOption, recordIterator !=
null ? Either.right(recordIterator) : Either.left(logFiles == null ?
Stream.empty() : logFiles),
- partitionPath, start, length);
- return new HoodieFileGroupReader<>(
- readerContext, storage, tablePath, latestCommitTime, dataSchema,
requestedSchema, internalSchemaOpt, hoodieTableMetaClient,
- props, readerParameters, inputSplit, fileGroupUpdateCallback,
recordBufferLoader);
- }
- }
-
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
index 6d7b578e15b3..a0f1a3d04e90 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/InputSplit.java
@@ -24,10 +24,13 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
-import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.Getter;
+
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -38,9 +41,13 @@ import java.util.stream.Stream;
* Represents a split of input data for reading, which includes the partition
path the data belongs to along with an optional base file and a list of log
files.
* If there is only a base file, it is possible for the reader to specify a
particular range of the file with the start and length parameters.
*/
+@Getter
public class InputSplit {
+
private final Option<HoodieBaseFile> baseFileOption;
+ @Getter(AccessLevel.NONE)
private final List<HoodieLogFile> logFiles;
+ @Getter(AccessLevel.NONE)
private final Option<Iterator<HoodieRecord>> recordIterator;
private final String partitionPath;
// Byte offset to start reading from the base file
@@ -48,26 +55,41 @@ public class InputSplit {
// Length of bytes to read from the base file
private final long length;
- InputSplit(Option<HoodieBaseFile> baseFileOption,
- Either<Stream<HoodieLogFile>, Iterator<HoodieRecord>>
recordsToMerge,
- String partitionPath, long start, long length) {
- this.baseFileOption = baseFileOption;
- if (recordsToMerge.isLeft()) {
- this.logFiles =
recordsToMerge.asLeft().sorted(HoodieLogFile.getLogFileComparator())
+ @Builder
+ private InputSplit(
+ Option<HoodieBaseFile> baseFileOption,
+ Stream<HoodieLogFile> logFileStream,
+ Iterator<HoodieRecord> recordIterator,
+ String partitionPath,
+ long start,
+ long length) {
+
+ // Ensure we do not have both sources of data to merge
+ // i.e. logFileStream and recordIterator cannot be both non-null
+ ValidationUtils.checkArgument(!(logFileStream != null && recordIterator !=
null),
+ "Cannot provide both logFileStream and recordIterator");
+
+ this.baseFileOption =
Option.ofNullable(baseFileOption).orElse(Option.empty());
+ this.partitionPath = partitionPath;
+ this.start = start;
+ this.length = length;
+
+ if (logFileStream != null) {
+ // Process Log Files (if provided)
+ this.logFiles = logFileStream
+ .sorted(HoodieLogFile.getLogFileComparator())
.filter(logFile ->
!logFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
.collect(Collectors.toList());
this.recordIterator = Option.empty();
+ } else if (recordIterator != null) {
+ // Process Record Iterator (if provided)
+ this.logFiles = Collections.emptyList();
+ this.recordIterator = Option.of(recordIterator);
} else {
+ // Handle Case with neither (Base file only read)
this.logFiles = Collections.emptyList();
- this.recordIterator = Option.of(recordsToMerge.asRight());
+ this.recordIterator = Option.empty();
}
- this.partitionPath = partitionPath;
- this.start = start;
- this.length = length;
- }
-
- public Option<HoodieBaseFile> getBaseFileOption() {
- return baseFileOption;
}
public List<HoodieLogFile> getLogFiles() {
@@ -79,18 +101,6 @@ public class InputSplit {
return !logFiles.isEmpty();
}
- public String getPartitionPath() {
- return partitionPath;
- }
-
- public long getStart() {
- return start;
- }
-
- public long getLength() {
- return length;
- }
-
public boolean isParquetBaseFile() {
return baseFileOption.map(baseFile ->
HoodieFileFormat.fromFileExtension(baseFile.getStoragePath().getFileExtension())
== HoodieFileFormat.PARQUET).orElse(false);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/ReaderParameters.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/ReaderParameters.java
index bfb791fe5fa7..d19ad39418b3 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/ReaderParameters.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/ReaderParameters.java
@@ -19,78 +19,38 @@
package org.apache.hudi.common.table.read;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+
/**
* Parameters for how the reader should process the FileGroup while reading.
*/
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+@Getter
+@Builder
public class ReaderParameters {
+
// Rely on the position of the record in the file instead of the record keys
while merging data between base and log files
- private final boolean useRecordPosition;
+ @Getter(AccessLevel.NONE)
+ @Builder.Default
+ private final boolean shouldUseRecordPosition = false;
// Whether to emit delete records while reading
- private final boolean emitDelete;
+ @Builder.Default
+ private final boolean emitDeletes = false;
// Whether to sort the output records while reading, this implicitly
requires the base file to be sorted
- private final boolean sortOutput;
+ @Builder.Default
+ private final boolean sortOutputs = false;
// Allows to consider inflight instants while merging log records using
HoodieMergedLogRecordReader
// The inflight instants need to be considered while updating RLI records.
RLI needs to fetch the revived
// and deleted keys from the log files written as part of active data
commit. During the RLI update,
- // the allowInflightInstants flag would need to be set to true. This would
ensure the HoodieMergedLogRecordReader
+ // the inflightInstantsAllowed flag would need to be set to true. This would
ensure the HoodieMergedLogRecordReader
// considers the log records which are inflight.
- private final boolean allowInflightInstants;
-
- private ReaderParameters(boolean useRecordPosition, boolean emitDelete,
boolean sortOutput, boolean allowInflightInstants) {
- this.useRecordPosition = useRecordPosition;
- this.emitDelete = emitDelete;
- this.sortOutput = sortOutput;
- this.allowInflightInstants = allowInflightInstants;
- }
-
- public boolean useRecordPosition() {
- return useRecordPosition;
- }
-
- public boolean emitDeletes() {
- return emitDelete;
- }
-
- public boolean sortOutputs() {
- return sortOutput;
- }
-
- public boolean allowInflightInstants() {
- return allowInflightInstants;
- }
-
- static Builder builder() {
- return new Builder();
- }
-
- static class Builder {
- private boolean shouldUseRecordPosition = false;
- private boolean emitDelete = false;
- private boolean sortOutput = false;
- private boolean allowInflightInstants = false;
-
- public Builder shouldUseRecordPosition(boolean shouldUseRecordPosition) {
- this.shouldUseRecordPosition = shouldUseRecordPosition;
- return this;
- }
-
- public Builder emitDeletes(boolean emitDelete) {
- this.emitDelete = emitDelete;
- return this;
- }
-
- public Builder sortOutputs(boolean sortOutput) {
- this.sortOutput = sortOutput;
- return this;
- }
-
- public Builder allowInflightInstants(boolean allowInflightInstants) {
- this.allowInflightInstants = allowInflightInstants;
- return this;
- }
+ @Builder.Default
+ private final boolean inflightInstantsAllowed = false;
- public ReaderParameters build() {
- return new ReaderParameters(shouldUseRecordPosition, emitDelete,
sortOutput, allowInflightInstants);
- }
+ public boolean shouldUseRecordPosition() {
+ return shouldUseRecordPosition;
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
index 3e0609003ad3..e84921c080ae 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/DefaultFileGroupRecordBufferLoader.java
@@ -62,15 +62,15 @@ class DefaultFileGroupRecordBufferLoader<T> extends
LogScanningRecordBufferLoade
Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
boolean isSkipMerge = ConfigUtils.getStringWithAltKeys(props,
HoodieReaderConfig.MERGE_TYPE,
true).equalsIgnoreCase(HoodieReaderConfig.REALTIME_SKIP_MERGE);
Option<PartialUpdateMode> partialUpdateModeOpt =
hoodieTableMetaClient.getTableConfig().getPartialUpdateMode();
- UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats,
readerContext, readerParameters.emitDeletes(), fileGroupUpdateCallback, props);
+ UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats,
readerContext, readerParameters.isEmitDeletes(), fileGroupUpdateCallback,
props);
FileGroupRecordBuffer<T> recordBuffer;
if (isSkipMerge) {
recordBuffer = new UnmergedFileGroupRecordBuffer<>(
readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateModeOpt, props, readStats);
- } else if (readerParameters.sortOutputs()) {
+ } else if (readerParameters.isSortOutputs()) {
recordBuffer = new SortedKeyBasedFileGroupRecordBuffer<>(
readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
- } else if (readerParameters.useRecordPosition() &&
inputSplit.getBaseFileOption().isPresent()) {
+ } else if (readerParameters.shouldUseRecordPosition() &&
inputSplit.getBaseFileOption().isPresent()) {
recordBuffer = new PositionBasedFileGroupRecordBuffer<>(
readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateModeOpt, inputSplit.getBaseFileOption().get().getCommitTime(),
props,
orderingFieldNames, updateProcessor);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/LogScanningRecordBufferLoader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/LogScanningRecordBufferLoader.java
index b6638b1ecd04..1c4f6082e4f1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/LogScanningRecordBufferLoader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/LogScanningRecordBufferLoader.java
@@ -48,7 +48,7 @@ abstract class LogScanningRecordBufferLoader {
.withInstantRange(readerContext.getInstantRange())
.withPartition(inputSplit.getPartitionPath())
.withRecordBuffer(recordBuffer)
- .withAllowInflightInstants(readerParameters.allowInflightInstants())
+
.withAllowInflightInstants(readerParameters.isInflightInstantsAllowed())
.withMetaClient(hoodieTableMetaClient)
.build()) {
readStats.setTotalLogReadTimeMs(logRecordReader.getTotalTimeTakenToReadAndMergeBlocks());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableFileGroupRecordBufferLoader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableFileGroupRecordBufferLoader.java
index aa3fbd22cf94..bd3b3ec55bb3 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableFileGroupRecordBufferLoader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/ReusableFileGroupRecordBufferLoader.java
@@ -58,7 +58,7 @@ public class ReusableFileGroupRecordBufferLoader<T> extends
LogScanningRecordBuf
ReaderParameters readerParameters,
HoodieReadStats readStats,
Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
- UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats,
readerContext, readerParameters.emitDeletes(), fileGroupUpdateCallback, props);
+ UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats,
readerContext, readerParameters.isEmitDeletes(), fileGroupUpdateCallback,
props);
Option<PartialUpdateMode> partialUpdateModeOpt =
hoodieTableMetaClient.getTableConfig().getPartialUpdateMode();
if (cachedResults == null) {
// Create an initial buffer to process the log files
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
index 02c1d5b4cb9c..2087b7dceaf8 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/StreamingFileGroupRecordBufferLoader.java
@@ -67,9 +67,9 @@ public class StreamingFileGroupRecordBufferLoader<T>
implements FileGroupRecordB
readerContext.getSchemaHandler().setSchemaForUpdates(recordSchema);
HoodieTableConfig tableConfig = hoodieTableMetaClient.getTableConfig();
Option<PartialUpdateMode> partialUpdateModeOpt =
tableConfig.getPartialUpdateMode();
- UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats,
readerContext, readerParameters.emitDeletes(), fileGroupUpdateCallback, props);
+ UpdateProcessor<T> updateProcessor = UpdateProcessor.create(readStats,
readerContext, readerParameters.isEmitDeletes(), fileGroupUpdateCallback,
props);
FileGroupRecordBuffer<T> recordBuffer;
- if (readerParameters.sortOutputs()) {
+ if (readerParameters.isSortOutputs()) {
recordBuffer = new SortedKeyBasedFileGroupRecordBuffer<>(
readerContext, hoodieTableMetaClient, readerContext.getMergeMode(),
partialUpdateModeOpt, props, orderingFieldNames, updateProcessor);
} else {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 39db5be71dcc..a7e53e226033 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -601,11 +601,13 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
baseFileReaders,
fileGroupReaderProps);
- HoodieFileGroupReader<IndexedRecord> fileGroupReader =
HoodieFileGroupReader.<IndexedRecord>newBuilder()
+ HoodieFileGroupReader<IndexedRecord> fileGroupReader =
HoodieFileGroupReader.<IndexedRecord>builder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metadataMetaClient)
.withLatestCommitTime(latestMetadataInstantTime)
- .withFileSlice(fileSlice)
+ .withBaseFileOption(fileSlice.getBaseFile())
+ .withLogFiles(fileSlice.getLogFiles())
+ .withPartitionPath(fileSlice.getPartitionPath())
.withDataSchema(SCHEMA)
.withRequestedSchema(SCHEMA)
.withProps(fileGroupReaderProps)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 59e2d6dcd83f..0ede3ae5015e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1809,7 +1809,7 @@ public class HoodieTableMetadataUtil {
properties.setProperty(HoodieReaderConfig.MERGE_TYPE.key(),
REALTIME_SKIP_MERGE);
// Currently only avro is fully supported for extracting column ranges
(see HUDI-8585)
HoodieReaderContext readerContext = new
HoodieAvroReaderContext(datasetMetaClient.getStorageConf(),
datasetMetaClient.getTableConfig(), Option.empty(), Option.empty());
- HoodieFileGroupReader fileGroupReader =
HoodieFileGroupReader.newBuilder()
+ HoodieFileGroupReader fileGroupReader = HoodieFileGroupReader.builder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(datasetMetaClient)
.withLogFiles(Stream.of(logFile))
@@ -2568,10 +2568,12 @@ public class HoodieTableMetadataUtil {
final String partition = partitionAndBaseFile.getKey();
final FileSlice fileSlice = partitionAndBaseFile.getValue();
if (!fileSlice.getBaseFile().isPresent()) {
- HoodieFileGroupReader fileGroupReader =
HoodieFileGroupReader.<T>newBuilder()
+ HoodieFileGroupReader fileGroupReader =
HoodieFileGroupReader.<T>builder()
.withReaderContext(readerContextFactory.getContext())
.withHoodieTableMetaClient(metaClient)
- .withFileSlice(fileSlice)
+ .withBaseFileOption(fileSlice.getBaseFile())
+ .withLogFiles(fileSlice.getLogFiles())
+ .withPartitionPath(fileSlice.getPartitionPath())
.withDataSchema(tableSchema)
.withRequestedSchema(HoodieSchemaUtils.getRecordKeySchema())
.withLatestCommitTime(latestCommitTime)
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index ffcda26a3796..158ef475d843 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -1040,15 +1040,17 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
HoodieSchema
schema,
FileSlice
fileSlice,
int start,
TypedProperties props, boolean sortOutput) {
- return HoodieFileGroupReader.<T>newBuilder()
+ return HoodieFileGroupReader.<T>builder()
.withReaderContext(getHoodieReaderContext(tablePath, schema,
storageConf, metaClient))
.withHoodieTableMetaClient(metaClient)
.withLatestCommitTime(metaClient.getActiveTimeline().lastInstant().get().requestedTime())
- .withFileSlice(fileSlice)
+ .withBaseFileOption(fileSlice.getBaseFile())
+ .withLogFiles(fileSlice.getLogFiles())
+ .withPartitionPath(fileSlice.getPartitionPath())
.withDataSchema(schema)
.withRequestedSchema(schema)
.withProps(props)
- .withStart(start)
+ .withStart((long) start)
.withLength(fileSlice.getTotalFileSize())
.withShouldUseRecordPosition(false)
.withAllowInflightInstants(false)
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
index e6ee09b3e6ad..b2869df3310d 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/BaseTestFileGroupRecordBuffer.java
@@ -149,7 +149,7 @@ public class BaseTestFileGroupRecordBuffer {
when(inputSplit.hasNoRecordsToMerge()).thenReturn(false);
when(inputSplit.getRecordIterator()).thenReturn(fileGroupRecordBufferItrOpt.get());
ReaderParameters readerParameters = mock(ReaderParameters.class);
- when(readerParameters.sortOutputs()).thenReturn(false);
+ when(readerParameters.isSortOutputs()).thenReturn(false);
return (KeyBasedFileGroupRecordBuffer<IndexedRecord>)
recordBufferLoader.getRecordBuffer(readerContext, mockMetaClient.getStorage(),
inputSplit,
orderingFieldNames, mockMetaClient, props, readerParameters,
readStats, Option.empty()).getKey();
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
index 51dad1c752af..e1c810c56910 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestFileGroupRecordBufferLoader.java
@@ -90,12 +90,12 @@ public class TestFileGroupRecordBufferLoader extends
BaseTestFileGroupRecordBuff
}
ReaderParameters readerParameters = mock(ReaderParameters.class);
if (fileGroupRecordBufferType.contains("Sorted")) {
- when(readerParameters.sortOutputs()).thenReturn(true);
+ when(readerParameters.isSortOutputs()).thenReturn(true);
}
if (fileGroupRecordBufferType.contains("Position")) {
HoodieBaseFile baseFile = mock(HoodieBaseFile.class);
when(inputSplit.getBaseFileOption()).thenReturn(Option.of(baseFile));
- when(readerParameters.useRecordPosition()).thenReturn(true);
+ when(readerParameters.shouldUseRecordPosition()).thenReturn(true);
}
Option<BaseFileUpdateCallback> fileGroupUpdateCallback = Option.empty();
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
index 0d43c1d463ca..611128a96309 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
@@ -141,7 +141,7 @@ class TestSortedKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecordBuf
when(inputSplit.hasNoRecordsToMerge()).thenReturn(false);
when(inputSplit.getRecordIterator()).thenReturn(inputRecords.iterator());
ReaderParameters readerParameters = mock(ReaderParameters.class);
- when(readerParameters.sortOutputs()).thenReturn(true);
+ when(readerParameters.isSortOutputs()).thenReturn(true);
SortedKeyBasedFileGroupRecordBuffer fileGroupRecordBuffer =
(SortedKeyBasedFileGroupRecordBuffer<IndexedRecord>) recordBufferLoader
.getRecordBuffer(readerContext, mockMetaClient.getStorage(),
inputSplit, Collections.singletonList("ts"), mockMetaClient, properties,
readerParameters, readStats, Option.empty()).getKey();
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index d62763ef64af..0bd7f53b611f 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -132,14 +132,16 @@ public class FormatUtils {
final TypedProperties typedProps =
FlinkClientUtil.getReadProps(metaClient.getTableConfig(), writeConfig);
typedProps.put(HoodieReaderConfig.MERGE_TYPE.key(), mergeType);
- return HoodieFileGroupReader.<RowData>newBuilder()
+ return HoodieFileGroupReader.<RowData>builder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metaClient)
.withLatestCommitTime(latestInstant)
- .withFileSlice(fileSlice)
+ .withBaseFileOption(fileSlice.getBaseFile())
+ .withLogFiles(fileSlice.getLogFiles())
+ .withPartitionPath(fileSlice.getPartitionPath())
.withDataSchema(tableSchema)
.withRequestedSchema(requiredSchema)
-
.withInternalSchema(Option.ofNullable(internalSchemaManager.getQuerySchema()))
+
.withInternalSchemaOpt(Option.ofNullable(internalSchemaManager.getQuerySchema()))
.withProps(typedProps)
.withShouldUseRecordPosition(false)
.withEmitDelete(emitDelete)
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
index bead23772f80..1ec6d41d5b87 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestHarness.java
@@ -137,11 +137,14 @@ public class HoodieFileGroupReaderTestHarness extends
HoodieCommonTestHarness {
properties.setProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(),
basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME);
properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
"false");
- HoodieFileGroupReader<IndexedRecord> fileGroupReader =
HoodieFileGroupReader.<IndexedRecord>newBuilder()
+ FileSlice fileSlice = fileSliceOpt.orElseThrow(() -> new
IllegalArgumentException("FileSlice is not present"));
+ HoodieFileGroupReader<IndexedRecord> fileGroupReader =
HoodieFileGroupReader.<IndexedRecord>builder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metaClient)
.withLatestCommitTime("1000") // Not used internally.
- .withFileSlice(fileSliceOpt.orElseThrow(() -> new
IllegalArgumentException("FileSlice is not present")))
+ .withBaseFileOption(fileSlice.getBaseFile())
+ .withLogFiles(fileSlice.getLogFiles())
+ .withPartitionPath(fileSlice.getPartitionPath())
.withDataSchema(HOODIE_SCHEMA)
.withRequestedSchema(HOODIE_SCHEMA)
.withProps(properties)
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
index 1fbb6ad4a982..22d5ba701711 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
@@ -145,11 +145,13 @@ public class HoodieFileGroupReaderBasedRecordReader
implements RecordReader<Null
LOG.debug("Creating HoodieFileGroupReaderRecordReader with
tableBasePath={}, latestCommitTime={}, fileSplit={}", tableBasePath,
latestCommitTime, fileSplit.getPath());
FileSlice fileSlice = getFileSliceFromSplit(fileSplit,
getFs(tableBasePath, jobConfCopy), tableBasePath);
this.containsBaseFile = fileSlice.getBaseFile().isPresent();
- this.recordIterator = HoodieFileGroupReader.<ArrayWritable>newBuilder()
+ this.recordIterator = HoodieFileGroupReader.<ArrayWritable>builder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metaClient)
.withLatestCommitTime(latestCommitTime)
- .withFileSlice(fileSlice)
+ .withBaseFileOption(fileSlice.getBaseFile())
+ .withLogFiles(fileSlice.getLogFiles())
+ .withPartitionPath(fileSlice.getPartitionPath())
.withDataSchema(tableSchema)
.withRequestedSchema(requestedSchema)
.withProps(props)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
index 0104136f8424..0bb4f42931e8 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
@@ -172,7 +172,7 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
val requestedSchema = requiredSchema.schema
val instantRange =
InstantRange.builder().rangeType(RangeType.EXACT_MATCH).explicitInstants(validInstants.value).build()
val readerContext = new HoodieAvroReaderContext(storageConf,
metaClient.getTableConfig, HOption.of(instantRange),
HOption.empty().asInstanceOf[HOption[HPredicate]])
- val fileGroupReader: HoodieFileGroupReader[IndexedRecord] =
HoodieFileGroupReader.newBuilder()
+ val fileGroupReader: HoodieFileGroupReader[IndexedRecord] =
HoodieFileGroupReader.builder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metaClient)
.withLatestCommitTime(tableState.latestCommitTimestamp.orNull)
@@ -182,13 +182,13 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
.withProps(properties)
.withDataSchema(tableSchema.schema)
.withRequestedSchema(requestedSchema)
-
.withInternalSchema(HOption.ofNullable(tableSchema.internalSchema.orNull))
+
.withInternalSchemaOpt(HOption.ofNullable(tableSchema.internalSchema.orNull))
.build()
convertAvroToRowIterator(fileGroupReader.getClosableIterator,
requestedSchema)
} else {
val readerContext = new
SparkFileFormatInternalRowReaderContext(fileGroupBaseFileReader.value,
optionalFilters,
Seq.empty, storageConf, metaClient.getTableConfig)
- val fileGroupReader = HoodieFileGroupReader.newBuilder()
+ val fileGroupReader = HoodieFileGroupReader.builder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metaClient)
.withLatestCommitTime(tableState.latestCommitTimestamp.orNull)
@@ -198,7 +198,7 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
.withProps(properties)
.withDataSchema(tableSchema.schema)
.withRequestedSchema(requiredSchema.schema)
-
.withInternalSchema(HOption.ofNullable(tableSchema.internalSchema.orNull))
+
.withInternalSchemaOpt(HOption.ofNullable(tableSchema.internalSchema.orNull))
.build()
convertCloseableIterator(fileGroupReader.getClosableIterator)
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index 7a97367bb3c5..4970be3c249b 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -513,13 +513,15 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
}
private def loadFileSlice(fileSlice: FileSlice, readerContext:
SparkFileFormatInternalRowReaderContext): Iterator[BufferedRecord[InternalRow]]
= {
- val fileGroupReader = HoodieFileGroupReader.newBuilder()
+ val fileGroupReader = HoodieFileGroupReader.builder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metaClient)
- .withFileSlice(fileSlice)
+ .withBaseFileOption(fileSlice.getBaseFile)
+ .withLogFiles(fileSlice.getLogFiles)
+ .withPartitionPath(fileSlice.getPartitionPath)
.withDataSchema(schema)
.withRequestedSchema(schema)
- .withInternalSchema(toJavaOption(originTableSchema.internalSchema))
+ .withInternalSchemaOpt(toJavaOption(originTableSchema.internalSchema))
.withProps(readerProperties)
.withLatestCommitTime(split.changes.last.getInstant)
.build()
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
index de4ffb400d4c..da5bef9f785c 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
@@ -313,14 +313,16 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath:
String,
} else {
0
}
- val reader = HoodieFileGroupReader.newBuilder()
+ val reader = HoodieFileGroupReader.builder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metaClient)
.withLatestCommitTime(queryTimestamp)
- .withFileSlice(fileSlice)
+ .withBaseFileOption(fileSlice.getBaseFile)
+ .withLogFiles(fileSlice.getLogFiles)
+ .withPartitionPath(fileSlice.getPartitionPath)
.withDataSchema(dataSchema)
.withRequestedSchema(requestedSchema)
- .withInternalSchema(internalSchemaOpt)
+ .withInternalSchemaOpt(internalSchemaOpt)
.withProps(props)
.withStart(file.start)
.withLength(baseFileLength)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
index 6eaade123a64..4e044ecb6786 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/PartitionBucketIndexManager.scala
@@ -225,14 +225,16 @@ class PartitionBucketIndexManager extends BaseProcedure
// instantiate other supporting cast
val internalSchemaOption: Option[InternalSchema] = Option.empty()
// instantiate FG reader
- val fileGroupReader = HoodieFileGroupReader.newBuilder()
+ val fileGroupReader = HoodieFileGroupReader.builder()
.withReaderContext(readerContextFactory.getContext)
.withHoodieTableMetaClient(metaClient)
.withLatestCommitTime(latestInstantTime.requestedTime())
- .withFileSlice(fileSlice)
+ .withBaseFileOption(fileSlice.getBaseFile)
+ .withLogFiles(fileSlice.getLogFiles)
+ .withPartitionPath(fileSlice.getPartitionPath)
.withDataSchema(tableSchemaWithMetaFields)
.withRequestedSchema(tableSchemaWithMetaFields)
- .withInternalSchema(internalSchemaOption) // not support evolution
of schema for now
+ .withInternalSchemaOpt(internalSchemaOption) // not support
evolution of schema for now
.withProps(metaClient.getTableConfig.getProps)
.withShouldUseRecordPosition(false)
.build()
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
index b35da0450cfd..7dbd7e7aaf96 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
@@ -712,7 +712,7 @@ public class TestMetadataUtilRLIandSIRecordGeneration
extends HoodieClientTestBa
TypedProperties properties = new TypedProperties();
// configure un-merged log file reader
HoodieReaderContext readerContext =
context.getReaderContextFactory(metaClient).getContext();
- HoodieFileGroupReader reader = HoodieFileGroupReader.newBuilder()
+ HoodieFileGroupReader reader = HoodieFileGroupReader.builder()
.withReaderContext(readerContext)
.withDataSchema(writerSchemaOpt.get())
.withRequestedSchema(writerSchemaOpt.get())
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index b3beca5fa18c..b0a4aab2e789 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -1664,7 +1664,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
String latestCommitTimestamp) {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(HoodieSchema.fromAvroSchema(HoodieMetadataRecord.getClassSchema()));
HoodieAvroReaderContext readerContext = new
HoodieAvroReaderContext(metadataMetaClient.getStorageConf(),
metadataMetaClient.getTableConfig(), Option.empty(), Option.empty());
- HoodieFileGroupReader<IndexedRecord> fileGroupReader =
HoodieFileGroupReader.<IndexedRecord>newBuilder()
+ HoodieFileGroupReader<IndexedRecord> fileGroupReader =
HoodieFileGroupReader.<IndexedRecord>builder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metadataMetaClient)
.withLogFiles(logFiles.stream())