This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0517519ec08 [HUDI-8289] Unmerged log scanner deprecation (#13383)
0517519ec08 is described below
commit 0517519ec08abc71e15e7be34daa4bd1c240304d
Author: Tim Brown <[email protected]>
AuthorDate: Thu Jun 5 20:11:13 2025 -0500
[HUDI-8289] Unmerged log scanner deprecation (#13383)
---
azure-pipelines-20230430.yml | 16 +-
.../org/apache/hudi/io/HoodieAppendHandle.java | 2 +-
.../metadata/HoodieBackedTableMetadataWriter.java | 39 ++--
.../HoodieLogCompactionPlanGenerator.java | 41 ++--
.../FlinkHoodieBackedTableMetadataWriter.java | 2 +-
.../JavaHoodieBackedTableMetadataWriter.java | 2 +-
.../hudi/client/model/HoodieInternalRow.java | 2 +-
.../client/utils/SparkMetadataWriterUtils.java | 132 +++++-------
.../SparkHoodieBackedTableMetadataWriter.java | 10 +-
...ieBackedTableMetadataWriterTableVersionSix.java | 3 +-
.../hudi/BaseSparkInternalRowReaderContext.java | 6 +-
.../table/log/BaseHoodieLogRecordReader.java | 19 +-
.../table/log/HoodieLogBlockMetadataScanner.java | 48 +++++
.../table/log/HoodieMergedLogRecordReader.java | 13 +-
.../common/table/read/HoodieFileGroupReader.java | 3 +
.../hudi/metadata/HoodieTableMetadataUtil.java | 96 ++++-----
.../hudi/metadata/TestHoodieTableMetadataUtil.java | 5 +-
.../realtime/HoodieMergeOnReadSnapshotReader.java | 225 ---------------------
.../TestHoodieMergeOnReadSnapshotReader.java | 187 -----------------
.../java/org/apache/hudi/TestDataSourceUtils.java | 64 ------
.../hudi/testutils/LogFileColStatsTestUtil.java | 39 +---
.../java/org/apache/hudi/TestDataSourceUtils.java | 35 +++-
.../TestHoodieClientOnMergeOnReadStorage.java | 40 ++--
.../TestMetadataUtilRLIandSIRecordGeneration.java | 46 ++++-
.../utilities/HoodieMetadataTableValidator.java | 2 +-
25 files changed, 322 insertions(+), 755 deletions(-)
diff --git a/azure-pipelines-20230430.yml b/azure-pipelines-20230430.yml
index 0d5cbe7a4a5..586c49425f8 100644
--- a/azure-pipelines-20230430.yml
+++ b/azure-pipelines-20230430.yml
@@ -128,7 +128,7 @@ stages:
jobs:
- job: UT_FT_1
displayName: UT client/spark-client
- timeoutInMinutes: '75'
+ timeoutInMinutes: '90'
steps:
- task: Maven@4
displayName: maven install
@@ -242,7 +242,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_4
displayName: UT spark-datasource Java Test 2
- timeoutInMinutes: '75'
+ timeoutInMinutes: '90'
steps:
- task: Maven@4
displayName: maven install
@@ -277,7 +277,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_5
displayName: UT spark-datasource DML
- timeoutInMinutes: '75'
+ timeoutInMinutes: '90'
steps:
- task: Maven@4
displayName: maven install
@@ -312,7 +312,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_6
displayName: UT spark-datasource DDL & Others
- timeoutInMinutes: '75'
+ timeoutInMinutes: '90'
steps:
- task: Maven@4
displayName: maven install
@@ -347,7 +347,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_7
displayName: UT Hudi Streamer & FT utilities
- timeoutInMinutes: '75'
+ timeoutInMinutes: '90'
steps:
- task: Docker@2
displayName: "login to docker hub"
@@ -396,7 +396,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_8
displayName: UT hudi-hadoop-common & Hudi Utilities others
- timeoutInMinutes: '75'
+ timeoutInMinutes: '90'
steps:
- task: Docker@2
displayName: "login to docker hub"
@@ -445,7 +445,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_9
displayName: FT spark 2
- timeoutInMinutes: '75'
+ timeoutInMinutes: '90'
steps:
- task: Maven@4
displayName: maven install
@@ -490,7 +490,7 @@ stages:
displayName: Top 100 long-running testcases
- job: UT_FT_10
displayName: UT FT common & other modules
- timeoutInMinutes: '75'
+ timeoutInMinutes: '90'
steps:
- task: Docker@2
displayName: "login to docker hub"
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 8a75f1056fc..764843b7877 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -432,7 +432,7 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
.map(fieldName ->
HoodieAvroUtils.getSchemaForField(writeSchemaWithMetaFields,
fieldName)).collect(Collectors.toList());
try {
Map<String, HoodieColumnRangeMetadata<Comparable>>
columnRangeMetadataMap =
- collectColumnRangeMetadata(recordList, fieldsToIndex,
stat.getPath(), writeSchemaWithMetaFields, storage.getConf());
+ collectColumnRangeMetadata(recordList.iterator(), fieldsToIndex,
stat.getPath(), writeSchemaWithMetaFields, storage.getConf());
stat.putRecordsStats(columnRangeMetadataMap);
} catch (HoodieException e) {
throw new HoodieAppendException("Failed to extract append result", e);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 31c164818d8..f5fd9055145 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -435,6 +435,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O>
implements HoodieTab
String partitionName;
Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair;
List<String> columnsToIndex = new ArrayList<>();
+ Lazy<Option<Schema>> tableSchema = Lazy.lazily(() ->
HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient));
try {
switch (partitionType) {
case FILES:
@@ -446,7 +447,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O>
implements HoodieTab
partitionName = BLOOM_FILTERS.getPartitionPath();
break;
case COLUMN_STATS:
- Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>>
colStatsColumnsAndRecord =
initializeColumnStatsPartition(partitionIdToAllFilesMap);
+ Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>>
colStatsColumnsAndRecord =
initializeColumnStatsPartition(partitionIdToAllFilesMap, tableSchema);
columnsToIndex = colStatsColumnsAndRecord.getKey();
fileGroupCountAndRecordsPair = colStatsColumnsAndRecord.getValue();
partitionName = COLUMN_STATS.getPartitionPath();
@@ -464,7 +465,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O>
implements HoodieTab
continue;
}
partitionName = expressionIndexPartitionsToInit.iterator().next();
- fileGroupCountAndRecordsPair =
initializeExpressionIndexPartition(partitionName, dataTableInstantTime,
lazyLatestMergedPartitionFileSliceList);
+ fileGroupCountAndRecordsPair =
initializeExpressionIndexPartition(partitionName, dataTableInstantTime,
lazyLatestMergedPartitionFileSliceList, tableSchema);
break;
case PARTITION_STATS:
// For PARTITION_STATS, COLUMN_STATS should also be enabled
@@ -473,7 +474,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O>
implements HoodieTab
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
continue;
}
- fileGroupCountAndRecordsPair =
initializePartitionStatsIndex(lazyLatestMergedPartitionFileSliceList);
+ fileGroupCountAndRecordsPair =
initializePartitionStatsIndex(lazyLatestMergedPartitionFileSliceList,
tableSchema);
partitionName = PARTITION_STATS.getPartitionPath();
break;
case SECONDARY_INDEX:
@@ -558,21 +559,22 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
}
private Pair<Integer, HoodieData<HoodieRecord>>
initializePartitionStatsIndex(
- Lazy<List<Pair<String, FileSlice>>>
lazyLatestMergedPartitionFileSliceList) {
+ Lazy<List<Pair<String, FileSlice>>>
lazyLatestMergedPartitionFileSliceList,
+ Lazy<Option<Schema>> tableSchemaOpt) {
HoodieData<HoodieRecord> records =
HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(
engineContext, lazyLatestMergedPartitionFileSliceList.get(),
dataWriteConfig.getMetadataConfig(),
- dataMetaClient, Option.empty(),
Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
+ dataMetaClient, tableSchemaOpt,
Option.of(dataWriteConfig.getRecordMerger().getRecordType()));
final int fileGroupCount =
dataWriteConfig.getMetadataConfig().getPartitionStatsIndexFileGroupCount();
return Pair.of(fileGroupCount, records);
}
- private Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>>
initializeColumnStatsPartition(Map<String, Map<String, Long>>
partitionIdToAllFilesMap) {
+ private Pair<List<String>, Pair<Integer, HoodieData<HoodieRecord>>>
initializeColumnStatsPartition(Map<String, Map<String, Long>>
partitionIdToAllFilesMap,
+
Lazy<Option<Schema>> tableSchema) {
final int fileGroupCount =
dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
if (partitionIdToAllFilesMap.isEmpty()) {
return Pair.of(Collections.emptyList(), Pair.of(fileGroupCount,
engineContext.emptyHoodieData()));
}
// Find the columns to index
- Lazy<Option<Schema>> tableSchema = Lazy.lazily(() ->
HoodieTableMetadataUtil.tryResolveSchemaForTable(dataMetaClient));
final List<String> columnsToIndex = new
ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(dataMetaClient.getTableConfig(),
dataWriteConfig.getMetadataConfig(), tableSchema, true,
Option.of(dataWriteConfig.getRecordMerger().getRecordType())).keySet());
@@ -610,6 +612,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O>
implements HoodieTab
* @param indexDefinition Hoodie Index Definition for the
expression index for which records need to be generated
* @param metaClient Hoodie Table Meta Client
* @param parallelism Parallelism to use for engine
operations
+ * @param tableSchema Schema of the table
* @param readerSchema Schema of reader
* @param storageConf Storage Config
* @param instantTime Instant time
@@ -618,14 +621,15 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
protected abstract HoodieData<HoodieRecord>
getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet,
HoodieIndexDefinition indexDefinition,
HoodieTableMetaClient metaClient,
- int
parallelism, Schema readerSchema,
+ int
parallelism, Schema tableSchema, Schema readerSchema,
StorageConfiguration<?> storageConf,
String
instantTime);
protected abstract EngineType getEngineType();
private Pair<Integer, HoodieData<HoodieRecord>>
initializeExpressionIndexPartition(
- String indexName, String dataTableInstantTime, Lazy<List<Pair<String,
FileSlice>>> lazyLatestMergedPartitionFileSliceList) throws Exception {
+ String indexName, String dataTableInstantTime, Lazy<List<Pair<String,
FileSlice>>> lazyLatestMergedPartitionFileSliceList,
+ Lazy<Option<Schema>> tableSchemaOpt) {
HoodieIndexDefinition indexDefinition = getIndexDefinition(indexName);
ValidationUtils.checkState(indexDefinition != null, "Expression Index
definition is not present for index " + indexName);
List<Pair<String, FileSlice>> partitionFileSlicePairs =
lazyLatestMergedPartitionFileSliceList.get();
@@ -634,19 +638,18 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
if (entry.getValue().getBaseFile().isPresent()) {
partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(),
Pair.of(entry.getValue().getBaseFile().get().getPath(),
entry.getValue().getBaseFile().get().getFileLen())));
}
- entry.getValue().getLogFiles().forEach(hoodieLogFile -> {
- if (entry.getValue().getLogFiles().count() > 0) {
- entry.getValue().getLogFiles().forEach(logfile -> {
- partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(),
Pair.of(logfile.getPath().toString(), logfile.getFileSize())));
- });
- }
- });
+ entry.getValue().getLogFiles()
+ .forEach(hoodieLogFile ->
partitionFilePathSizeTriplet.add(Pair.of(entry.getKey(),
Pair.of(hoodieLogFile.getPath().toString(), hoodieLogFile.getFileSize()))));
});
int fileGroupCount =
dataWriteConfig.getMetadataConfig().getExpressionIndexFileGroupCount();
+ if (partitionFileSlicePairs.isEmpty()) {
+ return Pair.of(fileGroupCount, engineContext.emptyHoodieData());
+ }
int parallelism = Math.min(partitionFilePathSizeTriplet.size(),
dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
- Schema readerSchema =
getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient);
- return Pair.of(fileGroupCount,
getExpressionIndexRecords(partitionFilePathSizeTriplet, indexDefinition,
dataMetaClient, parallelism, readerSchema, storageConf, dataTableInstantTime));
+ Schema tableSchema = tableSchemaOpt.get().orElseThrow(() -> new
HoodieMetadataException("Table schema is not available for expression index
initialization"));
+ Schema readerSchema =
getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient,
tableSchema);
+ return Pair.of(fileGroupCount,
getExpressionIndexRecords(partitionFilePathSizeTriplet, indexDefinition,
dataMetaClient, parallelism, tableSchema, readerSchema, storageConf,
dataTableInstantTime));
}
HoodieIndexDefinition getIndexDefinition(String indexName) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
index 5d9337ac4e7..805d88fc840 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.HoodieLogBlockMetadataScanner;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
@@ -78,8 +78,7 @@ public class HoodieLogCompactionPlanGenerator<T extends
HoodieRecordPayload, I,
@Override
protected boolean filterFileSlice(FileSlice fileSlice, String
lastCompletedInstantTime,
Set<HoodieFileGroupId>
pendingFileGroupIds, Option<InstantRange> instantRange) {
- return isFileSliceEligibleForLogCompaction(fileSlice,
lastCompletedInstantTime, instantRange)
- && super.filterFileSlice(fileSlice, lastCompletedInstantTime,
pendingFileGroupIds, instantRange);
+ return super.filterFileSlice(fileSlice, lastCompletedInstantTime,
pendingFileGroupIds, instantRange) &&
isFileSliceEligibleForLogCompaction(fileSlice, lastCompletedInstantTime,
instantRange);
}
@Override
@@ -88,33 +87,29 @@ public class HoodieLogCompactionPlanGenerator<T extends
HoodieRecordPayload, I,
}
/**
- * Can schedule logcompaction if log files count is greater than 4 or total
log blocks is greater than 4.
+ * Can schedule log compaction if the log file count or the total log block count is
greater than the configured threshold.
* @param fileSlice File Slice under consideration.
+ * @param instantRange Range of valid instants.
* @return Boolean value that determines whether log compaction will be
scheduled or not.
*/
private boolean isFileSliceEligibleForLogCompaction(FileSlice fileSlice,
String maxInstantTime,
Option<InstantRange>
instantRange) {
- LOG.info("Checking if fileId " + fileSlice.getFileId() + " and partition "
- + fileSlice.getPartitionPath() + " eligible for log compaction.");
+ LOG.info("Checking if fileId {} and partition {} eligible for log
compaction.", fileSlice.getFileId(), fileSlice.getPartitionPath());
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
- HoodieUnMergedLogRecordScanner scanner =
HoodieUnMergedLogRecordScanner.newBuilder()
- .withStorage(metaClient.getStorage())
- .withBasePath(hoodieTable.getMetaClient().getBasePath())
- .withLogFilePaths(fileSlice.getLogFiles()
- .sorted(HoodieLogFile.getLogFileComparator())
- .map(file -> file.getPath().toString())
- .collect(Collectors.toList()))
- .withLatestInstantTime(maxInstantTime)
- .withInstantRange(instantRange)
- .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
- .withOptimizedLogBlocksScan(true)
- .withRecordMerger(writeConfig.getRecordMerger())
- .withTableMetaClient(metaClient)
- .build();
- scanner.scan(true);
+ long numLogFiles = fileSlice.getLogFiles().count();
+ if (numLogFiles >= writeConfig.getLogCompactionBlocksThreshold()) {
+ LOG.info("Total logs files ({}) is greater than log blocks threshold is
{}", numLogFiles, writeConfig.getLogCompactionBlocksThreshold());
+ return true;
+ }
+ HoodieLogBlockMetadataScanner scanner = new
HoodieLogBlockMetadataScanner(metaClient, fileSlice.getLogFiles()
+ .sorted(HoodieLogFile.getLogFileComparator())
+ .map(file -> file.getPath().toString())
+ .collect(Collectors.toList()),
+ writeConfig.getMaxDFSStreamBufferSize(),
+ maxInstantTime,
+ instantRange);
int totalBlocks = scanner.getCurrentInstantLogBlocks().size();
- LOG.info("Total blocks seen are " + totalBlocks + ", log blocks threshold
is "
- + writeConfig.getLogCompactionBlocksThreshold());
+ LOG.info("Total blocks seen are {}, log blocks threshold is {}",
totalBlocks, writeConfig.getLogCompactionBlocksThreshold());
// If total blocks in the file slice is > blocks threshold value(default
value is 5).
// Log compaction can be scheduled.
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 35428d7801a..17e9ef21020 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -194,7 +194,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
@Override
protected HoodieData<HoodieRecord>
getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
-
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema,
StorageConfiguration<?> storageConf,
+
HoodieTableMetaClient metaClient, int parallelism, Schema tableSchema, Schema
readerSchema, StorageConfiguration<?> storageConf,
String
instantTime) {
throw new HoodieNotSupportedException("Flink metadata table does not
support expression index yet.");
}
diff --git
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
index 64b532707e8..0d3da17bdea 100644
---
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/metadata/JavaHoodieBackedTableMetadataWriter.java
@@ -144,7 +144,7 @@ public class JavaHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetada
@Override
protected HoodieData<HoodieRecord>
getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
-
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema,
StorageConfiguration<?> storageConf,
+
HoodieTableMetaClient metaClient, int parallelism, Schema tableSchema, Schema
readerSchema, StorageConfiguration<?> storageConf,
String
instantTime) {
throw new HoodieNotSupportedException("Expression index not supported for
Java metadata table writer yet.");
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
index 2ca61d2823c..cac3c251b9d 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/model/HoodieInternalRow.java
@@ -234,7 +234,7 @@ public class HoodieInternalRow extends InternalRow {
for (int i = 0; i < metaFields.length; i++) {
copyMetaFields[i] = metaFields[i] != null ? metaFields[i].copy() : null;
}
- return new HoodieInternalRow(copyMetaFields, sourceRow.copy(),
sourceContainsMetaFields);
+ return new HoodieInternalRow(copyMetaFields, sourceRow == null ? null :
sourceRow.copy(), sourceContainsMetaFields);
}
private int rebaseOrdinal(int ordinal) {
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 28ae8622cdf..5a36aacdf56 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -28,25 +28,27 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
-import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.engine.ReaderContextFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
@@ -55,23 +57,22 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.expression.HoodieExpressionIndex;
import org.apache.hudi.index.expression.HoodieSparkExpressionIndex;
import
org.apache.hudi.index.expression.HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata;
-import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
-import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.JavaScalaConverters;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.HoodieCatalystExpressionUtils;
+import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
@@ -94,11 +95,9 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import scala.Function1;
+import scala.collection.immutable.Seq;
import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
-import static
org.apache.hudi.common.config.HoodieCommonConfig.MAX_DFS_STREAM_BUFFER_SIZE;
-import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -137,16 +136,16 @@ public class SparkMetadataWriterUtils {
};
}
- public static List<Row> getRowsWithExpressionIndexMetadata(List<Row>
rowsForFilePath, String partition, String filePath, long fileSize) {
- return rowsForFilePath.stream().map(row -> {
- scala.collection.immutable.Seq<Object> indexMetadata =
JavaScalaConverters.convertJavaListToScalaList(Arrays.asList(partition,
filePath, fileSize));
+ public static ClosableIterator<Row>
getRowsWithExpressionIndexMetadata(ClosableIterator<InternalRow>
rowsForFilePath, SparkRowSerDe sparkRowSerDe, String partition, String
filePath, long fileSize) {
+ return new CloseableMappingIterator<>(rowsForFilePath, row -> {
+ Seq<Object> indexMetadata =
JavaScalaConverters.convertJavaListToScalaList(Arrays.asList(partition,
filePath, fileSize));
Row expressionIndexRow = Row.fromSeq(indexMetadata);
List<Row> rows = new ArrayList<>(2);
- rows.add(row);
+ rows.add(sparkRowSerDe.deserializeRow(row));
rows.add(expressionIndexRow);
- scala.collection.immutable.Seq<Row> rowSeq =
JavaScalaConverters.convertJavaListToScalaList(rows);
+ Seq<Row> rowSeq = JavaScalaConverters.convertJavaListToScalaList(rows);
return Row.merge(rowSeq);
- }).collect(Collectors.toList());
+ });
}
@SuppressWarnings("checkstyle:LineLength")
@@ -241,61 +240,6 @@ public class SparkMetadataWriterUtils {
});
}
- public static List<Row> readRecordsAsRows(StoragePath[] paths, SQLContext
sqlContext,
- HoodieTableMetaClient metaClient,
Schema schema,
- HoodieWriteConfig dataWriteConfig,
boolean isBaseFile) {
- List<HoodieRecord> records = isBaseFile ? getBaseFileRecords(new
HoodieBaseFile(paths[0].toString()), metaClient, schema)
- :
getUnmergedLogFileRecords(Arrays.stream(paths).map(StoragePath::toString).collect(Collectors.toList()),
metaClient, schema);
- return toRows(records, schema, dataWriteConfig, sqlContext,
paths[0].toString());
- }
-
- private static List<HoodieRecord> getUnmergedLogFileRecords(List<String>
logFilePaths, HoodieTableMetaClient metaClient, Schema readerSchema) {
- List<HoodieRecord> records = new ArrayList<>();
- HoodieUnMergedLogRecordScanner scanner =
HoodieUnMergedLogRecordScanner.newBuilder()
- .withStorage(metaClient.getStorage())
- .withBasePath(metaClient.getBasePath())
- .withLogFilePaths(logFilePaths)
- .withBufferSize(MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
-
.withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime())
- .withReaderSchema(readerSchema)
- .withTableMetaClient(metaClient)
- .withLogRecordScannerCallback(records::add)
- .build();
- scanner.scan(false);
- return records;
- }
-
- private static List<HoodieRecord> getBaseFileRecords(HoodieBaseFile
baseFile, HoodieTableMetaClient metaClient, Schema readerSchema) {
- List<HoodieRecord> records = new ArrayList<>();
- HoodieRecordMerger recordMerger =
-
HoodieRecordUtils.createRecordMerger(metaClient.getBasePath().toString(),
EngineType.SPARK, Collections.emptyList(),
- metaClient.getTableConfig().getRecordMergeStrategyId());
- try (HoodieFileReader baseFileReader =
HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType())
- .getFileReader(getReaderConfigs(metaClient.getStorageConf()),
baseFile.getStoragePath())) {
- baseFileReader.getRecordIterator(readerSchema).forEachRemaining((record)
-> records.add((HoodieRecord) record));
- return records;
- } catch (IOException e) {
- throw new HoodieIOException("Error reading base file " +
baseFile.getFileName(), e);
- }
- }
-
- private static List<Row> toRows(List<HoodieRecord> records, Schema schema,
HoodieWriteConfig dataWriteConfig, SQLContext sqlContext, String path) {
- StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(schema);
- Function1<GenericRecord, Row> converterToRow =
AvroConversionUtils.createConverterToRow(schema, structType);
- List<Row> avroRecords = records.stream()
- .map(r -> {
- try {
- return (GenericRecord) (r.getData() instanceof GenericRecord ?
r.getData()
- : ((HoodieRecordPayload) r.getData()).getInsertValue(schema,
dataWriteConfig.getProps()).get());
- } catch (IOException e) {
- throw new HoodieIOException("Could not fetch record payload");
- }
- })
- .map(converterToRow::apply)
- .collect(Collectors.toList());
- return avroRecords;
- }
-
/**
* Generates expression index records
*
@@ -313,7 +257,7 @@ public class SparkMetadataWriterUtils {
*/
public static ExpressionIndexComputationMetadata getExprIndexRecords(
List<Pair<String, Pair<String, Long>>> partitionFilePathAndSizeTriplet,
HoodieIndexDefinition indexDefinition,
- HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema,
String instantTime,
+ HoodieTableMetaClient metaClient, int parallelism, Schema tableSchema,
Schema readerSchema, String instantTime,
HoodieEngineContext engineContext, HoodieWriteConfig dataWriteConfig,
Option<Function<HoodiePairData<String,
HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>>
partitionRecordsFunctionOpt) {
HoodieSparkEngineContext sparkEngineContext = (HoodieSparkEngineContext)
engineContext;
@@ -326,12 +270,12 @@ public class SparkMetadataWriterUtils {
// HUDI-6994 will address this.
ValidationUtils.checkArgument(indexDefinition.getSourceFields().size() ==
1, "Only one source field is supported for expression index");
String columnToIndex = indexDefinition.getSourceFields().get(0);
- SQLContext sqlContext = sparkEngineContext.getSqlContext();
+ ReaderContextFactory<InternalRow> readerContextFactory =
engineContext.getReaderContextFactory(metaClient);
// Read records and append expression index metadata to every row
HoodieData<Row> rowData =
sparkEngineContext.parallelize(partitionFilePathAndSizeTriplet, parallelism)
.flatMap((SerializableFunction<Pair<String, Pair<String, Long>>,
Iterator<Row>>) entry ->
- getExpressionIndexRecordsIterator(metaClient, readerSchema,
dataWriteConfig, entry, sqlContext));
+
getExpressionIndexRecordsIterator(readerContextFactory.getContext(),
metaClient, tableSchema, readerSchema, dataWriteConfig, entry));
// Generate dataset with expression index metadata
StructType structType =
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema)
@@ -356,17 +300,41 @@ public class SparkMetadataWriterUtils {
}
}
- private static Iterator<Row>
getExpressionIndexRecordsIterator(HoodieTableMetaClient metaClient, Schema
readerSchema, HoodieWriteConfig dataWriteConfig,
- Pair<String,
Pair<String, Long>> entry, SQLContext sqlContext) {
+ private static Iterator<Row>
getExpressionIndexRecordsIterator(HoodieReaderContext<InternalRow>
readerContext, HoodieTableMetaClient metaClient,
+ Schema
tableSchema, Schema readerSchema, HoodieWriteConfig dataWriteConfig,
Pair<String, Pair<String, Long>> entry) {
String partition = entry.getKey();
Pair<String, Long> filePathSizePair = entry.getValue();
String filePath = filePathSizePair.getKey();
String relativeFilePath =
FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(filePath));
long fileSize = filePathSizePair.getValue();
- List<Row> rowsForFilePath = readRecordsAsRows(new StoragePath[] {new
StoragePath(filePath)}, sqlContext, metaClient, readerSchema, dataWriteConfig,
- FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1))));
- List<Row> rowsWithIndexMetadata =
getRowsWithExpressionIndexMetadata(rowsForFilePath, partition,
relativeFilePath, fileSize);
- return rowsWithIndexMetadata.iterator();
+ boolean isBaseFile = FSUtils.isBaseFile(new
StoragePath(filePath.substring(filePath.lastIndexOf("/") + 1)));
+ FileSlice fileSlice;
+ if (isBaseFile) {
+ HoodieBaseFile baseFile = new HoodieBaseFile(filePath);
+ fileSlice = new FileSlice(partition, baseFile.getCommitTime(),
baseFile.getFileId());
+ fileSlice.setBaseFile(baseFile);
+ } else {
+ HoodieLogFile logFile = new HoodieLogFile(filePath);
+ fileSlice = new FileSlice(partition, logFile.getDeltaCommitTime(),
logFile.getFileId());
+ fileSlice.addLogFile(logFile);
+ }
+ HoodieFileGroupReader<InternalRow> fileGroupReader =
HoodieFileGroupReader.<InternalRow>newBuilder()
+ .withReaderContext(readerContext)
+ .withHoodieTableMetaClient(metaClient)
+ .withDataSchema(tableSchema)
+ .withRequestedSchema(readerSchema)
+ .withProps(dataWriteConfig.getProps())
+
.withLatestCommitTime(metaClient.getActiveTimeline().lastInstant().map(HoodieInstant::requestedTime).orElse(""))
+ .withAllowInflightInstants(true)
+ .withFileSlice(fileSlice)
+ .build();
+ try {
+ ClosableIterator<InternalRow> rowsForFilePath =
fileGroupReader.getClosableIterator();
+ SparkRowSerDe sparkRowSerDe =
HoodieCatalystExpressionUtils.sparkAdapter().createSparkRowSerDe(HoodieInternalRowUtils.getCachedSchema(readerSchema));
+ return getRowsWithExpressionIndexMetadata(rowsForFilePath,
sparkRowSerDe, partition, relativeFilePath, fileSize);
+ } catch (IOException ex) {
+ throw new HoodieIOException("Error reading file slice " + fileSlice, ex);
+ }
}
/**
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 7c555c958c6..f3610e2c746 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -210,14 +211,15 @@ public class SparkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
commitMetadata.getPartitionToWriteStats().forEach((dataPartition,
writeStats) -> writeStats.forEach(writeStat -> partitionFilePathPairs.add(
Pair.of(writeStat.getPartitionPath(), Pair.of(new
StoragePath(dataMetaClient.getBasePath(), writeStat.getPath()).toString(),
writeStat.getFileSizeInBytes())))));
int parallelism = Math.min(partitionFilePathPairs.size(),
dataWriteConfig.getMetadataConfig().getExpressionIndexParallelism());
- Schema readerSchema =
getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient);
+ Schema tableSchema = new
TableSchemaResolver(dataMetaClient).getTableAvroSchema();
+ Schema readerSchema =
getProjectedSchemaForExpressionIndex(indexDefinition, dataMetaClient,
tableSchema);
// Step 2: Compute the expression index column stat and partition stat
records for these newly created files
// partitionRecordsFunctionOpt - Function used to generate partition
stats. These stats are generated only for expression index created using column
stats
//
// In the partitionRecordsFunctionOpt function we merge the expression
index records from the new files created in the commit metadata
// with the expression index records from the unmodified files to get the
new partition stat records
HoodieSparkExpressionIndex.ExpressionIndexComputationMetadata
expressionIndexComputationMetadata =
- SparkMetadataWriterUtils.getExprIndexRecords(partitionFilePathPairs,
indexDefinition, dataMetaClient, parallelism, readerSchema, instantTime,
engineContext, dataWriteConfig,
+ SparkMetadataWriterUtils.getExprIndexRecords(partitionFilePathPairs,
indexDefinition, dataMetaClient, parallelism, tableSchema, readerSchema,
instantTime, engineContext, dataWriteConfig,
partitionRecordsFunctionOpt);
return
expressionIndexComputationMetadata.getPartitionStatRecordsOption().isPresent()
?
expressionIndexComputationMetadata.getExpressionIndexRecords().union(expressionIndexComputationMetadata.getPartitionStatRecordsOption().get())
@@ -228,10 +230,10 @@ public class SparkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
protected HoodieData<HoodieRecord>
getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet,
HoodieIndexDefinition indexDefinition,
HoodieTableMetaClient metaClient, int parallelism,
- Schema
readerSchema, StorageConfiguration<?> storageConf,
+ Schema
tableSchema, Schema readerSchema, StorageConfiguration<?> storageConf,
String
instantTime) {
ExpressionIndexComputationMetadata expressionIndexComputationMetadata =
SparkMetadataWriterUtils.getExprIndexRecords(partitionFilePathAndSizeTriplet,
indexDefinition,
- metaClient, parallelism, readerSchema, instantTime, engineContext,
dataWriteConfig,
+ metaClient, parallelism, tableSchema, readerSchema, instantTime,
engineContext, dataWriteConfig,
Option.of(rangeMetadata ->
HoodieTableMetadataUtil.collectAndProcessExprIndexPartitionStatRecords(rangeMetadata,
true, Option.of(indexDefinition.getIndexName()))));
HoodieData<HoodieRecord> exprIndexRecords =
expressionIndexComputationMetadata.getExpressionIndexRecords();
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
index 7eb9de51441..e49ea638269 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
@@ -174,7 +174,8 @@ public class
SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB
@Override
protected HoodieData<HoodieRecord>
getExpressionIndexRecords(List<Pair<String, Pair<String, Long>>>
partitionFilePathAndSizeTriplet, HoodieIndexDefinition indexDefinition,
-
HoodieTableMetaClient metaClient, int parallelism, Schema readerSchema,
StorageConfiguration<?> storageConf, String instantTime) {
+
HoodieTableMetaClient metaClient, int parallelism, Schema tableSchema, Schema
readerSchema, StorageConfiguration<?> storageConf,
+ String
instantTime) {
throw new HoodieNotSupportedException("Expression index not supported for
Java metadata table writer yet.");
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 002f374abe7..adae4c43388 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -19,6 +19,7 @@
package org.apache.hudi;
+import org.apache.hudi.client.model.HoodieInternalRow;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieReaderContext;
@@ -170,6 +171,9 @@ public abstract class BaseSparkInternalRowReaderContext
extends HoodieReaderCont
@Override
public InternalRow getDeleteRow(InternalRow record, String recordKey) {
- throw new UnsupportedOperationException("Not supported for " +
this.getClass().getSimpleName());
+ if (record != null) {
+ return record;
+ }
+ return new HoodieInternalRow(null, null, UTF8String.fromString(recordKey),
UTF8String.fromString(partitionPath), null, null, false);
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index c7117b891fa..a1d750e6823 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -91,7 +91,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
private final String payloadClassFQN;
// Record's key/partition-path fields
private final String recordKeyField;
- private final Option<String> partitionPathFieldOpt;
// Partition name override
private final Option<String> partitionNameOverrideOpt;
// Pre-combining field
@@ -128,8 +127,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
protected final boolean forceFullScan;
// Progress
private float progress = 0.0f;
- // Populate meta fields for the records
- private final boolean populateMetaFields;
// Record type read from log block
// Collect all the block instants after scanning all the log files.
private final List<String> validBlockInstants = new ArrayList<>();
@@ -139,17 +136,15 @@ public abstract class BaseHoodieLogRecordReader<T> {
// Allows to consider inflight instants while merging log records
protected boolean allowInflightInstants;
- protected BaseHoodieLogRecordReader(HoodieReaderContext readerContext,
HoodieStorage storage, List<String> logFilePaths,
+ protected BaseHoodieLogRecordReader(HoodieReaderContext<T> readerContext,
HoodieTableMetaClient hoodieTableMetaClient, HoodieStorage storage,
List<String> logFilePaths,
boolean reverseReader, int bufferSize,
Option<InstantRange> instantRange,
boolean withOperationField, boolean
forceFullScan, Option<String> partitionNameOverride,
Option<String> keyFieldOverride, boolean
enableOptimizedLogBlocksScan, FileGroupRecordBuffer<T> recordBuffer,
boolean allowInflightInstants) {
this.readerContext = readerContext;
- this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
+ this.readerSchema = readerContext.getSchemaHandler() != null ?
readerContext.getSchemaHandler().getRequiredSchema() : null;
this.latestInstantTime = readerContext.getLatestCommitTime();
- this.hoodieTableMetaClient = HoodieTableMetaClient.builder()
- .setStorage(storage)
- .setBasePath(readerContext.getTablePath()).build();
+ this.hoodieTableMetaClient = hoodieTableMetaClient;
// load class from the payload fully qualified class name
HoodieTableConfig tableConfig =
this.hoodieTableMetaClient.getTableConfig();
this.payloadClassFQN = tableConfig.getPayloadClass();
@@ -168,7 +163,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
this.instantRange = instantRange;
this.withOperationField = withOperationField;
this.forceFullScan = forceFullScan;
- this.internalSchema = readerContext.getSchemaHandler().getInternalSchema();
+ this.internalSchema = readerContext.getSchemaHandler() != null ?
readerContext.getSchemaHandler().getInternalSchema() : null;
this.enableOptimizedLogBlocksScan = enableOptimizedLogBlocksScan;
if (keyFieldOverride.isPresent()) {
@@ -179,17 +174,11 @@ public abstract class BaseHoodieLogRecordReader<T> {
// are static, like "files", "col_stats", etc)
checkState(partitionNameOverride.isPresent());
- this.populateMetaFields = false;
this.recordKeyField = keyFieldOverride.get();
- this.partitionPathFieldOpt = Option.empty();
} else if (tableConfig.populateMetaFields()) {
- this.populateMetaFields = true;
this.recordKeyField = HoodieRecord.RECORD_KEY_METADATA_FIELD;
- this.partitionPathFieldOpt =
Option.of(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
} else {
- this.populateMetaFields = false;
this.recordKeyField = tableConfig.getRecordKeyFieldProp();
- this.partitionPathFieldOpt =
Option.of(tableConfig.getPartitionFieldProp());
}
this.partitionNameOverrideOpt = partitionNameOverride;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
new file mode 100644
index 00000000000..14d65d1aa6a
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.hudi.avro.HoodieAvroReaderContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.generic.IndexedRecord;
+
+import java.util.List;
+
+/**
+ * Scans a set of log files to extract metadata about the log blocks. It does
not read the actual records.
+ */
+public class HoodieLogBlockMetadataScanner extends
BaseHoodieLogRecordReader<IndexedRecord> {
+
+ public HoodieLogBlockMetadataScanner(HoodieTableMetaClient metaClient,
List<String> logFilePaths, int bufferSize, String maxInstantTime,
Option<InstantRange> instantRange) {
+ super(getReaderContext(metaClient, maxInstantTime), metaClient,
metaClient.getStorage(), logFilePaths, false, bufferSize, instantRange, false,
false, Option.empty(), Option.empty(), true,
+ null, false);
+ scanInternal(Option.empty(), true);
+ }
+
+ private static HoodieReaderContext<IndexedRecord>
getReaderContext(HoodieTableMetaClient metaClient, String maxInstantTime) {
+ HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(metaClient.getStorage().getConf(),
metaClient.getTableConfig(), Option.empty(), Option.empty());
+ readerContext.setHasLogFiles(true);
+ readerContext.setHasBootstrapBaseFile(false);
+ readerContext.setLatestCommitTime(maxInstantTime);
+ return readerContext;
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index 513c2cce497..f6efc50426d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.table.log;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.read.FileGroupRecordBuffer;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.util.CollectionUtils;
@@ -62,11 +63,11 @@ public class HoodieMergedLogRecordReader<T> extends
BaseHoodieLogRecordReader<T>
private long totalTimeTakenToReadAndMergeBlocks;
@SuppressWarnings("unchecked")
- private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext,
HoodieStorage storage, List<String> logFilePaths, boolean reverseReader,
+ private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext,
HoodieTableMetaClient metaClient, HoodieStorage storage, List<String>
logFilePaths, boolean reverseReader,
int bufferSize, Option<InstantRange>
instantRange, boolean withOperationField, boolean forceFullScan,
Option<String> partitionName,
Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan,
FileGroupRecordBuffer<T> recordBuffer,
boolean allowInflightInstants) {
- super(readerContext, storage, logFilePaths, reverseReader, bufferSize,
instantRange, withOperationField,
+ super(readerContext, metaClient, storage, logFilePaths, reverseReader,
bufferSize, instantRange, withOperationField,
forceFullScan, partitionName, keyFieldOverride,
enableOptimizedLogBlocksScan, recordBuffer, allowInflightInstants);
if (forceFullScan) {
@@ -175,6 +176,7 @@ public class HoodieMergedLogRecordReader<T> extends
BaseHoodieLogRecordReader<T>
private FileGroupRecordBuffer<T> recordBuffer;
private boolean allowInflightInstants = false;
+ private HoodieTableMetaClient metaClient;
@Override
public Builder<T> withHoodieReaderContext(HoodieReaderContext<T>
readerContext) {
@@ -252,6 +254,11 @@ public class HoodieMergedLogRecordReader<T> extends
BaseHoodieLogRecordReader<T>
return this;
}
+ public Builder<T> withMetaClient(HoodieTableMetaClient metaClient) {
+ this.metaClient = metaClient;
+ return this;
+ }
+
@Override
public HoodieMergedLogRecordReader<T> build() {
ValidationUtils.checkArgument(recordBuffer != null, "Record Buffer is
null in Merged Log Record Reader");
@@ -262,7 +269,7 @@ public class HoodieMergedLogRecordReader<T> extends
BaseHoodieLogRecordReader<T>
}
return new HoodieMergedLogRecordReader<>(
- readerContext, storage, logFilePaths,
+ readerContext, metaClient, storage, logFilePaths,
reverseReader, bufferSize, instantRange,
withOperationField, forceFullScan,
Option.ofNullable(partitionName),
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index f751f33b100..4c712cf73f1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -77,6 +77,7 @@ import static
org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
*/
public final class HoodieFileGroupReader<T> implements Closeable {
private final HoodieReaderContext<T> readerContext;
+ private final HoodieTableMetaClient metaClient;
private final Option<HoodieBaseFile> hoodieBaseFileOption;
private final List<HoodieLogFile> logFiles;
private final String partitionPath;
@@ -121,6 +122,7 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
Option<InternalSchema> internalSchemaOpt,
HoodieTableMetaClient hoodieTableMetaClient, TypedProperties props,
long start, long length, boolean
shouldUseRecordPosition, boolean allowInflightInstants, boolean emitDelete) {
this.readerContext = readerContext;
+ this.metaClient = hoodieTableMetaClient;
this.storage = storage;
this.hoodieBaseFileOption = fileSlice.getBaseFile();
this.logFiles =
fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
@@ -352,6 +354,7 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
new StoragePath(path), logFiles.get(0).getPath().getParent()))
.withRecordBuffer(recordBuffer)
.withAllowInflightInstants(allowInflightInstants)
+ .withMetaClient(metaClient)
.build()) {
readStats.setTotalLogReadTimeMs(logRecordReader.getTotalTimeTakenToReadAndMergeBlocks());
readStats.setTotalUpdatedRecordsCompacted(logRecordReader.getNumMergedRecordsInLog());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 6f8435f7ad3..82f25c76f85 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -19,6 +19,7 @@
package org.apache.hudi.metadata;
import org.apache.hudi.avro.ConvertingGenericData;
+import org.apache.hudi.avro.HoodieAvroReaderContext;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.BooleanWrapper;
import org.apache.hudi.avro.model.DateWrapper;
@@ -39,7 +40,9 @@ import org.apache.hudi.avro.model.TimestampMicrosWrapper;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
import org.apache.hudi.common.data.HoodieData;
@@ -47,6 +50,7 @@ import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
@@ -73,7 +77,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
@@ -132,6 +136,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -158,7 +163,9 @@ import static
org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMOR
import static
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
import static
org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION;
import static
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
import static
org.apache.hudi.common.config.HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
+import static
org.apache.hudi.common.config.HoodieReaderConfig.REALTIME_SKIP_MERGE;
import static org.apache.hudi.common.fs.FSUtils.getFileNameFromPath;
import static
org.apache.hudi.common.model.HoodieRecord.COMMIT_TIME_METADATA_FIELD;
import static
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION;
@@ -243,7 +250,7 @@ public class HoodieTableMetadataUtil {
* the collection of provided records
*/
public static Map<String, HoodieColumnRangeMetadata<Comparable>>
collectColumnRangeMetadata(
- List<HoodieRecord> records,
+ Iterator<HoodieRecord> records,
List<Pair<String, Schema.Field>> targetFields,
String filePath,
Schema recordSchema,
@@ -263,7 +270,7 @@ public class HoodieTableMetadataUtil {
storageConfig.getString(HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(),
HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
// Collect stats for all columns by iterating through records while
accounting
// corresponding stats
- records.forEach((record) -> {
+ records.forEachRemaining((record) -> {
// For each column (field) we have to index update corresponding column
stats
// with the values from this record
targetFields.forEach(fieldNameFieldPair -> {
@@ -1082,35 +1089,6 @@ public class HoodieTableMetadataUtil {
}, parallelism).values();
}
- @VisibleForTesting
- public static Set<String> getRecordKeys(List<String> logFilePaths,
HoodieTableMetaClient datasetMetaClient,
- Option<Schema> writerSchemaOpt, int
maxBufferSize,
- String latestCommitTimestamp,
boolean includeValidKeys,
- boolean includeDeletedKeys) throws
IOException {
- if (writerSchemaOpt.isPresent()) {
- // read log file records without merging
- Set<String> allRecordKeys = new HashSet<>();
- HoodieUnMergedLogRecordScanner.Builder builder =
HoodieUnMergedLogRecordScanner.newBuilder()
- .withStorage(datasetMetaClient.getStorage())
- .withBasePath(datasetMetaClient.getBasePath())
- .withLogFilePaths(logFilePaths)
- .withBufferSize(maxBufferSize)
- .withLatestInstantTime(latestCommitTimestamp)
- .withReaderSchema(writerSchemaOpt.get())
- .withTableMetaClient(datasetMetaClient);
- if (includeValidKeys) {
- builder.withLogRecordScannerCallback(record ->
allRecordKeys.add(record.getRecordKey()));
- }
- if (includeDeletedKeys) {
- builder.withRecordDeletionCallback(deletedKey ->
allRecordKeys.add(deletedKey.getRecordKey()));
- }
- HoodieUnMergedLogRecordScanner scanner = builder.build();
- scanner.scan();
- return allRecordKeys;
- }
- return Collections.emptySet();
- }
-
/**
* Convert rollback action metadata to metadata table records.
* <p>
@@ -1685,7 +1663,7 @@ public class HoodieTableMetadataUtil {
} else if (FSUtils.isLogFile(fileName)) {
Option<Schema> writerSchemaOpt =
tryResolveSchemaForTable(datasetMetaClient);
LOG.warn("Reading log file: {}, to build column range metadata.",
partitionPathFileName);
- return getLogFileColumnRangeMetadata(fullFilePath.toString(),
datasetMetaClient, columnsToIndex, writerSchemaOpt, maxBufferSize);
+ return getLogFileColumnRangeMetadata(fullFilePath.toString(),
partitionPath, datasetMetaClient, columnsToIndex, writerSchemaOpt,
maxBufferSize);
}
LOG.warn("Column range index not supported for: {}",
partitionPathFileName);
return Collections.emptyList();
@@ -1701,31 +1679,39 @@ public class HoodieTableMetadataUtil {
* Read column range metadata from log file.
*/
@VisibleForTesting
- public static List<HoodieColumnRangeMetadata<Comparable>>
getLogFileColumnRangeMetadata(String filePath, HoodieTableMetaClient
datasetMetaClient,
+ public static List<HoodieColumnRangeMetadata<Comparable>>
getLogFileColumnRangeMetadata(String filePath, String partitionPath,
+
HoodieTableMetaClient datasetMetaClient,
List<String> columnsToIndex, Option<Schema> writerSchemaOpt,
int maxBufferSize) throws IOException {
if (writerSchemaOpt.isPresent()) {
List<Pair<String, Schema.Field>> fieldsToIndex =
columnsToIndex.stream().map(fieldName ->
HoodieAvroUtils.getSchemaForField(writerSchemaOpt.get(), fieldName))
.collect(Collectors.toList());
- // read log file records without merging
- List<HoodieRecord> records = new ArrayList<>();
- HoodieUnMergedLogRecordScanner scanner =
HoodieUnMergedLogRecordScanner.newBuilder()
- .withStorage(datasetMetaClient.getStorage())
- .withBasePath(datasetMetaClient.getBasePath())
- .withLogFilePaths(Collections.singletonList(filePath))
- .withBufferSize(maxBufferSize)
-
.withLatestInstantTime(datasetMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime())
- .withReaderSchema(writerSchemaOpt.get())
- .withTableMetaClient(datasetMetaClient)
- .withLogRecordScannerCallback(records::add)
+ // read log files without merging for lower overhead, log files may
contain multiple records for the same key resulting in a wider range of values
than the merged result
+ HoodieLogFile logFile = new HoodieLogFile(filePath);
+ FileSlice fileSlice = new FileSlice(partitionPath,
logFile.getDeltaCommitTime(), logFile.getFileId());
+ fileSlice.addLogFile(logFile);
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(MAX_MEMORY_FOR_MERGE.key(),
Long.toString(maxBufferSize));
+ properties.setProperty(HoodieReaderConfig.MERGE_TYPE.key(),
REALTIME_SKIP_MERGE);
+ // Currently only avro is fully supported for extracting column ranges
(see HUDI-8585)
+ HoodieReaderContext readerContext = new
HoodieAvroReaderContext(datasetMetaClient.getStorageConf(),
datasetMetaClient.getTableConfig(), Option.empty(), Option.empty());
+ HoodieFileGroupReader fileGroupReader =
HoodieFileGroupReader.newBuilder()
+ .withReaderContext(readerContext)
+ .withHoodieTableMetaClient(datasetMetaClient)
+ .withFileSlice(fileSlice)
+ .withDataSchema(writerSchemaOpt.get())
+ .withRequestedSchema(writerSchemaOpt.get())
+
.withLatestCommitTime(datasetMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().requestedTime())
+ .withProps(properties)
.build();
- scanner.scan();
- if (records.isEmpty()) {
- return Collections.emptyList();
+ try (ClosableIterator<HoodieRecord> recordIterator =
(ClosableIterator<HoodieRecord>)
fileGroupReader.getClosableHoodieRecordIterator()) {
+ if (!recordIterator.hasNext()) {
+ return Collections.emptyList();
+ }
+ Map<String, HoodieColumnRangeMetadata<Comparable>>
columnRangeMetadataMap =
+ collectColumnRangeMetadata(recordIterator, fieldsToIndex,
getFileNameFromPath(filePath), writerSchemaOpt.get(),
datasetMetaClient.getStorage().getConf());
+ return new ArrayList<>(columnRangeMetadataMap.values());
}
- Map<String, HoodieColumnRangeMetadata<Comparable>>
columnRangeMetadataMap =
- collectColumnRangeMetadata(records, fieldsToIndex,
getFileNameFromPath(filePath), writerSchemaOpt.get(),
datasetMetaClient.getStorage().getConf());
- return new ArrayList<>(columnRangeMetadataMap.values());
}
return Collections.emptyList();
}
@@ -2435,8 +2421,7 @@ public class HoodieTableMetadataUtil {
});
}
- public static Schema
getProjectedSchemaForExpressionIndex(HoodieIndexDefinition indexDefinition,
HoodieTableMetaClient metaClient) throws Exception {
- Schema tableSchema = new
TableSchemaResolver(metaClient).getTableAvroSchema();
+ public static Schema
getProjectedSchemaForExpressionIndex(HoodieIndexDefinition indexDefinition,
HoodieTableMetaClient metaClient, Schema tableSchema) {
List<String> partitionFields =
metaClient.getTableConfig().getPartitionFields()
.map(Arrays::asList)
.orElse(Collections.emptyList());
@@ -2554,12 +2539,11 @@ public class HoodieTableMetadataUtil {
List<Pair<String, FileSlice>> partitionInfoList,
HoodieMetadataConfig metadataConfig,
HoodieTableMetaClient dataTableMetaClient,
-
Option<Schema> writerSchemaOpt,
+
Lazy<Option<Schema>> lazyWriterSchemaOpt,
Option<HoodieRecordType> recordTypeOpt) {
if (partitionInfoList.isEmpty()) {
return engineContext.emptyHoodieData();
}
- Lazy<Option<Schema>> lazyWriterSchemaOpt = writerSchemaOpt.isPresent() ?
Lazy.eagerly(writerSchemaOpt) : Lazy.lazily(() ->
tryResolveSchemaForTable(dataTableMetaClient));
final Map<String, Schema> columnsToIndexSchemaMap =
getColumnsToIndex(dataTableMetaClient.getTableConfig(), metadataConfig,
lazyWriterSchemaOpt,
dataTableMetaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().empty(),
recordTypeOpt);
if (columnsToIndexSchemaMap.isEmpty()) {
@@ -2766,7 +2750,7 @@ public class HoodieTableMetadataUtil {
Option<Schema> writerSchemaOpt) {
if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat)
writeStat).getColumnStats().isPresent()) {
Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap =
((HoodieDeltaWriteStat) writeStat).getColumnStats().get();
- return columnRangeMap.values().stream().collect(Collectors.toList());
+ return new ArrayList<>(columnRangeMap.values());
}
String filePath = writeStat.getPath();
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index bcf67bfb9e3..3e02dae081f 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -168,7 +168,7 @@ public class TestHoodieTableMetadataUtil extends
HoodieCommonTestHarness {
.withPartitionStatsIndexParallelism(1)
.build(),
metaClient,
- Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS),
+
Lazy.eagerly(Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS)),
Option.empty());
// Validate the result.
validatePartitionStats(result, instant1, instant2);
@@ -262,6 +262,7 @@ public class TestHoodieTableMetadataUtil extends
HoodieCommonTestHarness {
metaClient.getTableConfig().setValue(HoodieTableConfig.PARTITION_FIELDS.key(),
"partition_path");
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataLogFile
= HoodieTableMetadataUtil.getLogFileColumnRangeMetadata(
storagePath2.toString(),
+ p,
metaClient,
columnsToIndex,
Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS),
@@ -283,7 +284,7 @@ public class TestHoodieTableMetadataUtil extends
HoodieCommonTestHarness {
.withPartitionStatsIndexParallelism(1)
.build(),
metaClient,
- Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS),
+
Lazy.eagerly(Option.of(HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS)),
Option.empty());
// Validate the result.
validatePartitionStats(result, instant1, instant2, 6);
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
deleted file mode 100644
index e38988ff475..00000000000
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadSnapshotReader.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.hadoop.realtime;
-
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.serialization.DefaultSerializer;
-import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.util.DefaultSizeEstimator;
-import org.apache.hudi.common.util.FileIOUtils;
-import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
-import org.apache.hudi.common.util.HoodieTimer;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.storage.HoodieStorageUtils;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
-import static
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
-import static
org.apache.hudi.common.config.HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE;
-import static
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE;
-import static
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
-import static
org.apache.hudi.common.config.HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN;
-import static
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getBaseFileReader;
-import static
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes;
-import static
org.apache.hudi.internal.schema.InternalSchema.getEmptyInternalSchema;
-
-/**
- * An implementation of {@link AbstractRealtimeRecordReader} that reads from
base parquet files and log files,
- * and merges the records on the fly. It differs from {@link
HoodieRealtimeRecordReader} in that it does not
- * implement Hadoop's RecordReader interface, and instead implements Iterator
interface that returns an iterator
- * of {@link HoodieRecord}s which are {@link HoodieAvroIndexedRecord}s. This
can be used by query engines like
- * Trino that do not use Hadoop's RecordReader interface. However, the engine
must support reading from iterators
- * and also support Avro (de)serialization.
- */
-public class HoodieMergeOnReadSnapshotReader extends
AbstractRealtimeRecordReader implements Iterator<HoodieRecord>, AutoCloseable {
-
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieMergeOnReadSnapshotReader.class);
-
- private final String tableBasePath;
- private final List<HoodieLogFile> logFilePaths;
- private final String latestInstantTime;
- private final Schema readerSchema;
- private final JobConf jobConf;
- private final HoodieMergedLogRecordScanner logRecordScanner;
- private final HoodieFileReader baseFileReader;
- private final Map<String, HoodieRecord> logRecordsByKey;
- private final Iterator<HoodieRecord> recordsIterator;
- private final ExternalSpillableMap<String, HoodieRecord> mergedRecordsByKey;
-
- /**
- * In order to instantiate this record reader, one needs to provide
following parameters.
- * An example usage is demonstrated in TestHoodieMergeOnReadSnapshotReader.
- *
- * @param tableBasePath Base path of the Hudi table
- * @param baseFilePath Path of the base file as of the latest instant
time for the split being processed
- * @param logFilePaths Paths of the log files as of the latest file
slices pertaining to file group id of the base file
- * @param latestInstantTime Latest instant time
- * @param readerSchema Schema of the reader
- * @param jobConf Any job configuration
- * @param start Start offset
- * @param length Length of the split
- */
- public HoodieMergeOnReadSnapshotReader(String tableBasePath,
- String baseFilePath,
- List<HoodieLogFile> logFilePaths,
- String latestInstantTime,
- Schema readerSchema,
- JobConf jobConf,
- long start,
- long length) throws IOException {
- super(getRealtimeSplit(tableBasePath, baseFilePath, logFilePaths,
latestInstantTime, start, length, new String[0]), jobConf);
- this.tableBasePath = tableBasePath;
- this.logFilePaths = logFilePaths;
- this.latestInstantTime = latestInstantTime;
- this.readerSchema = readerSchema;
- this.jobConf = jobConf;
- HoodieTimer timer = new HoodieTimer().startTimer();
- this.logRecordScanner = getMergedLogRecordScanner();
- LOG.debug("Time taken to scan log records: {}", timer.endTimer());
- this.baseFileReader = getBaseFileReader(new Path(baseFilePath), jobConf);
- this.logRecordsByKey = logRecordScanner.getRecords();
- Set<String> logRecordKeys = new HashSet<>(this.logRecordsByKey.keySet());
- this.mergedRecordsByKey = new ExternalSpillableMap<>(
- getMaxCompactionMemoryInBytes(jobConf),
- jobConf.get(SPILLABLE_MAP_BASE_PATH.key(),
- FileIOUtils.getDefaultSpillableMapBasePath()),
- new DefaultSizeEstimator(),
- new HoodieRecordSizeEstimator(readerSchema),
- jobConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(),
SPILLABLE_DISK_MAP_TYPE.defaultValue()),
- new DefaultSerializer<>(),
- jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()),
- getClass().getSimpleName());
- try (ClosableIterator<String> baseFileIterator =
baseFileReader.getRecordKeyIterator()) {
- timer.startTimer();
- while (baseFileIterator.hasNext()) {
- String key = baseFileIterator.next();
- if (logRecordKeys.contains(key)) {
- logRecordKeys.remove(key);
- Option<HoodieAvroIndexedRecord> mergedRecord =
buildGenericRecordWithCustomPayload(logRecordsByKey.get(key));
- if (mergedRecord.isPresent()) {
- HoodieRecord hoodieRecord = mergedRecord.get().copy();
- mergedRecordsByKey.put(key, hoodieRecord);
- }
- }
- }
- }
- LOG.debug("Time taken to merge base file and log file records: {}",
timer.endTimer());
- this.recordsIterator = mergedRecordsByKey.values().iterator();
- }
-
- @Override
- public boolean hasNext() {
- return recordsIterator.hasNext();
- }
-
- @Override
- public HoodieRecord next() {
- return recordsIterator.next();
- }
-
- public Map<String, HoodieRecord> getRecordsByKey() {
- return mergedRecordsByKey;
- }
-
- public Iterator<HoodieRecord> getRecordsIterator() {
- return recordsIterator;
- }
-
- public Map<String, HoodieRecord> getLogRecordsByKey() {
- return logRecordsByKey;
- }
-
- private static HoodieRealtimeFileSplit getRealtimeSplit(String
tableBasePath, String baseFilePath,
- List<HoodieLogFile>
logFilePaths,
- String
latestInstantTime,
- long start, long
length, String[] hosts) {
- HoodieRealtimePath realtimePath = new HoodieRealtimePath(
- new Path(baseFilePath).getParent(),
- baseFilePath,
- tableBasePath,
- logFilePaths,
- latestInstantTime,
- false, // TODO: Fix this to support incremental queries
- Option.empty());
- return HoodieInputFormatUtils.createRealtimeFileSplit(realtimePath, start,
length, hosts);
- }
-
- private HoodieMergedLogRecordScanner getMergedLogRecordScanner() {
- return HoodieMergedLogRecordScanner.newBuilder()
- .withStorage(HoodieStorageUtils.getStorage(
- split.getPath().toString(), HadoopFSUtils.getStorageConf(jobConf)))
- .withBasePath(tableBasePath)
- .withLogFilePaths(logFilePaths.stream().map(logFile ->
logFile.getPath().toString()).collect(Collectors.toList()))
- .withReaderSchema(readerSchema)
- .withLatestInstantTime(latestInstantTime)
- .withMaxMemorySizeInBytes(getMaxCompactionMemoryInBytes(jobConf))
- .withReverseReader(false)
- .withBufferSize(jobConf.getInt(MAX_DFS_STREAM_BUFFER_SIZE.key(),
- DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
- .withSpillableMapBasePath(jobConf.get(SPILLABLE_MAP_BASE_PATH.key(),
- FileIOUtils.getDefaultSpillableMapBasePath()))
- .withDiskMapType(jobConf.getEnum(SPILLABLE_DISK_MAP_TYPE.key(),
SPILLABLE_DISK_MAP_TYPE.defaultValue()))
-
.withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
-
.withOptimizedLogBlocksScan(jobConf.getBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(),
-
Boolean.parseBoolean(ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue())))
-
.withInternalSchema(schemaEvolutionContext.internalSchemaOption.orElse(getEmptyInternalSchema()))
- .build();
- }
-
- private Option<HoodieAvroIndexedRecord>
buildGenericRecordWithCustomPayload(HoodieRecord record) throws IOException {
- if (usesCustomPayload) {
- return record.toIndexedRecord(getWriterSchema(), payloadProps);
- } else {
- return record.toIndexedRecord(readerSchema, payloadProps);
- }
- }
-
- @Override
- public void close() throws Exception {
- if (baseFileReader != null) {
- baseFileReader.close();
- }
- if (logRecordScanner != null) {
- logRecordScanner.close();
- }
- if (mergedRecordsByKey != null) {
- mergedRecordsByKey.close();
- }
- }
-}
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
deleted file mode 100644
index 555b7cd2731..00000000000
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.hadoop.realtime;
-
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.config.HoodieMemoryConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
-import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieFileGroupId;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.testutils.FileCreateUtilsLegacy;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.testutils.SchemaTestUtil;
-import org.apache.hudi.common.util.CommitUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
-import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static
org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE;
-import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getRelativePartitionPath;
-import static
org.apache.hudi.hadoop.testutils.InputFormatTestUtil.writeDataBlockToLogFile;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class TestHoodieMergeOnReadSnapshotReader {
-
- private static final int TOTAL_RECORDS = 100;
- private static final String FILE_ID = "fileid0";
- private static final String COLUMNS =
-
"_hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,field1,field2,name,favorite_number,favorite_color,favorite_movie";
- private static final String COLUMN_TYPES =
"string,string,string,string,string,string,string,string,int,string,string";
- private JobConf baseJobConf;
- private HoodieStorage storage;
- private Configuration hadoopConf;
-
- @TempDir
- public java.nio.file.Path basePath;
-
- @BeforeEach
- public void setUp() {
- hadoopConf = HoodieTestUtils.getDefaultStorageConf().unwrap();
- hadoopConf.set("fs.defaultFS", "file:///");
- hadoopConf.set("fs.file.impl",
org.apache.hadoop.fs.LocalFileSystem.class.getName());
- baseJobConf = new JobConf(hadoopConf);
- baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
String.valueOf(1024 * 1024));
- baseJobConf.set(serdeConstants.LIST_COLUMNS, COLUMNS);
- baseJobConf.set(serdeConstants.LIST_COLUMN_TYPES, COLUMN_TYPES);
- baseJobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
"false");
- storage = new HoodieHadoopStorage(HadoopFSUtils.getFs(new
StoragePath(basePath.toUri()), baseJobConf));
- }
-
- @AfterEach
- public void tearDown() throws Exception {
- if (storage != null) {
- storage.deleteDirectory(new StoragePath(basePath.toUri()));
- storage.close();
- }
- }
-
- @Test
- public void testSnapshotReader() throws Exception {
- testReaderInternal(false,
HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK);
- }
-
- @Test
- public void testSnapshotReaderPartitioned() throws Exception {
- testReaderInternal(true,
HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK);
- }
-
- private void testReaderInternal(boolean partitioned,
HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception {
- // initial commit
- Schema schema =
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
- HoodieTestUtils.init(HadoopFSUtils.getStorageConf(hadoopConf),
basePath.toString(), HoodieTableType.MERGE_ON_READ);
- String baseInstant = "100";
- File partitionDir = partitioned ?
InputFormatTestUtil.prepareParquetTable(basePath, schema, 1, TOTAL_RECORDS,
baseInstant,
- HoodieTableType.MERGE_ON_READ)
- : InputFormatTestUtil.prepareNonPartitionedParquetTable(basePath,
schema, 1, TOTAL_RECORDS, baseInstant,
- HoodieTableType.MERGE_ON_READ);
-
- HoodieCommitMetadata commitMetadata =
CommitUtils.buildMetadata(Collections.emptyList(), Collections.emptyMap(),
Option.empty(), WriteOperationType.UPSERT,
- schema.toString(), HoodieTimeline.DELTA_COMMIT_ACTION);
- FileCreateUtilsLegacy.createDeltaCommit(COMMIT_METADATA_SER_DE,
basePath.toString(), baseInstant, commitMetadata);
- // Add the paths
- FileInputFormat.setInputPaths(baseJobConf, partitionDir.getPath());
-
- List<Pair<String, Integer>> logVersionsWithAction = new ArrayList<>();
- logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 1));
- logVersionsWithAction.add(Pair.of(HoodieTimeline.DELTA_COMMIT_ACTION, 2));
- String baseFilePath = partitionDir + "/" + FILE_ID + "_1-0-1_" +
baseInstant + ".parquet";
- String partitionPath = partitioned ? getRelativePartitionPath(new
Path(basePath.toString()), new Path(partitionDir.getAbsolutePath())) :
"default";
- FileSlice fileSlice = new FileSlice(
- new HoodieFileGroupId(partitionPath, FILE_ID),
- baseInstant,
- new HoodieBaseFile(storage.getPathInfo(new StoragePath(baseFilePath))),
- new ArrayList<>());
- logVersionsWithAction.forEach(logVersionWithAction -> {
- try {
- // update files or generate new log file
- int logVersion = logVersionWithAction.getRight();
- String action = logVersionWithAction.getKey();
- int baseInstantTs = Integer.parseInt(baseInstant);
- String instantTime = String.valueOf(baseInstantTs + logVersion);
- String latestInstant =
- action.equals(HoodieTimeline.ROLLBACK_ACTION) ?
String.valueOf(baseInstantTs + logVersion - 2)
- : instantTime;
-
- HoodieLogFormat.Writer writer = writeDataBlockToLogFile(
- partitionDir,
- storage,
- schema,
- FILE_ID,
- baseInstant,
- latestInstant,
- 120,
- 0,
- logVersion,
- logBlockType);
- long size = writer.getCurrentSize();
- writer.close();
- assertTrue(size > 0, "block - size should be > 0");
- FileCreateUtilsLegacy.createDeltaCommit(COMMIT_METADATA_SER_DE,
basePath.toString(), instantTime, commitMetadata);
- fileSlice.addLogFile(writer.getLogFile());
-
- HoodieMergeOnReadSnapshotReader snapshotReader = new
HoodieMergeOnReadSnapshotReader(
- basePath.toString(),
- fileSlice.getBaseFile().get().getPath(),
- fileSlice.getLogFiles().collect(Collectors.toList()),
- latestInstant,
- schema,
- baseJobConf,
- 0,
- size);
- Map<String, HoodieRecord> records = snapshotReader.getRecordsByKey();
- assertEquals(TOTAL_RECORDS, records.size());
- snapshotReader.close();
- } catch (Exception ioe) {
- throw new HoodieException(ioe.getMessage(), ioe);
- }
- });
- }
-}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestDataSourceUtils.java
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestDataSourceUtils.java
deleted file mode 100644
index 2b13c068112..00000000000
---
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi;
-
-import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.testutils.HoodieClientTestBase;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.junit.jupiter.api.Test;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
-import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-class TestDataSourceUtils extends HoodieClientTestBase {
-
- @Test
- void testDeduplicationAgainstRecordsAlreadyInTable() {
- HoodieWriteConfig config = getConfig();
- config.getProps().setProperty("path", config.getBasePath());
- try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
- String newCommitTime = writeClient.startCommit();
- List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
- JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 2);
- List<WriteStatus> statusList = writeClient.bulkInsert(recordsRDD,
newCommitTime).collect();
- writeClient.commit(newCommitTime, jsc.parallelize(statusList),
Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());
- assertNoWriteErrors(statusList);
-
- Map<String, String> parameters =
config.getProps().entrySet().stream().collect(Collectors.toMap(entry ->
entry.getKey().toString(), entry -> entry.getValue().toString()));
- List<HoodieRecord> newRecords = dataGen.generateInserts(newCommitTime,
10);
- List<HoodieRecord> inputRecords = Stream.concat(records.subList(0,
10).stream(), newRecords.stream()).collect(Collectors.toList());
- List<HoodieRecord> output = DataSourceUtils.resolveDuplicates(jsc,
jsc.parallelize(inputRecords, 1), parameters, false).collect();
- Set<String> expectedRecordKeys =
newRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet());
- assertEquals(expectedRecordKeys,
output.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet()));
- }
- }
-}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java
index 519a8811eb5..0ed3d0ed20e 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java
+++
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/testutils/LogFileColStatsTestUtil.java
@@ -18,28 +18,22 @@
package org.apache.hudi.testutils;
-import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
-import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata;
/**
* Util methods used in tests to fetch col stats records for a log file.
@@ -47,29 +41,12 @@ import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRang
public class LogFileColStatsTestUtil {
public static Option<Row> getLogFileColumnRangeMetadata(String filePath,
HoodieTableMetaClient datasetMetaClient, String latestCommitTime,
- List<String> columnsToIndex,
Option<Schema> writerSchemaOpt,
- int maxBufferSize) throws
IOException {
+ List<String>
columnsToIndex, Option<Schema> writerSchemaOpt,
+ int maxBufferSize)
throws IOException {
if (writerSchemaOpt.isPresent()) {
- List<Pair<String, Schema.Field>> fieldsToIndex = columnsToIndex.stream()
- .map(fieldName ->
HoodieAvroUtils.getSchemaForField(writerSchemaOpt.get(), fieldName, ""))
- .collect(Collectors.toList());
- List<HoodieRecord> records = new ArrayList<>();
- HoodieUnMergedLogRecordScanner scanner =
HoodieUnMergedLogRecordScanner.newBuilder()
- .withStorage(datasetMetaClient.getStorage())
- .withBasePath(datasetMetaClient.getBasePath())
- .withLogFilePaths(Collections.singletonList(filePath))
- .withBufferSize(maxBufferSize)
- .withLatestInstantTime(latestCommitTime)
- .withReaderSchema(writerSchemaOpt.get())
- .withLogRecordScannerCallback(records::add)
- .build();
- scanner.scan();
- if (records.isEmpty()) {
- return Option.empty();
- }
- Map<String, HoodieColumnRangeMetadata<Comparable>>
columnRangeMetadataMap =
- collectColumnRangeMetadata(records, fieldsToIndex, filePath,
writerSchemaOpt.get(), datasetMetaClient.getStorageConf());
- List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList =
new ArrayList<>(columnRangeMetadataMap.values());
+ String partitionPath =
FSUtils.getRelativePartitionPath(datasetMetaClient.getBasePath(), new
StoragePath(filePath).getParent());
+ List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList =
+ HoodieTableMetadataUtil.getLogFileColumnRangeMetadata(filePath,
partitionPath, datasetMetaClient, columnsToIndex, writerSchemaOpt,
maxBufferSize);
return Option.of(getColStatsEntry(filePath, columnRangeMetadataList));
} else {
throw new HoodieException("Writer schema needs to be set");
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index 3001cde81a6..8d7c9403d44 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
@@ -35,6 +36,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
@@ -60,7 +62,14 @@ import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -75,7 +84,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
-public class TestDataSourceUtils {
+public class TestDataSourceUtils extends HoodieClientTestBase {
@Mock
private SparkRDDWriteClient hoodieWriteClient;
@@ -300,4 +309,26 @@ public class TestDataSourceUtils {
assertEquals(genericRecordHoodieMetadataPayload,
deserGenericRecordHoodieMetadataPayload);
}
+
+ @Test
+ void testDeduplicationAgainstRecordsAlreadyInTable() throws IOException {
+ initResources();
+ HoodieWriteConfig config = getConfig();
+ config.getProps().setProperty("path", config.getBasePath());
+ try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+ String newCommitTime = writeClient.startCommit();
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+ JavaRDD<HoodieRecord> recordsRDD = jsc.parallelize(records, 2);
+ List<WriteStatus> statusList = writeClient.bulkInsert(recordsRDD,
newCommitTime).collect();
+ writeClient.commit(newCommitTime, jsc.parallelize(statusList),
Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+ assertNoWriteErrors(statusList);
+
+ Map<String, String> parameters =
config.getProps().entrySet().stream().collect(Collectors.toMap(entry ->
entry.getKey().toString(), entry -> entry.getValue().toString()));
+ List<HoodieRecord> newRecords = dataGen.generateInserts(newCommitTime,
10);
+ List<HoodieRecord> inputRecords = Stream.concat(records.subList(0,
10).stream(), newRecords.stream()).collect(Collectors.toList());
+ List<HoodieRecord> output = DataSourceUtils.resolveDuplicates(jsc,
jsc.parallelize(inputRecords, 1), parameters, false).collect();
+ Set<String> expectedRecordKeys =
newRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet());
+ assertEquals(expectedRecordKeys,
output.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet()));
+ }
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index b6c9af65b95..1b57a93eb6c 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -26,7 +26,7 @@ import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
-import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.table.log.HoodieLogBlockMetadataScanner;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -61,9 +61,9 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.TIMELINE_FACTORY;
import static
org.apache.hudi.testutils.GenericRecordValidationTestUtils.assertDataInMORTable;
-import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -485,33 +485,25 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
List<String> partitionPaths =
Stream.of(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS).collect(Collectors.toList());
for (String partitionPath: partitionPaths) {
fileSystemView.getLatestFileSlices(partitionPath).forEach(slice -> {
- HoodieUnMergedLogRecordScanner scanner =
HoodieUnMergedLogRecordScanner.newBuilder()
- .withStorage(metaClient.getStorage())
- .withBasePath(table.getMetaClient().getBasePath())
- .withLogFilePaths(slice.getLogFiles()
+ HoodieLogBlockMetadataScanner scanner = new
HoodieLogBlockMetadataScanner(
+ table.getMetaClient(),
+ slice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
.map(file -> file.getPath().toString())
- .collect(Collectors.toList()))
- .withLatestInstantTime(instant)
- .withBufferSize(config.getMaxDFSStreamBufferSize())
- .withOptimizedLogBlocksScan(true)
- .withTableMetaClient(metaClient)
- .build();
- scanner.scan(true);
+ .collect(Collectors.toList()),
+ config.getMaxDFSStreamBufferSize(),
+ instant,
+ Option.empty());
List<String> prevInstants = scanner.getValidBlockInstants();
- HoodieUnMergedLogRecordScanner scanner2 =
HoodieUnMergedLogRecordScanner.newBuilder()
- .withStorage(metaClient.getStorage())
- .withBasePath(table.getMetaClient().getBasePath())
- .withLogFilePaths(slice.getLogFiles()
+ HoodieLogBlockMetadataScanner scanner2 = new
HoodieLogBlockMetadataScanner(
+ table.getMetaClient(),
+ slice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
.map(file -> file.getPath().toString())
- .collect(Collectors.toList()))
- .withLatestInstantTime(currentInstant)
- .withBufferSize(config.getMaxDFSStreamBufferSize())
- .withOptimizedLogBlocksScan(true)
- .withTableMetaClient(table.getMetaClient())
- .build();
- scanner2.scan(true);
+ .collect(Collectors.toList()),
+ config.getMaxDFSStreamBufferSize(),
+ currentInstant,
+ Option.empty());
List<String> currentInstants = scanner2.getValidBlockInstants();
assertEquals(prevInstants, currentInstants);
});
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
index b5c2f749e43..b9ecbd6ec5f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
@@ -24,23 +24,28 @@ import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -55,6 +60,7 @@ import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -73,7 +79,6 @@ import java.util.stream.Collectors;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToRecordIndexRecords;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getRecordKeys;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getRevivedAndDeletedKeysFromMergedLogs;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.reduceByKeys;
import static
org.apache.hudi.metadata.SecondaryIndexKeyUtils.constructSecondaryIndexKey;
@@ -524,14 +529,15 @@ public class TestMetadataUtilRLIandSIRecordGeneration extends HoodieClientTestBa
writeStatuses3.stream().filter(writeStatus ->
FSUtils.isLogFile(FSUtils.getFileName(writeStatus.getStat().getPath(),
writeStatus.getPartitionPath())))
.forEach(writeStatus -> {
try {
- StoragePath fullFilePath = new StoragePath(basePath,
writeStatus.getStat().getPath());
+ HoodieWriteStat writeStat = writeStatus.getStat();
+ StoragePath fullFilePath = new StoragePath(basePath,
writeStat.getPath());
// used for RLI
finalActualDeletes.addAll(getRevivedAndDeletedKeysFromMergedLogs(metaClient,
latestCommitTimestamp, EngineType.SPARK,
Collections.singletonList(fullFilePath.toString()), writerSchemaOpt,
Collections.singletonList(fullFilePath.toString())).getValue());
// used in SI flow
-
actualUpdatesAndDeletes.addAll(getRecordKeys(Collections.singletonList(fullFilePath.toString()),
metaClient, writerSchemaOpt,
- writeConfig.getMetadataConfig().getMaxReaderBufferSize(),
latestCommitTimestamp, true, true));
+
actualUpdatesAndDeletes.addAll(getRecordKeys(writeStat.getPartitionPath(),
writeStat.getPrevCommit(), writeStat.getFileId(),
+ Collections.singletonList(fullFilePath), metaClient,
writerSchemaOpt, latestCommitTimestamp));
} catch (IOException e) {
throw new HoodieIOException("Failed w/ IOException ", e);
}
@@ -701,4 +707,36 @@ public class TestMetadataUtilRLIandSIRecordGeneration extends HoodieClientTestBa
}
});
}
+
+ Set<String> getRecordKeys(String partition, String baseInstantTime, String
fileId, List<StoragePath> logFilePaths, HoodieTableMetaClient datasetMetaClient,
+ Option<Schema> writerSchemaOpt, String
latestCommitTimestamp) throws IOException {
+ if (writerSchemaOpt.isPresent()) {
+ // read log file records without merging
+ FileSlice fileSlice = new FileSlice(partition, baseInstantTime, fileId);
+ logFilePaths.forEach(logFilePath -> {
+ HoodieLogFile logFile = new HoodieLogFile(logFilePath);
+ fileSlice.addLogFile(logFile);
+ });
+ TypedProperties properties = new TypedProperties();
+ // configure un-merged log file reader
+ HoodieReaderContext<InternalRow> readerContext =
context.getReaderContextFactory(metaClient).getContext();
+ HoodieFileGroupReader<InternalRow> reader =
HoodieFileGroupReader.<InternalRow>newBuilder()
+ .withReaderContext(readerContext)
+ .withDataSchema(writerSchemaOpt.get())
+ .withRequestedSchema(writerSchemaOpt.get())
+ .withEmitDelete(true)
+ .withFileSlice(fileSlice)
+ .withLatestCommitTime(latestCommitTimestamp)
+ .withHoodieTableMetaClient(datasetMetaClient)
+ .withProps(properties)
+ .withEmitDelete(true)
+ .build();
+ Set<String> allRecordKeys = new HashSet<>();
+ try (ClosableIterator<String> keysIterator =
reader.getClosableKeyIterator()) {
+ keysIterator.forEachRemaining(allRecordKeys::add);
+ }
+ return allRecordKeys;
+ }
+ return Collections.emptySet();
+ }
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 17ac8a66070..1308687079a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -1844,7 +1844,7 @@ public class HoodieMetadataTableValidator implements Serializable {
StoragePath storagePartitionPath = new
StoragePath(metaClient.getBasePath(), partitionPath);
String filePath = new StoragePath(storagePartitionPath,
filename).toString();
try {
- return getLogFileColumnRangeMetadata(filePath, metaClient,
allColumnNameList, Option.of(readerSchema),
+ return getLogFileColumnRangeMetadata(filePath, partitionPath,
metaClient, allColumnNameList, Option.of(readerSchema),
metadataConfig.getMaxReaderBufferSize())
.stream()
// We need to convert file path and use only the file name
instead of the complete file path