This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ac54271c3527 fix: Fix file pruning based on column stats for flink
reader (#14186)
ac54271c3527 is described below
commit ac54271c3527d693695176fedb9df6a7b1ba037a
Author: Shuo Cheng <[email protected]>
AuthorDate: Sat Nov 1 10:44:04 2025 +0800
fix: Fix file pruning based on column stats for flink reader (#14186)
---
.../org/apache/hudi/common/model/FileSlice.java | 11 +++
.../apache/hudi/common/model/TestFileSlice.java | 79 +++++++++++++++++++
.../java/org/apache/hudi/source/FileIndex.java | 60 ++++++++-------
.../apache/hudi/source/IncrementalInputSplits.java | 88 ++++++++++++----------
.../org/apache/hudi/table/HoodieTableSource.java | 68 +++++++++--------
.../apache/hudi/table/catalog/HoodieCatalog.java | 13 +++-
.../java/org/apache/hudi/source/TestFileIndex.java | 52 ++++++++-----
.../apache/hudi/table/ITTestHoodieDataSource.java | 48 ++++++++++++
.../apache/hudi/table/TestHoodieTableSource.java | 60 +++++++--------
.../hudi/table/catalog/TestHoodieCatalog.java | 25 ++++++
10 files changed, 356 insertions(+), 148 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
index 21e9a93d4afc..5f4c87ddb59a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
@@ -22,6 +22,7 @@ import
org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.util.Option;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.TreeSet;
@@ -140,6 +141,16 @@ public class FileSlice implements Serializable {
return Option.fromJavaOptional(logFiles.stream().findFirst());
}
+ /**
+ * Returns the file names of the base file (if present) followed by the file names of all log files.
+ */
+ public List<String> getAllFileNames() {
+ List<String> fileList = new ArrayList<>();
+ getBaseFile().ifPresent(hoodieBaseFile ->
fileList.add(hoodieBaseFile.getFileName()));
+ getLogFiles().forEach(hoodieLogFile ->
fileList.add(hoodieLogFile.getFileName()));
+ return fileList;
+ }
+
public long getTotalFileSize() {
return getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L)
+ getLogFiles().mapToLong(HoodieLogFile::getFileSize).sum();
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java
index 8538d645c4fc..a09bfaebc9a2 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java
@@ -22,13 +22,23 @@ import org.apache.hudi.storage.StoragePath;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests {@link FileSlice}.
*/
public class TestFileSlice {
+ private static final String PARTITION_PATH = "test_partition";
+ private static final String FILE_ID = "test_file_id";
+ private static final String BASE_INSTANT = "001";
+
@Test
void testGetLatestInstantTime() {
String baseInstant = "003";
@@ -48,4 +58,73 @@ public class TestFileSlice {
private static String getLogFileName(String instantTime) {
return ".fg1_" + instantTime + ".log.1_1-0-1";
}
+
+ @Test
+ public void testGetAllFilesWithBaseFileOnly() {
+ // Create a FileSlice with only a base file and no log files
+ HoodieBaseFile baseFile = new HoodieBaseFile(
+ "file://" + PARTITION_PATH + "/test_base_file.parquet");
+ FileSlice fileSlice = new FileSlice(
+ new HoodieFileGroupId(PARTITION_PATH, FILE_ID),
+ BASE_INSTANT,
+ baseFile,
+ Collections.emptyList()
+ );
+
+ List<String> allFiles = fileSlice.getAllFileNames();
+ assertEquals(1, allFiles.size());
+ assertTrue(allFiles.contains(baseFile.getFileName()));
+ }
+
+ @Test
+ public void testGetAllFilesWithLogFilesOnly() {
+ // Create a FileSlice with no base file but with log files
+ // Log files must follow the proper naming convention:
.{fileId}_{instant}.log.{version}
+ HoodieLogFile logFile1 = new HoodieLogFile(new StoragePath(PARTITION_PATH
+ "/." + FILE_ID + "_002.log.1"));
+ HoodieLogFile logFile2 = new HoodieLogFile(new StoragePath(PARTITION_PATH
+ "/." + FILE_ID + "_003.log.2"));
+
+ FileSlice fileSlice = new FileSlice(
+ new HoodieFileGroupId(PARTITION_PATH, FILE_ID),
+ BASE_INSTANT,
+ null, // No base file
+ Arrays.asList(logFile1, logFile2)
+ );
+
+ List<String> allFiles = fileSlice.getAllFileNames();
+ assertEquals(2, allFiles.size());
+ assertTrue(allFiles.contains(logFile1.getFileName()));
+ assertTrue(allFiles.contains(logFile2.getFileName()));
+ }
+
+ @Test
+ public void testGetAllFilesWithBaseFileAndLogFiles() {
+ // Create a FileSlice with both base file and log files
+ HoodieBaseFile baseFile = new HoodieBaseFile(
+ "file://" + PARTITION_PATH + "/test_base_file.parquet");
+ // Log files must follow the proper naming convention:
.{fileId}_{instant}.log.{version}
+ HoodieLogFile logFile1 = new HoodieLogFile(new StoragePath(PARTITION_PATH
+ "/." + FILE_ID + "_004.log.1"));
+ HoodieLogFile logFile2 = new HoodieLogFile(new StoragePath(PARTITION_PATH
+ "/." + FILE_ID + "_005.log.2"));
+
+ FileSlice fileSlice = new FileSlice(
+ new HoodieFileGroupId(PARTITION_PATH, FILE_ID),
+ BASE_INSTANT,
+ baseFile,
+ Arrays.asList(logFile1, logFile2)
+ );
+
+ List<String> allFiles = fileSlice.getAllFileNames();
+ assertEquals(3, allFiles.size());
+ assertTrue(allFiles.contains(baseFile.getFileName()));
+ assertTrue(allFiles.contains(logFile1.getFileName()));
+ assertTrue(allFiles.contains(logFile2.getFileName()));
+ }
+
+ @Test
+ public void testGetAllFilesEmptyFileSlice() {
+ // An empty file slice (no base file and no log files) must yield an empty list of file names
+ FileSlice fileSlice = new FileSlice(PARTITION_PATH, BASE_INSTANT, FILE_ID);
+
+ List<String> allFiles = fileSlice.getAllFileNames();
+ assertEquals(0, allFiles.size());
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index 4d6d89ebf1d6..adae1d8df614 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -21,6 +21,7 @@ package org.apache.hudi.source;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
@@ -148,7 +149,7 @@ public class FileIndex implements Serializable {
}
/**
- * Returns all the file statuses under the table base path.
+ * Return all files in the filtered partitions.
*/
public List<StoragePathInfo> getFilesInPartitions() {
if (!tableExists) {
@@ -161,41 +162,42 @@ public class FileIndex implements Serializable {
}
Map<String, List<StoragePathInfo>> filesInPartitions =
FSUtils.getFilesInPartitions(
new HoodieFlinkEngineContext(hadoopConf), metaClient, metadataConfig,
partitions);
- int totalFilesNum =
filesInPartitions.values().stream().mapToInt(List::size).sum();
- if (totalFilesNum < 1) {
- // returns early for empty table.
- return Collections.emptyList();
- }
+ return
filesInPartitions.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+ }
- List<StoragePathInfo> allFiles;
+ /**
+ * Filters the given file slices by bucket id (when a bucket id function is set) and then by column stats; returns the bucket-pruned list as is when column stats yield no candidate set.
+ */
+ public List<FileSlice> filterFileSlices(List<FileSlice> fileSlices) {
+ List<FileSlice> filteredFileSlices;
// bucket pruning
if (this.partitionBucketIdFunc != null) {
- allFiles = filesInPartitions.entrySet().stream().flatMap(entry -> {
- String bucketIdStr =
BucketIdentifier.bucketIdStr(partitionBucketIdFunc.apply(entry.getKey()));
- return entry.getValue().stream().filter(fileInfo ->
fileInfo.getPath().getName().contains(bucketIdStr));
+ filteredFileSlices = fileSlices.stream().filter(fileSlice -> {
+ String bucketIdStr =
BucketIdentifier.bucketIdStr(partitionBucketIdFunc.apply(fileSlice.getPartitionPath()));
+ return fileSlice.getFileGroupId().getFileId().contains(bucketIdStr);
}).collect(Collectors.toList());
- logPruningMsg(totalFilesNum, allFiles.size(), "bucket pruning");
+ logPruningMsg(fileSlices.size(), filteredFileSlices.size(), "bucket
pruning");
} else {
- allFiles =
filesInPartitions.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+ filteredFileSlices = fileSlices;
}
-
- if (allFiles.isEmpty()) {
- // returns early for empty table.
- return allFiles;
+ if (filteredFileSlices.isEmpty()) {
+ return Collections.emptyList();
}
- // data skipping
- Set<String> candidateFiles = fileStatsIndex.computeCandidateFiles(
- colStatsProbe, allFiles.stream().map(f ->
f.getPath().getName()).collect(Collectors.toList()));
+ // data skipping based on column stats; only files of slices that survived bucket pruning are probed
+ List<String> allFiles =
filteredFileSlices.stream().map(FileSlice::getAllFileNames).flatMap(List::stream).collect(Collectors.toList());
+ Set<String> candidateFiles =
fileStatsIndex.computeCandidateFiles(colStatsProbe, allFiles);
if (candidateFiles == null) {
// no need to filter by col stats or error occurs.
- return allFiles;
+ return filteredFileSlices;
}
- List<StoragePathInfo> results = allFiles.stream().parallel()
- .filter(fileStatus ->
candidateFiles.contains(fileStatus.getPath().getName()))
- .collect(Collectors.toList());
- logPruningMsg(allFiles.size(), results.size(), "data skipping");
- return results;
+ List<FileSlice> result = filteredFileSlices.stream().filter(fileSlice -> {
+ // if any file in the file slice is part of candidate file names, we
need to include the file slice.
+ // in other words, if all files in the file slice are not present in
candidate file names, we can filter out the file slice.
+ return
fileSlice.getAllFileNames().stream().anyMatch(candidateFiles::contains);
+ }).collect(Collectors.toList());
+ logPruningMsg(filteredFileSlices.size(), result.size(), "column stats
pruning");
+ return result;
}
/**
@@ -261,15 +263,15 @@ public class FileIndex implements Serializable {
return false;
}
- private void logPruningMsg(int numTotalFiles, int numLeftFiles, String
action) {
+ private void logPruningMsg(int numTotalFileSlices, int numLeftFileSlices,
String action) {
LOG.info("\n"
+ "------------------------------------------------------------\n"
+ "---------- action: {}\n"
- + "---------- total files: {}\n"
- + "---------- left files: {}\n"
+ + "---------- total file slices: {}\n"
+ + "---------- left file slices: {}\n"
+ "---------- skipping rate: {}\n"
+ "------------------------------------------------------------",
- action, numTotalFiles, numLeftFiles, percentage(numTotalFiles,
numLeftFiles));
+ action, numTotalFileSlices, numLeftFileSlices,
percentage(numTotalFileSlices, numLeftFileSlices));
}
private static double percentage(double total, double left) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 4655ce760608..c7dd6dcdf9b2 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -54,7 +54,6 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -175,15 +174,18 @@ public class IncrementalInputSplits implements
Serializable {
// 4. the end commit is archived
Set<String> readPartitions;
final List<StoragePathInfo> fileInfoList;
+ final List<FileSlice> fileSlices;
if (fullTableScan) {
// scans the partitions and files directly.
FileIndex fileIndex = getFileIndex(metaClient);
readPartitions = new TreeSet<>(fileIndex.getOrBuildPartitionPaths());
- if (readPartitions.size() == 0) {
+ if (readPartitions.isEmpty()) {
LOG.warn("No partitions found for reading in user provided path.");
return Result.EMPTY;
}
fileInfoList = fileIndex.getFilesInPartitions();
+ List<FileSlice> allFileSlices = getFileSlices(metaClient,
commitTimeline, readPartitions, fileInfoList,
analyzingResult.getMaxCompletionTime(), false);
+ fileSlices = fileIndex.filterFileSlices(allFileSlices);
} else {
if (cdcEnabled) {
// case1: cdc change log enabled
@@ -197,7 +199,7 @@ public class IncrementalInputSplits implements Serializable
{
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path,
instant, commitTimeline))
.collect(Collectors.toList());
readPartitions = getReadPartitions(metadataList);
- if (readPartitions.size() == 0) {
+ if (readPartitions.isEmpty()) {
LOG.warn("No partitions found for reading in user provided path.");
return Result.EMPTY;
}
@@ -210,24 +212,24 @@ public class IncrementalInputSplits implements
Serializable {
// reading from the earliest, scans the partitions and files directly.
FileIndex fileIndex = getFileIndex(metaClient);
readPartitions = new TreeSet<>(fileIndex.getOrBuildPartitionPaths());
- if (readPartitions.size() == 0) {
+ if (readPartitions.isEmpty()) {
LOG.warn("No partitions found for reading in user provided path.");
return Result.EMPTY;
}
fileInfoList = fileIndex.getFilesInPartitions();
+ List<FileSlice> allFileSlices = getFileSlices(metaClient,
commitTimeline, readPartitions, fileInfoList,
analyzingResult.getMaxCompletionTime(), false);
+ fileSlices = fileIndex.filterFileSlices(allFileSlices);
} else {
- fileInfoList = files;
+ fileSlices = getFileSlices(metaClient, commitTimeline, readPartitions,
files, analyzingResult.getMaxCompletionTime(), false);
}
}
- if (fileInfoList.size() == 0) {
+ if (fileSlices.isEmpty()) {
LOG.warn("No files found for reading in user provided path.");
return Result.EMPTY;
}
- List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient,
commitTimeline,
- fileInfoList, readPartitions, endInstant,
analyzingResult.getMaxCompletionTime(), instantRange, false);
-
+ List<MergeOnReadInputSplit> inputSplits = getInputSplits(fileSlices,
metaClient, endInstant, instantRange);
return Result.instance(inputSplits, endInstant);
}
@@ -278,19 +280,20 @@ public class IncrementalInputSplits implements
Serializable {
FileIndex fileIndex = getFileIndex(metaClient);
Set<String> readPartitions = new
TreeSet<>(fileIndex.getOrBuildPartitionPaths());
- if (readPartitions.size() == 0) {
+ if (readPartitions.isEmpty()) {
LOG.warn("No partitions found for reading under path: {}", path);
return Result.EMPTY;
}
List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
- if (pathInfoList.size() == 0) {
+ if (pathInfoList.isEmpty()) {
LOG.warn("No files found for reading under path: {}", path);
return Result.EMPTY;
}
+ List<FileSlice> allFileSlices = getFileSlices(metaClient,
commitTimeline, readPartitions, pathInfoList, offsetToIssue, false);
+ List<FileSlice> fileSlices = fileIndex.filterFileSlices(allFileSlices);
- List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient,
commitTimeline,
- pathInfoList, readPartitions, endInstant, offsetToIssue, null,
false);
+ List<MergeOnReadInputSplit> inputSplits = getInputSplits(fileSlices,
metaClient, endInstant, null);
return Result.instance(inputSplits, endInstant, offsetToIssue);
} else {
@@ -334,53 +337,56 @@ public class IncrementalInputSplits implements
Serializable {
List<HoodieCommitMetadata> metadataList = mergeList(archivedMetadataList,
activeMetadataList);
Set<String> readPartitions = getReadPartitions(metadataList);
- if (readPartitions.size() == 0) {
+ if (readPartitions.isEmpty()) {
LOG.warn("No partitions found for reading under path: {}", path);
return Collections.emptyList();
}
List<StoragePathInfo> pathInfoList = WriteProfiles.getFilesFromMetadata(
path, hadoopConf, metadataList, metaClient.getTableType());
+ List<FileSlice> fileSlices = getFileSlices(metaClient, commitTimeline,
readPartitions, pathInfoList, queryContext.getMaxCompletionTime(),
skipCompaction);
- if (pathInfoList.size() == 0) {
+ if (fileSlices.isEmpty()) {
LOG.warn("No files found for reading under path: {}", path);
return Collections.emptyList();
}
- return getInputSplits(metaClient, commitTimeline,
- pathInfoList, readPartitions, endInstant,
queryContext.getMaxCompletionTime(), instantRange, skipCompaction);
+ return getInputSplits(fileSlices, metaClient, endInstant, instantRange);
}
- private List<MergeOnReadInputSplit> getInputSplits(
+ private List<FileSlice> getFileSlices(
HoodieTableMetaClient metaClient,
HoodieTimeline commitTimeline,
- List<StoragePathInfo> pathInfoList,
Set<String> readPartitions,
- String endInstant,
+ List<StoragePathInfo> pathInfoList,
String maxCompletionTime,
- InstantRange instantRange,
boolean skipBaseFiles) {
- final HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient, commitTimeline, pathInfoList);
+ try (HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient, commitTimeline, pathInfoList)) {
+ return readPartitions.stream().flatMap(par -> skipBaseFiles ?
fsView.getAllLogsMergedFileSliceBeforeOrOn(par, maxCompletionTime)
+ : fsView.getLatestMergedFileSlicesBeforeOrOn(par,
maxCompletionTime)).collect(Collectors.toList());
+ }
+ }
+
+ private List<MergeOnReadInputSplit> getInputSplits(
+ List<FileSlice> fileSlices,
+ HoodieTableMetaClient metaClient,
+ String endInstant,
+ InstantRange instantRange) {
final AtomicInteger cnt = new AtomicInteger(0);
final String mergeType = this.conf.get(FlinkOptions.MERGE_TYPE);
- return readPartitions.stream()
- .map(relPartitionPath -> getFileSlices(fsView, relPartitionPath,
maxCompletionTime, skipBaseFiles)
- .map(fileSlice -> {
- Option<List<String>> logPaths =
Option.ofNullable(fileSlice.getLogFiles()
- .sorted(HoodieLogFile.getLogFileComparator())
- .map(logFile -> logFile.getPath().toString())
- .filter(logPath ->
!logPath.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
- .collect(Collectors.toList()));
- String basePath =
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
- // the latest commit is used as the limit of the log reader
instant upper threshold,
- // it must be at least the latest instant time of the file slice
to avoid data loss.
- String latestCommit =
InstantComparison.minInstant(fileSlice.getLatestInstantTime(), endInstant);
- return new MergeOnReadInputSplit(cnt.getAndAdd(1),
- basePath, logPaths, latestCommit,
- metaClient.getBasePath().toString(),
maxCompactionMemoryInBytes, mergeType, instantRange, fileSlice.getFileId());
- }).collect(Collectors.toList()))
- .flatMap(Collection::stream)
- .sorted(Comparator.comparing(MergeOnReadInputSplit::getLatestCommit))
- .collect(Collectors.toList());
+ return fileSlices.stream().map(fileSlice -> {
+ Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
+ .sorted(HoodieLogFile.getLogFileComparator())
+ .map(logFile -> logFile.getPath().toString())
+ .filter(logPath ->
!logPath.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+ .collect(Collectors.toList()));
+ String basePath =
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+ // the latest commit is used as the limit of the log reader instant
upper threshold,
+ // it must be at least the latest instant time of the file slice to
avoid data loss.
+ String latestCommit =
InstantComparison.minInstant(fileSlice.getLatestInstantTime(), endInstant);
+ return new MergeOnReadInputSplit(cnt.getAndAdd(1),
+ basePath, logPaths, latestCommit,
+ metaClient.getBasePath().toString(), maxCompactionMemoryInBytes,
mergeType, instantRange, fileSlice.getFileId());
+
}).sorted(Comparator.comparing(MergeOnReadInputSplit::getLatestCommit)).collect(Collectors.toList());
}
private List<MergeOnReadInputSplit> getCdcInputSplits(
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index c6c1b3d2b871..2fb852d90ed4 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -23,8 +23,9 @@ import
org.apache.hudi.adapter.InputFormatSourceFunctionAdapter;
import org.apache.hudi.adapter.TableFunctionProviderAdapter;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.BaseFile;
-import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -111,7 +112,6 @@ import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -397,30 +397,32 @@ public class HoodieTableSource implements
throw new HoodieException("No files found for reading in user provided
path.");
}
- HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient,
- // file-slice after pending compaction-requested instant-time is also
considered valid
-
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(),
- pathInfoList);
- if (!fsView.getLastInstant().isPresent()) {
- return Collections.emptyList();
+ String latestCommit;
+ List<FileSlice> allFileSlices;
+ // file-slice after pending compaction-requested instant-time is also
considered valid
+ try (HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
+ metaClient,
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(),
pathInfoList)) {
+ if (!fsView.getLastInstant().isPresent()) {
+ return Collections.emptyList();
+ }
+ latestCommit = fsView.getLastInstant().get().requestedTime();
+ allFileSlices = relPartitionPaths.stream()
+ .flatMap(par -> fsView.getLatestMergedFileSlicesBeforeOrOn(par,
latestCommit)).collect(Collectors.toList());
}
- String latestCommit = fsView.getLastInstant().get().requestedTime();
+ List<FileSlice> fileSlices = fileIndex.filterFileSlices(allFileSlices);
+
final String mergeType = this.conf.get(FlinkOptions.MERGE_TYPE);
final AtomicInteger cnt = new AtomicInteger(0);
// generates one input split for each file group
- return relPartitionPaths.stream()
- .map(relPartitionPath ->
fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
- .map(fileSlice -> {
- String basePath =
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
- Option<List<String>> logPaths =
Option.ofNullable(fileSlice.getLogFiles()
- .sorted(HoodieLogFile.getLogFileComparator())
- .map(logFile -> logFile.getPath().toString())
- .collect(Collectors.toList()));
- return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath,
logPaths, latestCommit,
- metaClient.getBasePath().toString(),
maxCompactionMemoryInBytes, mergeType, null, fileSlice.getFileId());
- }).collect(Collectors.toList()))
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
+ return fileSlices.stream().map(fileSlice -> {
+ String basePath =
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+ Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
+ .sorted(HoodieLogFile.getLogFileComparator())
+ .map(logFile -> logFile.getPath().toString())
+ .collect(Collectors.toList()));
+ return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths,
latestCommit,
+ metaClient.getBasePath().toString(), maxCompactionMemoryInBytes,
mergeType, null, fileSlice.getFileId());
+ }).collect(Collectors.toList());
}
public InputFormat<RowData, ?> getInputFormat() {
@@ -574,15 +576,12 @@ public class HoodieTableSource implements
}
private InputFormat<RowData, ?> baseFileOnlyInputFormat() {
- final List<StoragePathInfo> pathInfoList = getReadFiles();
- if (pathInfoList.isEmpty()) {
+ final List<FileSlice> fileSlices = getBaseFileOnlyFileSlices();
+ if (fileSlices.isEmpty()) {
return InputFormats.EMPTY_INPUT_FORMAT;
}
- HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient,
-
metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(),
pathInfoList);
- Path[] paths = fsView.getLatestBaseFiles()
- .map(HoodieBaseFile::getPathInfo)
+ Path[] paths = fileSlices.stream().map(fileSlice ->
fileSlice.getBaseFile().get().getPathInfo())
.map(e -> new Path(e.getPath().toUri())).toArray(Path[]::new);
if (paths.length == 0) {
@@ -679,12 +678,21 @@ public class HoodieTableSource implements
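- * Get the reader paths with partition path expanded.
+ * Get the file slices (latest base file only, no log files) of the read partitions, filtered by the file index.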
*/
@VisibleForTesting
- public List<StoragePathInfo> getReadFiles() {
+ public List<FileSlice> getBaseFileOnlyFileSlices() {
List<String> relPartitionPaths = getReadPartitions();
if (relPartitionPaths.isEmpty()) {
return Collections.emptyList();
}
- return fileIndex.getFilesInPartitions();
+ List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
+ try (HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient,
+
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(),
pathInfoList)) {
+
+ List<FileSlice> allFileSlices = relPartitionPaths.stream()
+ .flatMap(par -> fsView.getLatestBaseFiles(par)
+ .map(baseFile -> new FileSlice(new HoodieFileGroupId(par,
baseFile.getFileId()), baseFile.getCommitTime(), baseFile,
Collections.emptyList())))
+ .collect(Collectors.toList());
+ return fileIndex.filterFileSlices(allFileSlices);
+ }
}
@VisibleForTesting
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index cea36070557f..4d2aabd90fc0 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -49,6 +49,7 @@ import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -603,10 +604,20 @@ public class HoodieCatalog extends AbstractCatalog {
private void refreshTableProperties(ObjectPath tablePath, CatalogBaseTable
newCatalogTable) {
Map<String, String> options = newCatalogTable.getOptions();
+ ResolvedCatalogTable resolvedTable = (ResolvedCatalogTable)
newCatalogTable;
final String avroSchema = AvroSchemaConverter.convertToSchema(
- ((ResolvedCatalogTable)
newCatalogTable).getResolvedSchema().toPhysicalRowDataType().getLogicalType(),
+
resolvedTable.getResolvedSchema().toPhysicalRowDataType().getLogicalType(),
AvroSchemaUtils.getAvroRecordQualifiedName(tablePath.getObjectName())).toString();
options.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), avroSchema);
+ java.util.Optional<UniqueConstraint> pkConstraintOpt =
resolvedTable.getResolvedSchema().getPrimaryKey();
+ if (pkConstraintOpt.isPresent()) {
+ options.put(TableOptionProperties.PK_COLUMNS, String.join(",",
pkConstraintOpt.get().getColumns()));
+ options.put(TableOptionProperties.PK_CONSTRAINT_NAME,
pkConstraintOpt.get().getName());
+ }
+ if (resolvedTable.isPartitioned()) {
+ final String partitions = String.join(",",
resolvedTable.getPartitionKeys());
+ options.put(TableOptionProperties.PARTITION_COLUMNS, partitions);
+ }
String tablePathStr = inferTablePath(catalogPathStr, tablePath);
try {
TableOptionProperties.overwriteProperties(tablePathStr, hadoopConf,
options);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
index a50f4a2a5243..fab10d7fd74c 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
@@ -19,8 +19,11 @@
package org.apache.hudi.source;
import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.source.prune.ColumnStatsProbe;
@@ -82,8 +85,9 @@ public class TestFileIndex {
conf.set(METADATA_ENABLED, true);
conf.set(HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
FileIndex fileIndex = FileIndex.builder().path(new
StoragePath(tempFile.getAbsolutePath())).conf(conf)
-
.rowType(TestConfigurations.ROW_TYPE).metaClient(StreamerUtil.createMetaClient(conf)).build();
+ .rowType(TestConfigurations.ROW_TYPE).metaClient(metaClient).build();
List<String> partitionKeys = Collections.singletonList("partition");
List<Map<String, String>> partitions =
fileIndex.getPartitions(partitionKeys,
PARTITION_DEFAULT_NAME.defaultValue(),
@@ -93,10 +97,9 @@ public class TestFileIndex {
.map(Map::values).flatMap(Collection::stream).sorted().collect(Collectors.joining(","));
assertThat("should have 4 partitions", partitionPaths,
is("par1,par2,par3,par4"));
- List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
- assertThat(pathInfoList.size(), is(4));
- assertTrue(pathInfoList.stream().allMatch(fileInfo ->
-
fileInfo.getPath().toString().endsWith(HoodieFileFormat.PARQUET.getFileExtension())));
+ List<FileSlice> fileSlices = getFilteredFileSlices(metaClient, fileIndex);
+ assertThat(fileSlices.size(), is(4));
+ assertTrue(fileSlices.stream().allMatch(fileSlice ->
fileSlice.getBaseFile().isPresent() && fileSlice.getLogFiles().count() == 0));
}
@Test
@@ -106,17 +109,18 @@ public class TestFileIndex {
conf.set(KEYGEN_CLASS_NAME,
NonpartitionedAvroKeyGenerator.class.getName());
conf.set(METADATA_ENABLED, true);
TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
FileIndex fileIndex = FileIndex.builder().path(new
StoragePath(tempFile.getAbsolutePath())).conf(conf)
-
.rowType(TestConfigurations.ROW_TYPE).metaClient(StreamerUtil.createMetaClient(conf)).build();
+ .rowType(TestConfigurations.ROW_TYPE).metaClient(metaClient).build();
List<String> partitionKeys = Collections.singletonList("");
List<Map<String, String>> partitions =
fileIndex.getPartitions(partitionKeys,
PARTITION_DEFAULT_NAME.defaultValue(), false);
assertThat(partitions.size(), is(0));
- List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
- assertThat(pathInfoList.size(), is(1));
- assertTrue(pathInfoList.get(0).getPath().toString()
- .endsWith(HoodieFileFormat.PARQUET.getFileExtension()));
+ List<FileSlice> fileSlices = getFilteredFileSlices(metaClient, fileIndex);
+
+ assertThat(fileSlices.size(), is(1));
+ assertTrue(fileSlices.get(0).getBaseFile().isPresent() &&
fileSlices.get(0).getLogFiles().count() == 0);
}
@ParameterizedTest
@@ -130,9 +134,6 @@ public class TestFileIndex {
List<Map<String, String>> partitions =
fileIndex.getPartitions(partitionKeys,
PARTITION_DEFAULT_NAME.defaultValue(), false);
assertThat(partitions.size(), is(0));
-
- List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
- assertThat(pathInfoList.size(), is(0));
}
@Test
@@ -145,12 +146,13 @@ public class TestFileIndex {
writeBigintDataset(conf);
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
FileIndex fileIndex =
FileIndex.builder()
.path(new StoragePath(tempFile.getAbsolutePath()))
.conf(conf)
.rowType(TestConfigurations.ROW_TYPE_BIGINT)
- .metaClient(StreamerUtil.createMetaClient(conf))
+ .metaClient(metaClient)
.columnStatsProbe(ColumnStatsProbe.newInstance(Collections.singletonList(CallExpression.permanent(
FunctionIdentifier.of("greaterThan"),
BuiltInFunctionDefinitions.GREATER_THAN,
@@ -162,8 +164,8 @@ public class TestFileIndex {
.partitionPruner(null)
.build();
- List<StoragePathInfo> files = fileIndex.getFilesInPartitions();
- assertThat(files.size(), is(2));
+ List<FileSlice> fileSlices = getFilteredFileSlices(metaClient, fileIndex);
+ assertThat(fileSlices.size(), is(2));
}
@ParameterizedTest
@@ -215,6 +217,22 @@ public class TestFileIndex {
assertEquals(Arrays.asList("par3"), p);
}
+ private List<FileSlice> getFilteredFileSlices(HoodieTableMetaClient
metaClient, FileIndex fileIndex) {
+ List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
+ List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
+ HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline();
+ List<FileSlice> allFileSlices;
+ try (HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient, timeline, pathInfoList)) {
+ if (timeline.lastInstant().isPresent()) {
+ allFileSlices = relPartitionPaths.stream()
+ .flatMap(par -> fsView.getLatestMergedFileSlicesBeforeOrOn(par,
timeline.lastInstant().get().requestedTime())).collect(Collectors.toList());
+ } else {
+ return Collections.emptyList();
+ }
+ }
+ return fileIndex.filterFileSlices(allFileSlices);
+ }
+
private void writeBigintDataset(Configuration conf) throws Exception {
List<RowData> dataset = Arrays.asList(
insertRow(TestConfigurations.ROW_TYPE_BIGINT, 1L,
StringData.fromString("Danny"), 23,
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index c89148173b10..b59b51c035c0 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1927,6 +1927,54 @@ public class ITTestHoodieDataSource {
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testDataSkippingByFilteringFileSlice(HoodieTableType tableType) {
+ // Case: column stats for different files inside one file slice can differ,
+ // so if any file in the file slice satisfies the predicate based on column
stats,
+ // then the file slice should be read.
+ // E.g., query predicate is age <> 25, base file contains: {key=k1,
orderingVal=1, age=23},
+ // log file contains: {key=k1, orderingVal=2, age=25}, then the file slice
should be read.
+ TableEnvironment tableEnv = batchTableEnv;
+ String path = tempFile.getAbsolutePath();
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, path)
+ .option(FlinkOptions.METADATA_ENABLED, true)
+ .option("hoodie.metadata.index.column.stats.enable", true)
+ .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
+ .option(FlinkOptions.COMPACTION_TASKS, 1)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ execInsertSql(tableEnv, TestSQL.INSERT_T1);
+
+ List<Row> result1 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1 where age <> 25 and
`partition` = 'par1'").execute().collect());
+ assertRowsEquals(result1, "["
+ + "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], "
+ + "+I[id2, Stephen, 33, 1970-01-01T00:00:02, par1]]");
+
+ batchTableEnv.executeSql("drop table t1");
+
+ hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, path)
+ .option(FlinkOptions.METADATA_ENABLED, true)
+ .option("hoodie.metadata.index.column.stats.enable", true)
+ .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ final String INSERT_T2 = "insert into t1 values\n"
+ + "('id1','Danny',25,TIMESTAMP '1970-01-01 00:01:01','par1')\n";
+ execInsertSql(tableEnv, INSERT_T2);
+ result1 = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1 where age <> 25 and
`partition` = 'par1'").execute().collect());
+ assertRowsEquals(result1, "[+I[id2, Stephen, 33, 1970-01-01T00:00:02,
par1]]");
+ }
+
@Test
void testParquetLogBlockDataSkipping() {
TableEnvironment tableEnv = batchTableEnv;
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index 6d35ac1f1adf..c2c36bec4fd1 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -19,13 +19,13 @@
package org.apache.hudi.table;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.source.prune.ColumnStatsProbe;
import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.util.SerializableSchema;
import org.apache.hudi.util.StreamerUtil;
@@ -100,9 +100,9 @@ public class TestHoodieTableSource {
void testGetReadPaths() throws Exception {
beforeEach();
HoodieTableSource tableSource = getEmptyStreamingSource();
- List<StoragePathInfo> fileList = tableSource.getReadFiles();
- assertNotNull(fileList);
- assertThat(fileList.size(), is(4));
+ List<FileSlice> fileSlices = tableSource.getBaseFileOnlyFileSlices();
+ assertNotNull(fileSlices);
+ assertThat(fileSlices.size(), is(4));
// apply partition pruning
FieldReferenceExpression partRef = new
FieldReferenceExpression("partition", DataTypes.STRING(), 4, 4);
ValueLiteralExpression partLiteral = new ValueLiteralExpression("par1",
DataTypes.STRING().notNull());
@@ -113,9 +113,9 @@ public class TestHoodieTableSource {
HoodieTableSource tableSource2 = getEmptyStreamingSource();
tableSource2.applyFilters(Arrays.asList(partFilter));
- List<StoragePathInfo> fileList2 = tableSource2.getReadFiles();
- assertNotNull(fileList2);
- assertThat(fileList2.size(), is(1));
+ List<FileSlice> fileSlices2 = tableSource2.getBaseFileOnlyFileSlices();
+ assertNotNull(fileSlices2);
+ assertThat(fileSlices2.size(), is(1));
}
@Test
@@ -203,8 +203,8 @@ public class TestHoodieTableSource {
int numBuckets = (int)FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.defaultValue();
assertThat(tableSource1.getDataBucketFunc().get().apply(numBuckets),
is(1));
- List<StoragePathInfo> fileList = tableSource1.getReadFiles();
- assertThat("Files should be pruned by bucket id 1", fileList.size(),
CoreMatchers.is(2));
+ List<FileSlice> fileSlices = tableSource1.getBaseFileOnlyFileSlices();
+ assertThat("Files should be pruned by bucket id 1", fileSlices.size(),
CoreMatchers.is(2));
// test multiple primary keys filtering
Configuration conf2 = conf1.clone();
@@ -218,8 +218,8 @@ public class TestHoodieTableSource {
createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(),
"id1"),
createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(),
"Danny")));
assertThat(tableSource2.getDataBucketFunc().get().apply(numBuckets),
is(3));
- List<StoragePathInfo> fileList2 = tableSource2.getReadFiles();
- assertThat("Files should be pruned by bucket id 3", fileList2.size(),
CoreMatchers.is(3));
+ List<FileSlice> fileSlices2 = tableSource2.getBaseFileOnlyFileSlices();
+ assertThat("Files should be pruned by bucket id 3", fileSlices2.size(),
CoreMatchers.is(3));
// apply the filters in different order and test again.
tableSource2.reset();
@@ -227,7 +227,7 @@ public class TestHoodieTableSource {
createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(),
"Danny"),
createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(),
"id1")));
assertThat(tableSource2.getDataBucketFunc().get().apply(numBuckets),
is(3));
- assertThat("Files should be pruned by bucket id 3",
tableSource2.getReadFiles().size(),
+ assertThat("Files should be pruned by bucket id 3",
tableSource2.getBaseFileOnlyFileSlices().size(),
CoreMatchers.is(3));
// test partial primary keys filtering
@@ -242,8 +242,8 @@ public class TestHoodieTableSource {
createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(),
"id1")));
assertTrue(tableSource3.getDataBucketFunc().isEmpty());
- List<StoragePathInfo> fileList3 = tableSource3.getReadFiles();
- assertThat("Partial pk filtering does not prune any files",
fileList3.size(),
+ List<FileSlice> fileSlices3 = tableSource3.getBaseFileOnlyFileSlices();
+ assertThat("Partial pk filtering does not prune any files",
fileSlices3.size(),
CoreMatchers.is(7));
// test single primary keys filtering together with non-primary key
predicate
@@ -257,8 +257,8 @@ public class TestHoodieTableSource {
createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(),
"Danny")));
assertThat(tableSource4.getDataBucketFunc().get().apply(numBuckets),
is(1));
- List<StoragePathInfo> fileList4 = tableSource4.getReadFiles();
- assertThat("Files should be pruned by bucket id 1", fileList4.size(),
CoreMatchers.is(2));
+ List<FileSlice> fileSlices4 = tableSource4.getBaseFileOnlyFileSlices();
+ assertThat("Files should be pruned by bucket id 1", fileSlices4.size(),
CoreMatchers.is(2));
}
@ParameterizedTest
@@ -282,8 +282,8 @@ public class TestHoodieTableSource {
LocalDateTime.ofInstant(Instant.ofEpochMilli(1),
ZoneId.of("UTC")))));
assertThat(tableSource1.getDataBucketFunc().get().apply(numBuckets),
is(logicalTimestamp ? 1 : 0));
- List<StoragePathInfo> fileList = tableSource1.getReadFiles();
- assertThat("Files should be pruned", fileList.size(), CoreMatchers.is(1));
+ List<FileSlice> fileSlices = tableSource1.getBaseFileOnlyFileSlices();
+ assertThat("Files should be pruned", fileSlices.size(),
CoreMatchers.is(1));
// test date filtering
Configuration conf2 = conf1.clone();
@@ -298,8 +298,8 @@ public class TestHoodieTableSource {
createLitEquivalenceExpr(f2, 1, DataTypes.DATE().notNull(),
LocalDate.ofEpochDay(1))));
assertThat(tableSource2.getDataBucketFunc().get().apply(numBuckets),
is(1));
- List<StoragePathInfo> fileList2 = tableSource2.getReadFiles();
- assertThat("Files should be pruned", fileList2.size(), CoreMatchers.is(1));
+ List<FileSlice> fileSlices2 = tableSource2.getBaseFileOnlyFileSlices();
+ assertThat("Files should be pruned", fileSlices2.size(),
CoreMatchers.is(1));
// test decimal filtering
Configuration conf3 = conf1.clone();
@@ -315,8 +315,8 @@ public class TestHoodieTableSource {
new BigDecimal("1.11"))));
assertThat(tableSource3.getDataBucketFunc().get().apply(numBuckets),
is(0));
- List<StoragePathInfo> fileList3 = tableSource3.getReadFiles();
- assertThat("Files should be pruned", fileList3.size(), CoreMatchers.is(1));
+ List<FileSlice> fileSlices3 = tableSource3.getBaseFileOnlyFileSlices();
+ assertThat("Files should be pruned", fileSlices3.size(),
CoreMatchers.is(1));
}
@Test
@@ -373,8 +373,8 @@ public class TestHoodieTableSource {
createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(),
"id1")));
assertThat(tableSource1.getDataBucketFunc().get().apply(4), is(1));
- List<StoragePathInfo> fileList = tableSource1.getReadFiles();
- assertThat("Files should be pruned by bucket id 1", fileList.size(),
CoreMatchers.is(2));
+ List<FileSlice> fileSlices = tableSource1.getBaseFileOnlyFileSlices();
+ assertThat("Files should be pruned by bucket id 1", fileSlices.size(),
CoreMatchers.is(2));
}
/**
@@ -403,8 +403,8 @@ public class TestHoodieTableSource {
createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(),
"id1"),
createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(),
"Danny")));
assertThat(tableSource.getDataBucketFunc().get().apply(4), is(3));
- List<StoragePathInfo> fileList = tableSource.getReadFiles();
- assertThat("Files should be pruned by bucket id 3", fileList.size(),
CoreMatchers.is(3));
+ List<FileSlice> fileSlices = tableSource.getBaseFileOnlyFileSlices();
+ assertThat("Files should be pruned by bucket id 3", fileSlices.size(),
CoreMatchers.is(3));
}
/**
@@ -433,8 +433,8 @@ public class TestHoodieTableSource {
createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(),
"id1")));
assertTrue(tableSource.getDataBucketFunc().isEmpty());
- List<StoragePathInfo> fileList = tableSource.getReadFiles();
- assertThat("Partial pk filtering does not prune any files",
fileList.size(),
+ List<FileSlice> fileSlices = tableSource.getBaseFileOnlyFileSlices();
+ assertThat("Partial pk filtering does not prune any files",
fileSlices.size(),
CoreMatchers.is(7));
}
@@ -463,8 +463,8 @@ public class TestHoodieTableSource {
createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(),
"Danny")));
assertThat(tableSource.getDataBucketFunc().get().apply(4), is(1));
- List<StoragePathInfo> fileList = tableSource.getReadFiles();
- assertThat("Files should be pruned by bucket id 1", fileList.size(),
CoreMatchers.is(2));
+ List<FileSlice> fileSlices = tableSource.getBaseFileOnlyFileSlices();
+ assertThat("Files should be pruned by bucket id 1", fileSlices.size(),
CoreMatchers.is(2));
}
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index 61f7710b8775..ffc6d97170f2 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -60,6 +60,7 @@ import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.DefaultCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
@@ -456,6 +457,30 @@ public class TestHoodieCatalog {
() -> catalog.dropTable(new ObjectPath(TEST_DEFAULT_DATABASE,
"non_exist"), false));
}
+ @Test
+ public void testAlterTable() throws Exception {
+ ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
+ // create table
+ catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true);
+
+ DefaultCatalogTable oldTable = (DefaultCatalogTable)
catalog.getTable(tablePath);
+ // same as old table
+ ResolvedCatalogTable newTable = new ResolvedCatalogTable(
+ CatalogUtils.createCatalogTable(
+
Schema.newBuilder().fromResolvedSchema(CREATE_TABLE_SCHEMA).build(),
+ oldTable.getPartitionKeys(),
+ oldTable.getOptions(),
+ "test"),
+ CREATE_TABLE_SCHEMA
+ );
+ catalog.alterTable(tablePath, newTable, true);
+
+ // validate primary key property is not missing
+ DefaultCatalogTable table = (DefaultCatalogTable)
catalog.getTable(tablePath);
+ assertTrue(table.getUnresolvedSchema().getPrimaryKey().isPresent());
+ assertTrue(table.isPartitioned());
+ }
+
@Test
public void testDropPartition() throws Exception {
ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");