This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit e6c8c508e4b87a4eb467f40e6bf4dd05c082d740 Author: Shuo Cheng <[email protected]> AuthorDate: Sat Nov 1 10:44:04 2025 +0800 fix: Fix file pruning based on column stats for flink reader (#14186) --- .../org/apache/hudi/common/model/FileSlice.java | 11 +++ .../apache/hudi/common/model/TestFileSlice.java | 79 +++++++++++++++++++ .../java/org/apache/hudi/source/FileIndex.java | 60 ++++++++------- .../apache/hudi/source/IncrementalInputSplits.java | 88 ++++++++++++---------- .../org/apache/hudi/table/HoodieTableSource.java | 68 +++++++++-------- .../apache/hudi/table/catalog/HoodieCatalog.java | 13 +++- .../java/org/apache/hudi/source/TestFileIndex.java | 52 ++++++++----- .../apache/hudi/table/ITTestHoodieDataSource.java | 48 ++++++++++++ .../apache/hudi/table/TestHoodieTableSource.java | 60 +++++++-------- .../hudi/table/catalog/TestHoodieCatalog.java | 25 ++++++ 10 files changed, 356 insertions(+), 148 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java index 21e9a93d4afc..5f4c87ddb59a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.table.timeline.InstantComparison; import org.apache.hudi.common.util.Option; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.TreeSet; @@ -140,6 +141,16 @@ public class FileSlice implements Serializable { return Option.fromJavaOptional(logFiles.stream().findFirst()); } + /** + * Return file names list of base file and log files. + */ + public List<String> getAllFileNames() { + List<String> fileList = new ArrayList<>(); + getBaseFile().ifPresent(hoodieBaseFile -> fileList.add(hoodieBaseFile.getFileName())); + getLogFiles().forEach(hoodieLogFile -> fileList.add(hoodieLogFile.getFileName())); + return fileList; + } + public long getTotalFileSize() { return getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L) + getLogFiles().mapToLong(HoodieLogFile::getFileSize).sum(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java index 8538d645c4fc..a09bfaebc9a2 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java @@ -22,13 +22,23 @@ import org.apache.hudi.storage.StoragePath; import org.junit.jupiter.api.Test; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Tests {@link FileSlice}. */ public class TestFileSlice { + private static final String PARTITION_PATH = "test_partition"; + private static final String FILE_ID = "test_file_id"; + private static final String BASE_INSTANT = "001"; + @Test void testGetLatestInstantTime() { String baseInstant = "003"; @@ -48,4 +58,73 @@ public class TestFileSlice { private static String getLogFileName(String instantTime) { return ".fg1_" + instantTime + ".log.1_1-0-1"; } + + @Test + public void testGetAllFilesWithBaseFileOnly() { + // Create a FileSlice with only a base file and no log files + HoodieBaseFile baseFile = new HoodieBaseFile( + "file://" + PARTITION_PATH + "/test_base_file.parquet"); + FileSlice fileSlice = new FileSlice( + new HoodieFileGroupId(PARTITION_PATH, FILE_ID), + BASE_INSTANT, + baseFile, + Collections.emptyList() + ); + + List<String> allFiles = fileSlice.getAllFileNames(); + assertEquals(1, allFiles.size()); + assertTrue(allFiles.contains(baseFile.getFileName())); + } + + @Test + public void testGetAllFilesWithLogFilesOnly() { + // Create a FileSlice with no base file but with log files + // Log files must follow the proper naming convention: .{fileId}_{instant}.log.{version} + HoodieLogFile logFile1 = new HoodieLogFile(new StoragePath(PARTITION_PATH + "/." + FILE_ID + "_002.log.1")); + HoodieLogFile logFile2 = new HoodieLogFile(new StoragePath(PARTITION_PATH + "/." + FILE_ID + "_003.log.2")); + + FileSlice fileSlice = new FileSlice( + new HoodieFileGroupId(PARTITION_PATH, FILE_ID), + BASE_INSTANT, + null, // No base file + Arrays.asList(logFile1, logFile2) + ); + + List<String> allFiles = fileSlice.getAllFileNames(); + assertEquals(2, allFiles.size()); + assertTrue(allFiles.contains(logFile1.getFileName())); + assertTrue(allFiles.contains(logFile2.getFileName())); + } + + @Test + public void testGetAllFilesWithBaseFileAndLogFiles() { + // Create a FileSlice with both base file and log files + HoodieBaseFile baseFile = new HoodieBaseFile( + "file://" + PARTITION_PATH + "/test_base_file.parquet"); + // Log files must follow the proper naming convention: .{fileId}_{instant}.log.{version} + HoodieLogFile logFile1 = new HoodieLogFile(new StoragePath(PARTITION_PATH + "/." + FILE_ID + "_004.log.1")); + HoodieLogFile logFile2 = new HoodieLogFile(new StoragePath(PARTITION_PATH + "/." + FILE_ID + "_005.log.2")); + + FileSlice fileSlice = new FileSlice( + new HoodieFileGroupId(PARTITION_PATH, FILE_ID), + BASE_INSTANT, + baseFile, + Arrays.asList(logFile1, logFile2) + ); + + List<String> allFiles = fileSlice.getAllFileNames(); + assertEquals(3, allFiles.size()); + assertTrue(allFiles.contains(baseFile.getFileName())); + assertTrue(allFiles.contains(logFile1.getFileName())); + assertTrue(allFiles.contains(logFile2.getFileName())); + } + + @Test + public void testGetAllFilesEmptyFileSlice() { + // Test with an empty file slice (no files and no base file) + FileSlice fileSlice = new FileSlice(PARTITION_PATH, BASE_INSTANT, FILE_ID); + + List<String> allFiles = fileSlice.getAllFileNames(); + assertEquals(0, allFiles.size()); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java index 4d6d89ebf1d6..adae1d8df614 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java @@ -21,6 +21,7 @@ package org.apache.hudi.source; import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; @@ -148,7 +149,7 @@ public class FileIndex implements Serializable { } /** - * Returns all the file statuses under the table base path. + * Return all files in the filtered partitions. */ public List<StoragePathInfo> getFilesInPartitions() { if (!tableExists) { @@ -161,41 +162,42 @@ public class FileIndex implements Serializable { } Map<String, List<StoragePathInfo>> filesInPartitions = FSUtils.getFilesInPartitions( new HoodieFlinkEngineContext(hadoopConf), metaClient, metadataConfig, partitions); - int totalFilesNum = filesInPartitions.values().stream().mapToInt(List::size).sum(); - if (totalFilesNum < 1) { - // returns early for empty table. - return Collections.emptyList(); - } + return filesInPartitions.values().stream().flatMap(Collection::stream).collect(Collectors.toList()); + } - List<StoragePathInfo> allFiles; + /** + * Filter file slices by pruning based on bucket id and column stats. + */ + public List<FileSlice> filterFileSlices(List<FileSlice> fileSlices) { + List<FileSlice> filteredFileSlices; // bucket pruning if (this.partitionBucketIdFunc != null) { - allFiles = filesInPartitions.entrySet().stream().flatMap(entry -> { - String bucketIdStr = BucketIdentifier.bucketIdStr(partitionBucketIdFunc.apply(entry.getKey())); - return entry.getValue().stream().filter(fileInfo -> fileInfo.getPath().getName().contains(bucketIdStr)); + filteredFileSlices = fileSlices.stream().filter(fileSlice -> { + String bucketIdStr = BucketIdentifier.bucketIdStr(partitionBucketIdFunc.apply(fileSlice.getPartitionPath())); + return fileSlice.getFileGroupId().getFileId().contains(bucketIdStr); }).collect(Collectors.toList()); - logPruningMsg(totalFilesNum, allFiles.size(), "bucket pruning"); + logPruningMsg(fileSlices.size(), filteredFileSlices.size(), "bucket pruning"); } else { - allFiles = filesInPartitions.values().stream().flatMap(Collection::stream).collect(Collectors.toList()); + filteredFileSlices = fileSlices; } - - if (allFiles.isEmpty()) { - // returns early for empty table. - return allFiles; + if (filteredFileSlices.isEmpty()) { + return Collections.emptyList(); } - // data skipping - Set<String> candidateFiles = fileStatsIndex.computeCandidateFiles( - colStatsProbe, allFiles.stream().map(f -> f.getPath().getName()).collect(Collectors.toList())); + // data skipping based on column stats + List<String> allFiles = fileSlices.stream().map(FileSlice::getAllFileNames).flatMap(List::stream).collect(Collectors.toList()); + Set<String> candidateFiles = fileStatsIndex.computeCandidateFiles(colStatsProbe, allFiles); if (candidateFiles == null) { // no need to filter by col stats or error occurs. - return allFiles; + return filteredFileSlices; } - List<StoragePathInfo> results = allFiles.stream().parallel() - .filter(fileStatus -> candidateFiles.contains(fileStatus.getPath().getName())) - .collect(Collectors.toList()); - logPruningMsg(allFiles.size(), results.size(), "data skipping"); - return results; + List<FileSlice> result = filteredFileSlices.stream().filter(fileSlice -> { + // if any file in the file slice is part of candidate file names, we need to include the file slice. + // in other words, if all files in the file slice are not present in candidate file names, we can filter out the file slice. + return fileSlice.getAllFileNames().stream().anyMatch(candidateFiles::contains); + }).collect(Collectors.toList()); + logPruningMsg(filteredFileSlices.size(), result.size(), "column stats pruning"); + return result; } /** @@ -261,15 +263,15 @@ public class FileIndex implements Serializable { return false; } - private void logPruningMsg(int numTotalFiles, int numLeftFiles, String action) { + private void logPruningMsg(int numTotalFileSlices, int numLeftFileSlices, String action) { LOG.info("\n" + "------------------------------------------------------------\n" + "---------- action: {}\n" - + "---------- total files: {}\n" - + "---------- left files: {}\n" + + "---------- total file slices: {}\n" + + "---------- left file slices: {}\n" + "---------- skipping rate: {}\n" + "------------------------------------------------------------", - action, numTotalFiles, numLeftFiles, percentage(numTotalFiles, numLeftFiles)); + action, numTotalFileSlices, numLeftFileSlices, percentage(numTotalFileSlices, numLeftFileSlices)); } private static double percentage(double total, double left) { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java index 4655ce760608..c7dd6dcdf9b2 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java @@ -54,7 +54,6 @@ import javax.annotation.Nullable; import java.io.Serializable; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -175,15 +174,18 @@ public class IncrementalInputSplits implements Serializable { // 4. the end commit is archived Set<String> readPartitions; final List<StoragePathInfo> fileInfoList; + final List<FileSlice> fileSlices; if (fullTableScan) { // scans the partitions and files directly. FileIndex fileIndex = getFileIndex(metaClient); readPartitions = new TreeSet<>(fileIndex.getOrBuildPartitionPaths()); - if (readPartitions.size() == 0) { + if (readPartitions.isEmpty()) { LOG.warn("No partitions found for reading in user provided path."); return Result.EMPTY; } fileInfoList = fileIndex.getFilesInPartitions(); + List<FileSlice> allFileSlices = getFileSlices(metaClient, commitTimeline, readPartitions, fileInfoList, analyzingResult.getMaxCompletionTime(), false); + fileSlices = fileIndex.filterFileSlices(allFileSlices); } else { if (cdcEnabled) { // case1: cdc change log enabled @@ -197,7 +199,7 @@ public class IncrementalInputSplits implements Serializable { .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)) .collect(Collectors.toList()); readPartitions = getReadPartitions(metadataList); - if (readPartitions.size() == 0) { + if (readPartitions.isEmpty()) { LOG.warn("No partitions found for reading in user provided path."); return Result.EMPTY; } @@ -210,24 +212,24 @@ public class IncrementalInputSplits implements Serializable { // reading from the earliest, scans the partitions and files directly. FileIndex fileIndex = getFileIndex(metaClient); readPartitions = new TreeSet<>(fileIndex.getOrBuildPartitionPaths()); - if (readPartitions.size() == 0) { + if (readPartitions.isEmpty()) { LOG.warn("No partitions found for reading in user provided path."); return Result.EMPTY; } fileInfoList = fileIndex.getFilesInPartitions(); + List<FileSlice> allFileSlices = getFileSlices(metaClient, commitTimeline, readPartitions, fileInfoList, analyzingResult.getMaxCompletionTime(), false); + fileSlices = fileIndex.filterFileSlices(allFileSlices); } else { - fileInfoList = files; + fileSlices = getFileSlices(metaClient, commitTimeline, readPartitions, files, analyzingResult.getMaxCompletionTime(), false); } } - if (fileInfoList.size() == 0) { + if (fileSlices.isEmpty()) { LOG.warn("No files found for reading in user provided path."); return Result.EMPTY; } - List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline, - fileInfoList, readPartitions, endInstant, analyzingResult.getMaxCompletionTime(), instantRange, false); - + List<MergeOnReadInputSplit> inputSplits = getInputSplits(fileSlices, metaClient, endInstant, instantRange); return Result.instance(inputSplits, endInstant); } @@ -278,19 +280,20 @@ public class IncrementalInputSplits implements Serializable { FileIndex fileIndex = getFileIndex(metaClient); Set<String> readPartitions = new TreeSet<>(fileIndex.getOrBuildPartitionPaths()); - if (readPartitions.size() == 0) { + if (readPartitions.isEmpty()) { LOG.warn("No partitions found for reading under path: {}", path); return Result.EMPTY; } List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions(); - if (pathInfoList.size() == 0) { + if (pathInfoList.isEmpty()) { LOG.warn("No files found for reading under path: {}", path); return Result.EMPTY; } + List<FileSlice> allFileSlices = getFileSlices(metaClient, commitTimeline, readPartitions, pathInfoList, offsetToIssue, false); + List<FileSlice> fileSlices = fileIndex.filterFileSlices(allFileSlices); - List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, commitTimeline, - pathInfoList, readPartitions, endInstant, offsetToIssue, null, false); + List<MergeOnReadInputSplit> inputSplits = getInputSplits(fileSlices, metaClient, endInstant, null); return Result.instance(inputSplits, endInstant, offsetToIssue); } else { @@ -334,53 +337,56 @@ public class IncrementalInputSplits implements Serializable { List<HoodieCommitMetadata> metadataList = mergeList(archivedMetadataList, activeMetadataList); Set<String> readPartitions = getReadPartitions(metadataList); - if (readPartitions.size() == 0) { + if (readPartitions.isEmpty()) { LOG.warn("No partitions found for reading under path: {}", path); return Collections.emptyList(); } List<StoragePathInfo> pathInfoList = WriteProfiles.getFilesFromMetadata( path, hadoopConf, metadataList, metaClient.getTableType()); + List<FileSlice> fileSlices = getFileSlices(metaClient, commitTimeline, readPartitions, pathInfoList, queryContext.getMaxCompletionTime(), skipCompaction); - if (pathInfoList.size() == 0) { + if (fileSlices.isEmpty()) { LOG.warn("No files found for reading under path: {}", path); return Collections.emptyList(); } - return getInputSplits(metaClient, commitTimeline, - pathInfoList, readPartitions, endInstant, queryContext.getMaxCompletionTime(), instantRange, skipCompaction); + return getInputSplits(fileSlices, metaClient, endInstant, instantRange); } - private List<MergeOnReadInputSplit> getInputSplits( + private List<FileSlice> getFileSlices( HoodieTableMetaClient metaClient, HoodieTimeline commitTimeline, - List<StoragePathInfo> pathInfoList, Set<String> readPartitions, - String endInstant, + List<StoragePathInfo> pathInfoList, String maxCompletionTime, - InstantRange instantRange, boolean skipBaseFiles) { - final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, pathInfoList); + try (HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, commitTimeline, pathInfoList)) { + return readPartitions.stream().flatMap(par -> skipBaseFiles ? fsView.getAllLogsMergedFileSliceBeforeOrOn(par, maxCompletionTime) + : fsView.getLatestMergedFileSlicesBeforeOrOn(par, maxCompletionTime)).collect(Collectors.toList()); + } + } + + private List<MergeOnReadInputSplit> getInputSplits( + List<FileSlice> fileSlices, + HoodieTableMetaClient metaClient, + String endInstant, + InstantRange instantRange) { final AtomicInteger cnt = new AtomicInteger(0); final String mergeType = this.conf.get(FlinkOptions.MERGE_TYPE); - return readPartitions.stream() - .map(relPartitionPath -> getFileSlices(fsView, relPartitionPath, maxCompletionTime, skipBaseFiles) - .map(fileSlice -> { - Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles() - .sorted(HoodieLogFile.getLogFileComparator()) - .map(logFile -> logFile.getPath().toString()) - .filter(logPath -> !logPath.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX)) - .collect(Collectors.toList())); - String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null); - // the latest commit is used as the limit of the log reader instant upper threshold, - // it must be at least the latest instant time of the file slice to avoid data loss. - String latestCommit = InstantComparison.minInstant(fileSlice.getLatestInstantTime(), endInstant); - return new MergeOnReadInputSplit(cnt.getAndAdd(1), - basePath, logPaths, latestCommit, - metaClient.getBasePath().toString(), maxCompactionMemoryInBytes, mergeType, instantRange, fileSlice.getFileId()); - }).collect(Collectors.toList())) - .flatMap(Collection::stream) - .sorted(Comparator.comparing(MergeOnReadInputSplit::getLatestCommit)) - .collect(Collectors.toList()); + return fileSlices.stream().map(fileSlice -> { + Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles() + .sorted(HoodieLogFile.getLogFileComparator()) + .map(logFile -> logFile.getPath().toString()) + .filter(logPath -> !logPath.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX)) + .collect(Collectors.toList())); + String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null); + // the latest commit is used as the limit of the log reader instant upper threshold, + // it must be at least the latest instant time of the file slice to avoid data loss. + String latestCommit = InstantComparison.minInstant(fileSlice.getLatestInstantTime(), endInstant); + return new MergeOnReadInputSplit(cnt.getAndAdd(1), + basePath, logPaths, latestCommit, + metaClient.getBasePath().toString(), maxCompactionMemoryInBytes, mergeType, instantRange, fileSlice.getFileId()); + }).sorted(Comparator.comparing(MergeOnReadInputSplit::getLatestCommit)).collect(Collectors.toList()); } private List<MergeOnReadInputSplit> getCdcInputSplits( diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index c6c1b3d2b871..2fb852d90ed4 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -23,8 +23,9 @@ import org.apache.hudi.adapter.InputFormatSourceFunctionAdapter; import org.apache.hudi.adapter.TableFunctionProviderAdapter; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.BaseFile; -import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -111,7 +112,6 @@ import java.io.Serializable; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Set; @@ -397,30 +397,32 @@ public class HoodieTableSource implements throw new HoodieException("No files found for reading in user provided path."); } - HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, - // file-slice after pending compaction-requested instant-time is also considered valid - metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(), - pathInfoList); - if (!fsView.getLastInstant().isPresent()) { - return Collections.emptyList(); + String latestCommit; + List<FileSlice> allFileSlices; + // file-slice after pending compaction-requested instant-time is also considered valid + try (HoodieTableFileSystemView fsView = new HoodieTableFileSystemView( + metaClient, metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(), pathInfoList)) { + if (!fsView.getLastInstant().isPresent()) { + return Collections.emptyList(); + } + latestCommit = fsView.getLastInstant().get().requestedTime(); + allFileSlices = relPartitionPaths.stream() + .flatMap(par -> fsView.getLatestMergedFileSlicesBeforeOrOn(par, latestCommit)).collect(Collectors.toList()); } - String latestCommit = fsView.getLastInstant().get().requestedTime(); + List<FileSlice> fileSlices = fileIndex.filterFileSlices(allFileSlices); + final String mergeType = this.conf.get(FlinkOptions.MERGE_TYPE); final AtomicInteger cnt = new AtomicInteger(0); // generates one input split for each file group - return relPartitionPaths.stream() - .map(relPartitionPath -> fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit) - .map(fileSlice -> { - String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null); - Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles() - .sorted(HoodieLogFile.getLogFileComparator()) - .map(logFile -> logFile.getPath().toString()) - .collect(Collectors.toList())); - return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit, - metaClient.getBasePath().toString(), maxCompactionMemoryInBytes, mergeType, null, fileSlice.getFileId()); - }).collect(Collectors.toList())) - .flatMap(Collection::stream) - .collect(Collectors.toList()); + return fileSlices.stream().map(fileSlice -> { + String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null); + Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles() + .sorted(HoodieLogFile.getLogFileComparator()) + .map(logFile -> logFile.getPath().toString()) + .collect(Collectors.toList())); + return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit, + metaClient.getBasePath().toString(), maxCompactionMemoryInBytes, mergeType, null, fileSlice.getFileId()); + }).collect(Collectors.toList()); } public InputFormat<RowData, ?> getInputFormat() { @@ -574,15 +576,12 @@ public class HoodieTableSource implements } private InputFormat<RowData, ?> baseFileOnlyInputFormat() { - final List<StoragePathInfo> pathInfoList = getReadFiles(); - if (pathInfoList.isEmpty()) { + final List<FileSlice> fileSlices = getBaseFileOnlyFileSlices(); + if (fileSlices.isEmpty()) { return InputFormats.EMPTY_INPUT_FORMAT; } - HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, - metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(), pathInfoList); - Path[] paths = fsView.getLatestBaseFiles() - .map(HoodieBaseFile::getPathInfo) + Path[] paths = fileSlices.stream().map(fileSlice -> fileSlice.getBaseFile().get().getPathInfo()) .map(e -> new Path(e.getPath().toUri())).toArray(Path[]::new); if (paths.length == 0) { @@ -679,12 +678,21 @@ public class HoodieTableSource implements * Get the reader paths with partition path expanded. */ @VisibleForTesting - public List<StoragePathInfo> getReadFiles() { + public List<FileSlice> getBaseFileOnlyFileSlices() { List<String> relPartitionPaths = getReadPartitions(); if (relPartitionPaths.isEmpty()) { return Collections.emptyList(); } - return fileIndex.getFilesInPartitions(); + List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions(); + try (HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, + metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(), pathInfoList)) { + + List<FileSlice> allFileSlices = relPartitionPaths.stream() + .flatMap(par -> fsView.getLatestBaseFiles(par) + .map(baseFile -> new FileSlice(new HoodieFileGroupId(par, baseFile.getFileId()), baseFile.getCommitTime(), baseFile, Collections.emptyList()))) + .collect(Collectors.toList()); + return fileIndex.filterFileSlices(allFileSlices); + } } @VisibleForTesting diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java index cea36070557f..4d2aabd90fc0 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java @@ -49,6 +49,7 @@ import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; @@ -603,10 +604,20 @@ public class HoodieCatalog extends AbstractCatalog { private void refreshTableProperties(ObjectPath tablePath, CatalogBaseTable newCatalogTable) { Map<String, String> options = newCatalogTable.getOptions(); + ResolvedCatalogTable resolvedTable = (ResolvedCatalogTable) newCatalogTable; final String avroSchema = AvroSchemaConverter.convertToSchema( - ((ResolvedCatalogTable) newCatalogTable).getResolvedSchema().toPhysicalRowDataType().getLogicalType(), + resolvedTable.getResolvedSchema().toPhysicalRowDataType().getLogicalType(), AvroSchemaUtils.getAvroRecordQualifiedName(tablePath.getObjectName())).toString(); options.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), avroSchema); + java.util.Optional<UniqueConstraint> pkConstraintOpt = resolvedTable.getResolvedSchema().getPrimaryKey(); + if (pkConstraintOpt.isPresent()) { + options.put(TableOptionProperties.PK_COLUMNS, String.join(",", pkConstraintOpt.get().getColumns())); + options.put(TableOptionProperties.PK_CONSTRAINT_NAME, pkConstraintOpt.get().getName()); + } + if (resolvedTable.isPartitioned()) { + final String partitions = String.join(",", resolvedTable.getPartitionKeys()); + options.put(TableOptionProperties.PARTITION_COLUMNS, partitions); + } String tablePathStr = inferTablePath(catalogPathStr, tablePath); try { TableOptionProperties.overwriteProperties(tablePathStr, hadoopConf, options); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java index a50f4a2a5243..fab10d7fd74c 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java @@ -19,8 +19,11 @@ package org.apache.hudi.source; import org.apache.hudi.common.config.HoodieMetadataConfig; -import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; import org.apache.hudi.source.prune.ColumnStatsProbe; @@ -82,8 +85,9 @@ public class TestFileIndex { conf.set(METADATA_ENABLED, true); conf.set(HIVE_STYLE_PARTITIONING, hiveStylePartitioning); TestData.writeData(TestData.DATA_SET_INSERT, conf); + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); FileIndex fileIndex = FileIndex.builder().path(new StoragePath(tempFile.getAbsolutePath())).conf(conf) - .rowType(TestConfigurations.ROW_TYPE).metaClient(StreamerUtil.createMetaClient(conf)).build(); + .rowType(TestConfigurations.ROW_TYPE).metaClient(metaClient).build(); List<String> partitionKeys = Collections.singletonList("partition"); List<Map<String, String>> partitions = fileIndex.getPartitions(partitionKeys, PARTITION_DEFAULT_NAME.defaultValue(), @@ -93,10 +97,9 @@ public class TestFileIndex { .map(Map::values).flatMap(Collection::stream).sorted().collect(Collectors.joining(",")); assertThat("should have 4 partitions", partitionPaths, is("par1,par2,par3,par4")); - List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions(); - assertThat(pathInfoList.size(), is(4)); - assertTrue(pathInfoList.stream().allMatch(fileInfo -> - fileInfo.getPath().toString().endsWith(HoodieFileFormat.PARQUET.getFileExtension()))); + List<FileSlice> fileSlices = getFilteredFileSlices(metaClient, fileIndex); + assertThat(fileSlices.size(), is(4)); + assertTrue(fileSlices.stream().allMatch(fileSlice -> fileSlice.getBaseFile().isPresent() && fileSlice.getLogFiles().count() == 0)); } @Test @@ -106,17 +109,18 @@ public class TestFileIndex { conf.set(KEYGEN_CLASS_NAME, NonpartitionedAvroKeyGenerator.class.getName()); conf.set(METADATA_ENABLED, true); TestData.writeData(TestData.DATA_SET_INSERT, conf); + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); FileIndex fileIndex = FileIndex.builder().path(new StoragePath(tempFile.getAbsolutePath())).conf(conf) - .rowType(TestConfigurations.ROW_TYPE).metaClient(StreamerUtil.createMetaClient(conf)).build(); + .rowType(TestConfigurations.ROW_TYPE).metaClient(metaClient).build(); List<String> partitionKeys = Collections.singletonList(""); List<Map<String, String>> partitions = fileIndex.getPartitions(partitionKeys, PARTITION_DEFAULT_NAME.defaultValue(), false); assertThat(partitions.size(), is(0)); - List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions(); - assertThat(pathInfoList.size(), is(1)); - assertTrue(pathInfoList.get(0).getPath().toString() - .endsWith(HoodieFileFormat.PARQUET.getFileExtension())); + List<FileSlice> fileSlices = getFilteredFileSlices(metaClient, fileIndex); + + assertThat(fileSlices.size(), is(1)); + assertTrue(fileSlices.get(0).getBaseFile().isPresent() && fileSlices.get(0).getLogFiles().count() == 0); } @ParameterizedTest @@ -130,9 +134,6 @@ public class TestFileIndex { List<Map<String, String>> partitions = fileIndex.getPartitions(partitionKeys, PARTITION_DEFAULT_NAME.defaultValue(), false); assertThat(partitions.size(), is(0)); - - List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions(); - assertThat(pathInfoList.size(), is(0)); } @Test @@ -145,12 +146,13 @@ public class TestFileIndex { writeBigintDataset(conf); + HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf); FileIndex fileIndex = FileIndex.builder() .path(new StoragePath(tempFile.getAbsolutePath())) .conf(conf) .rowType(TestConfigurations.ROW_TYPE_BIGINT) - .metaClient(StreamerUtil.createMetaClient(conf)) + .metaClient(metaClient) .columnStatsProbe(ColumnStatsProbe.newInstance(Collections.singletonList(CallExpression.permanent( FunctionIdentifier.of("greaterThan"), BuiltInFunctionDefinitions.GREATER_THAN, @@ -162,8 +164,8 @@ public class TestFileIndex { .partitionPruner(null) .build(); - List<StoragePathInfo> files = fileIndex.getFilesInPartitions(); - assertThat(files.size(), is(2)); + List<FileSlice> fileSlices = getFilteredFileSlices(metaClient, fileIndex); + assertThat(fileSlices.size(), is(2)); } @ParameterizedTest @@ -215,6 +217,22 @@ public class TestFileIndex { assertEquals(Arrays.asList("par3"), p); } + private List<FileSlice> getFilteredFileSlices(HoodieTableMetaClient metaClient, FileIndex fileIndex) { + List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths(); + List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions(); + HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline(); + List<FileSlice> allFileSlices; + try (HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline, pathInfoList)) { + if (timeline.lastInstant().isPresent()) { + allFileSlices = relPartitionPaths.stream() + .flatMap(par -> fsView.getLatestMergedFileSlicesBeforeOrOn(par, timeline.lastInstant().get().requestedTime())).collect(Collectors.toList()); + } else { + return Collections.emptyList(); + } + } + return fileIndex.filterFileSlices(allFileSlices); + } + private void writeBigintDataset(Configuration conf) throws Exception { List<RowData> dataset = Arrays.asList( insertRow(TestConfigurations.ROW_TYPE_BIGINT, 1L, StringData.fromString("Danny"), 23, diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index c89148173b10..b59b51c035c0 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -1927,6 +1927,54 @@ public class ITTestHoodieDataSource { + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"); } + @ParameterizedTest + @EnumSource(value = HoodieTableType.class) + void testDataSkippingByFilteringFileSlice(HoodieTableType tableType) { + // Case: column for different files inside one file slice can be different, + // so if any file in the file slice satisfy the predicate based on column stats, + // then the file slice should be read. + // E.g., query predicate is age <> '25', base file contains: {key=k1, orderingVal=1, age=23}, + // log file contains: {key=k1, orderingVal=2, age=25}, then the file slice should be read. + TableEnvironment tableEnv = batchTableEnv; + String path = tempFile.getAbsolutePath(); + String hoodieTableDDL = sql("t1") + .option(FlinkOptions.PATH, path) + .option(FlinkOptions.METADATA_ENABLED, true) + .option("hoodie.metadata.index.column.stats.enable", true) + .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true) + .option(FlinkOptions.TABLE_TYPE, tableType) + .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1) + .option(FlinkOptions.COMPACTION_TASKS, 1) + .end(); + tableEnv.executeSql(hoodieTableDDL); + + execInsertSql(tableEnv, TestSQL.INSERT_T1); + + List<Row> result1 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1 where age <> 25 and `partition` = 'par1'").execute().collect()); + assertRowsEquals(result1, "[" + + "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], " + + "+I[id2, Stephen, 33, 1970-01-01T00:00:02, par1]]"); + + batchTableEnv.executeSql("drop table t1"); + + hoodieTableDDL = sql("t1") + .option(FlinkOptions.PATH, path) + .option(FlinkOptions.METADATA_ENABLED, true) + .option("hoodie.metadata.index.column.stats.enable", true) + .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true) + .option(FlinkOptions.TABLE_TYPE, tableType) + .end(); + tableEnv.executeSql(hoodieTableDDL); + + final String INSERT_T2 = "insert into t1 values\n" + + "('id1','Danny',25,TIMESTAMP '1970-01-01 00:01:01','par1')\n"; + execInsertSql(tableEnv, INSERT_T2); + result1 = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1 where age <> 25 and `partition` = 'par1'").execute().collect()); + assertRowsEquals(result1, "[+I[id2, Stephen, 33, 1970-01-01T00:00:02, par1]]"); + } + @Test void testParquetLogBlockDataSkipping() { TableEnvironment tableEnv = batchTableEnv; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java index 6d35ac1f1adf..c2c36bec4fd1 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java @@ -19,13 +19,13 @@ package org.apache.hudi.table; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.source.ExpressionPredicates; import org.apache.hudi.source.prune.ColumnStatsProbe; import org.apache.hudi.storage.StoragePath; -import org.apache.hudi.storage.StoragePathInfo; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; import org.apache.hudi.util.SerializableSchema; import org.apache.hudi.util.StreamerUtil; @@ -100,9 +100,9 @@ public class TestHoodieTableSource { void testGetReadPaths() throws Exception { beforeEach(); HoodieTableSource tableSource = getEmptyStreamingSource(); - List<StoragePathInfo> fileList = tableSource.getReadFiles(); - assertNotNull(fileList); - assertThat(fileList.size(), is(4)); + List<FileSlice> fileSlices = tableSource.getBaseFileOnlyFileSlices(); + assertNotNull(fileSlices); + assertThat(fileSlices.size(), is(4)); // apply partition pruning FieldReferenceExpression partRef = new FieldReferenceExpression("partition", DataTypes.STRING(), 4, 4); ValueLiteralExpression partLiteral = new ValueLiteralExpression("par1", DataTypes.STRING().notNull()); @@ -113,9 +113,9 @@ public class TestHoodieTableSource { HoodieTableSource tableSource2 = getEmptyStreamingSource(); tableSource2.applyFilters(Arrays.asList(partFilter)); - List<StoragePathInfo> fileList2 = tableSource2.getReadFiles(); - assertNotNull(fileList2); - assertThat(fileList2.size(), is(1)); + List<FileSlice> fileSlices2 = tableSource2.getBaseFileOnlyFileSlices(); + assertNotNull(fileSlices2); + assertThat(fileSlices2.size(), is(1)); } @Test @@ -203,8 +203,8 @@ public class TestHoodieTableSource { int numBuckets = (int)FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.defaultValue(); assertThat(tableSource1.getDataBucketFunc().get().apply(numBuckets), is(1)); - List<StoragePathInfo> fileList = tableSource1.getReadFiles(); - assertThat("Files should be pruned by bucket id 1", fileList.size(), CoreMatchers.is(2)); + List<FileSlice> fileSlices = tableSource1.getBaseFileOnlyFileSlices(); + assertThat("Files should be pruned by bucket id 1", fileSlices.size(), CoreMatchers.is(2)); // test multiple primary keys filtering Configuration conf2 = conf1.clone(); @@ -218,8 +218,8 @@ public class TestHoodieTableSource { createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), "id1"), createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(), "Danny"))); assertThat(tableSource2.getDataBucketFunc().get().apply(numBuckets), is(3)); - List<StoragePathInfo> fileList2 = tableSource2.getReadFiles(); - assertThat("Files should be pruned by bucket id 3", fileList2.size(), CoreMatchers.is(3)); + List<FileSlice> fileSlices2 = tableSource2.getBaseFileOnlyFileSlices(); + assertThat("Files should be pruned by bucket id 3", fileSlices2.size(), CoreMatchers.is(3)); // apply the filters in different order and test again. tableSource2.reset(); @@ -227,7 +227,7 @@ public class TestHoodieTableSource { createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(), "Danny"), createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), "id1"))); assertThat(tableSource2.getDataBucketFunc().get().apply(numBuckets), is(3)); - assertThat("Files should be pruned by bucket id 3", tableSource2.getReadFiles().size(), + assertThat("Files should be pruned by bucket id 3", tableSource2.getBaseFileOnlyFileSlices().size(), CoreMatchers.is(3)); // test partial primary keys filtering @@ -242,8 +242,8 @@ public class TestHoodieTableSource { createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), "id1"))); assertTrue(tableSource3.getDataBucketFunc().isEmpty()); - List<StoragePathInfo> fileList3 = tableSource3.getReadFiles(); - assertThat("Partial pk filtering does not prune any files", fileList3.size(), + List<FileSlice> fileSlices3 = tableSource3.getBaseFileOnlyFileSlices(); + assertThat("Partial pk filtering does not prune any files", fileSlices3.size(), CoreMatchers.is(7)); // test single primary keys filtering together with non-primary key predicate @@ -257,8 +257,8 @@ public class TestHoodieTableSource { createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(), "Danny"))); assertThat(tableSource4.getDataBucketFunc().get().apply(numBuckets), is(1)); - List<StoragePathInfo> fileList4 = tableSource4.getReadFiles(); - assertThat("Files should be pruned by bucket id 1", fileList4.size(), CoreMatchers.is(2)); + List<FileSlice> fileSlices4 = tableSource4.getBaseFileOnlyFileSlices(); + assertThat("Files should be pruned by bucket id 1", fileSlices4.size(), CoreMatchers.is(2)); } @ParameterizedTest @@ -282,8 +282,8 @@ public class TestHoodieTableSource { LocalDateTime.ofInstant(Instant.ofEpochMilli(1), ZoneId.of("UTC"))))); assertThat(tableSource1.getDataBucketFunc().get().apply(numBuckets), is(logicalTimestamp ? 1 : 0)); - List<StoragePathInfo> fileList = tableSource1.getReadFiles(); - assertThat("Files should be pruned", fileList.size(), CoreMatchers.is(1)); + List<FileSlice> fileSlices = tableSource1.getBaseFileOnlyFileSlices(); + assertThat("Files should be pruned", fileSlices.size(), CoreMatchers.is(1)); // test date filtering Configuration conf2 = conf1.clone(); @@ -298,8 +298,8 @@ public class TestHoodieTableSource { createLitEquivalenceExpr(f2, 1, DataTypes.DATE().notNull(), LocalDate.ofEpochDay(1)))); assertThat(tableSource2.getDataBucketFunc().get().apply(numBuckets), is(1)); - List<StoragePathInfo> fileList2 = tableSource2.getReadFiles(); - assertThat("Files should be pruned", fileList2.size(), CoreMatchers.is(1)); + List<FileSlice> fileSlices2 = tableSource2.getBaseFileOnlyFileSlices(); + assertThat("Files should be pruned", fileSlices2.size(), CoreMatchers.is(1)); // test decimal filtering Configuration conf3 = conf1.clone(); @@ -315,8 +315,8 @@ public class TestHoodieTableSource { new BigDecimal("1.11")))); assertThat(tableSource3.getDataBucketFunc().get().apply(numBuckets), is(0)); - List<StoragePathInfo> fileList3 = tableSource3.getReadFiles(); - assertThat("Files should be pruned", fileList3.size(), CoreMatchers.is(1)); + List<FileSlice> fileSlices3 = tableSource3.getBaseFileOnlyFileSlices(); + assertThat("Files should be pruned", fileSlices3.size(), CoreMatchers.is(1)); } @Test @@ -373,8 +373,8 @@ public class TestHoodieTableSource { createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), "id1"))); assertThat(tableSource1.getDataBucketFunc().get().apply(4), is(1)); - List<StoragePathInfo> fileList = tableSource1.getReadFiles(); - assertThat("Files should be pruned by bucket id 1", fileList.size(), CoreMatchers.is(2)); + List<FileSlice> fileSlices = tableSource1.getBaseFileOnlyFileSlices(); + assertThat("Files should be pruned by bucket id 1", fileSlices.size(), CoreMatchers.is(2)); } /** @@ -403,8 +403,8 @@ public class TestHoodieTableSource { createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), "id1"), createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(), "Danny"))); assertThat(tableSource.getDataBucketFunc().get().apply(4), is(3)); - List<StoragePathInfo> fileList = tableSource.getReadFiles(); - assertThat("Files should be pruned by bucket id 3", fileList.size(), CoreMatchers.is(3)); + List<FileSlice> fileSlices = tableSource.getBaseFileOnlyFileSlices(); + assertThat("Files should be pruned by bucket id 3", fileSlices.size(), CoreMatchers.is(3)); } /** @@ -433,8 +433,8 @@ public class TestHoodieTableSource { createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), "id1"))); assertTrue(tableSource.getDataBucketFunc().isEmpty()); - List<StoragePathInfo> fileList = tableSource.getReadFiles(); - assertThat("Partial pk filtering does not prune any files", fileList.size(), + List<FileSlice> fileSlices = tableSource.getBaseFileOnlyFileSlices(); + assertThat("Partial pk filtering does not prune any files", fileSlices.size(), CoreMatchers.is(7)); } @@ -463,8 +463,8 @@ public class TestHoodieTableSource { createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(), "Danny"))); assertThat(tableSource.getDataBucketFunc().get().apply(4), is(1)); - List<StoragePathInfo> fileList = tableSource.getReadFiles(); - assertThat("Files should be pruned by bucket id 1", fileList.size(), CoreMatchers.is(2)); + List<FileSlice> fileSlices = tableSource.getBaseFileOnlyFileSlices(); + assertThat("Files should be pruned by bucket id 1", fileSlices.size(), CoreMatchers.is(2)); } // ------------------------------------------------------------------------- diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java index 61f7710b8775..ffc6d97170f2 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java @@ -60,6 +60,7 @@ import org.apache.flink.table.catalog.CatalogDatabaseImpl; import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.DefaultCatalogTable; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; @@ -456,6 +457,30 @@ public class TestHoodieCatalog { () -> catalog.dropTable(new ObjectPath(TEST_DEFAULT_DATABASE, "non_exist"), false)); } + @Test + public void testAlterTable() throws Exception { + ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1"); + // create table + catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true); + + DefaultCatalogTable oldTable = (DefaultCatalogTable) catalog.getTable(tablePath); + // same as old table + ResolvedCatalogTable newTable = new ResolvedCatalogTable( + CatalogUtils.createCatalogTable( + Schema.newBuilder().fromResolvedSchema(CREATE_TABLE_SCHEMA).build(), + oldTable.getPartitionKeys(), + oldTable.getOptions(), + "test"), + CREATE_TABLE_SCHEMA + ); + catalog.alterTable(tablePath, newTable, true); + + // validate primary key property is not missing + DefaultCatalogTable table = (DefaultCatalogTable) catalog.getTable(tablePath); + assertTrue(table.getUnresolvedSchema().getPrimaryKey().isPresent()); + assertTrue(table.isPartitioned()); + } + @Test public void testDropPartition() throws Exception { ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
