This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e6c8c508e4b87a4eb467f40e6bf4dd05c082d740
Author: Shuo Cheng <[email protected]>
AuthorDate: Sat Nov 1 10:44:04 2025 +0800

    fix: Fix file pruning based on column stats for flink reader (#14186)
---
 .../org/apache/hudi/common/model/FileSlice.java    | 11 +++
 .../apache/hudi/common/model/TestFileSlice.java    | 79 +++++++++++++++++++
 .../java/org/apache/hudi/source/FileIndex.java     | 60 ++++++++-------
 .../apache/hudi/source/IncrementalInputSplits.java | 88 ++++++++++++----------
 .../org/apache/hudi/table/HoodieTableSource.java   | 68 +++++++++--------
 .../apache/hudi/table/catalog/HoodieCatalog.java   | 13 +++-
 .../java/org/apache/hudi/source/TestFileIndex.java | 52 ++++++++-----
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 48 ++++++++++++
 .../apache/hudi/table/TestHoodieTableSource.java   | 60 +++++++--------
 .../hudi/table/catalog/TestHoodieCatalog.java      | 25 ++++++
 10 files changed, 356 insertions(+), 148 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
index 21e9a93d4afc..5f4c87ddb59a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/FileSlice.java
@@ -22,6 +22,7 @@ import 
org.apache.hudi.common.table.timeline.InstantComparison;
 import org.apache.hudi.common.util.Option;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.TreeSet;
@@ -140,6 +141,16 @@ public class FileSlice implements Serializable {
     return Option.fromJavaOptional(logFiles.stream().findFirst());
   }
 
+  /**
+   * Returns the file names of the base file and the log files.
+   */
+  public List<String> getAllFileNames() {
+    List<String> fileList = new ArrayList<>();
+    getBaseFile().ifPresent(hoodieBaseFile -> 
fileList.add(hoodieBaseFile.getFileName()));
+    getLogFiles().forEach(hoodieLogFile -> 
fileList.add(hoodieLogFile.getFileName()));
+    return fileList;
+  }
+
   public long getTotalFileSize() {
     return getBaseFile().map(HoodieBaseFile::getFileSize).orElse(0L)
         + getLogFiles().mapToLong(HoodieLogFile::getFileSize).sum();
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java
index 8538d645c4fc..a09bfaebc9a2 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestFileSlice.java
@@ -22,13 +22,23 @@ import org.apache.hudi.storage.StoragePath;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Tests {@link FileSlice}.
  */
 public class TestFileSlice {
+  private static final String PARTITION_PATH = "test_partition";
+  private static final String FILE_ID = "test_file_id";
+  private static final String BASE_INSTANT = "001";
+
   @Test
   void testGetLatestInstantTime() {
     String baseInstant = "003";
@@ -48,4 +58,73 @@ public class TestFileSlice {
   private static String getLogFileName(String instantTime) {
     return ".fg1_" + instantTime + ".log.1_1-0-1";
   }
+
+  @Test
+  public void testGetAllFilesWithBaseFileOnly() {
+    // Create a FileSlice with only a base file and no log files
+    HoodieBaseFile baseFile = new HoodieBaseFile(
+        "file://" + PARTITION_PATH + "/test_base_file.parquet");
+    FileSlice fileSlice = new FileSlice(
+        new HoodieFileGroupId(PARTITION_PATH, FILE_ID),
+        BASE_INSTANT,
+        baseFile,
+        Collections.emptyList()
+    );
+
+    List<String> allFiles = fileSlice.getAllFileNames();
+    assertEquals(1, allFiles.size());
+    assertTrue(allFiles.contains(baseFile.getFileName()));
+  }
+
+  @Test
+  public void testGetAllFilesWithLogFilesOnly() {
+    // Create a FileSlice with no base file but with log files
+    // Log files must follow the proper naming convention: 
.{fileId}_{instant}.log.{version}
+    HoodieLogFile logFile1 = new HoodieLogFile(new StoragePath(PARTITION_PATH 
+ "/." + FILE_ID + "_002.log.1"));
+    HoodieLogFile logFile2 = new HoodieLogFile(new StoragePath(PARTITION_PATH 
+ "/." + FILE_ID + "_003.log.2"));
+
+    FileSlice fileSlice = new FileSlice(
+        new HoodieFileGroupId(PARTITION_PATH, FILE_ID),
+        BASE_INSTANT,
+        null, // No base file
+        Arrays.asList(logFile1, logFile2)
+    );
+
+    List<String> allFiles = fileSlice.getAllFileNames();
+    assertEquals(2, allFiles.size());
+    assertTrue(allFiles.contains(logFile1.getFileName()));
+    assertTrue(allFiles.contains(logFile2.getFileName()));
+  }
+
+  @Test
+  public void testGetAllFilesWithBaseFileAndLogFiles() {
+    // Create a FileSlice with both base file and log files
+    HoodieBaseFile baseFile = new HoodieBaseFile(
+        "file://" + PARTITION_PATH + "/test_base_file.parquet");
+    // Log files must follow the proper naming convention: 
.{fileId}_{instant}.log.{version}
+    HoodieLogFile logFile1 = new HoodieLogFile(new StoragePath(PARTITION_PATH 
+ "/." + FILE_ID + "_004.log.1"));
+    HoodieLogFile logFile2 = new HoodieLogFile(new StoragePath(PARTITION_PATH 
+ "/." + FILE_ID + "_005.log.2"));
+
+    FileSlice fileSlice = new FileSlice(
+        new HoodieFileGroupId(PARTITION_PATH, FILE_ID),
+        BASE_INSTANT,
+        baseFile,
+        Arrays.asList(logFile1, logFile2)
+    );
+
+    List<String> allFiles = fileSlice.getAllFileNames();
+    assertEquals(3, allFiles.size());
+    assertTrue(allFiles.contains(baseFile.getFileName()));
+    assertTrue(allFiles.contains(logFile1.getFileName()));
+    assertTrue(allFiles.contains(logFile2.getFileName()));
+  }
+
+  @Test
+  public void testGetAllFilesEmptyFileSlice() {
+    // Test with an empty file slice (no base file and no log files)
+    FileSlice fileSlice = new FileSlice(PARTITION_PATH, BASE_INSTANT, FILE_ID);
+
+    List<String> allFiles = fileSlice.getAllFileNames();
+    assertEquals(0, allFiles.size());
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index 4d6d89ebf1d6..adae1d8df614 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -21,6 +21,7 @@ package org.apache.hudi.source;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -148,7 +149,7 @@ public class FileIndex implements Serializable {
   }
 
   /**
-   * Returns all the file statuses under the table base path.
+   * Return all files in the filtered partitions.
    */
   public List<StoragePathInfo> getFilesInPartitions() {
     if (!tableExists) {
@@ -161,41 +162,42 @@ public class FileIndex implements Serializable {
     }
     Map<String, List<StoragePathInfo>> filesInPartitions = 
FSUtils.getFilesInPartitions(
         new HoodieFlinkEngineContext(hadoopConf), metaClient, metadataConfig, 
partitions);
-    int totalFilesNum = 
filesInPartitions.values().stream().mapToInt(List::size).sum();
-    if (totalFilesNum < 1) {
-      // returns early for empty table.
-      return Collections.emptyList();
-    }
+    return 
filesInPartitions.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+  }
 
-    List<StoragePathInfo> allFiles;
+  /**
+   * Filters the file slices by pruning based on bucket id and column stats.
+   */
+  public List<FileSlice> filterFileSlices(List<FileSlice> fileSlices) {
+    List<FileSlice> filteredFileSlices;
     // bucket pruning
     if (this.partitionBucketIdFunc != null) {
-      allFiles = filesInPartitions.entrySet().stream().flatMap(entry -> {
-        String bucketIdStr = 
BucketIdentifier.bucketIdStr(partitionBucketIdFunc.apply(entry.getKey()));
-        return entry.getValue().stream().filter(fileInfo -> 
fileInfo.getPath().getName().contains(bucketIdStr));
+      filteredFileSlices = fileSlices.stream().filter(fileSlice -> {
+        String bucketIdStr = 
BucketIdentifier.bucketIdStr(partitionBucketIdFunc.apply(fileSlice.getPartitionPath()));
+        return fileSlice.getFileGroupId().getFileId().contains(bucketIdStr);
       }).collect(Collectors.toList());
-      logPruningMsg(totalFilesNum, allFiles.size(), "bucket pruning");
+      logPruningMsg(fileSlices.size(), filteredFileSlices.size(), "bucket 
pruning");
     } else {
-      allFiles = 
filesInPartitions.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
+      filteredFileSlices = fileSlices;
     }
-
-    if (allFiles.isEmpty()) {
-      // returns early for empty table.
-      return allFiles;
+    if (filteredFileSlices.isEmpty()) {
+      return Collections.emptyList();
     }
 
-    // data skipping
-    Set<String> candidateFiles = fileStatsIndex.computeCandidateFiles(
-        colStatsProbe, allFiles.stream().map(f -> 
f.getPath().getName()).collect(Collectors.toList()));
+    // data skipping based on column stats
+    List<String> allFiles = 
fileSlices.stream().map(FileSlice::getAllFileNames).flatMap(List::stream).collect(Collectors.toList());
+    Set<String> candidateFiles = 
fileStatsIndex.computeCandidateFiles(colStatsProbe, allFiles);
     if (candidateFiles == null) {
       // no need to filter by col stats or error occurs.
-      return allFiles;
+      return filteredFileSlices;
     }
-    List<StoragePathInfo> results = allFiles.stream().parallel()
-        .filter(fileStatus -> 
candidateFiles.contains(fileStatus.getPath().getName()))
-        .collect(Collectors.toList());
-    logPruningMsg(allFiles.size(), results.size(), "data skipping");
-    return results;
+    List<FileSlice> result = filteredFileSlices.stream().filter(fileSlice -> {
+      // if any file in the file slice is part of candidate file names, we 
need to include the file slice.
+      // in other words, if all files in the file slice are not present in 
candidate file names, we can filter out the file slice.
+      return 
fileSlice.getAllFileNames().stream().anyMatch(candidateFiles::contains);
+    }).collect(Collectors.toList());
+    logPruningMsg(filteredFileSlices.size(), result.size(), "column stats 
pruning");
+    return result;
   }
 
   /**
@@ -261,15 +263,15 @@ public class FileIndex implements Serializable {
     return false;
   }
 
-  private void logPruningMsg(int numTotalFiles, int numLeftFiles, String 
action) {
+  private void logPruningMsg(int numTotalFileSlices, int numLeftFileSlices, 
String action) {
     LOG.info("\n"
         + "------------------------------------------------------------\n"
         + "---------- action:        {}\n"
-        + "---------- total files:   {}\n"
-        + "---------- left files:    {}\n"
+        + "---------- total file slices:   {}\n"
+        + "---------- left file slices:    {}\n"
         + "---------- skipping rate: {}\n"
         + "------------------------------------------------------------",
-        action, numTotalFiles, numLeftFiles, percentage(numTotalFiles, 
numLeftFiles));
+        action, numTotalFileSlices, numLeftFileSlices, 
percentage(numTotalFileSlices, numLeftFileSlices));
   }
 
   private static double percentage(double total, double left) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index 4655ce760608..c7dd6dcdf9b2 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -54,7 +54,6 @@ import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -175,15 +174,18 @@ public class IncrementalInputSplits implements 
Serializable {
     //   4. the end commit is archived
     Set<String> readPartitions;
     final List<StoragePathInfo> fileInfoList;
+    final List<FileSlice> fileSlices;
     if (fullTableScan) {
       // scans the partitions and files directly.
       FileIndex fileIndex = getFileIndex(metaClient);
       readPartitions = new TreeSet<>(fileIndex.getOrBuildPartitionPaths());
-      if (readPartitions.size() == 0) {
+      if (readPartitions.isEmpty()) {
         LOG.warn("No partitions found for reading in user provided path.");
         return Result.EMPTY;
       }
       fileInfoList = fileIndex.getFilesInPartitions();
+      List<FileSlice> allFileSlices = getFileSlices(metaClient, 
commitTimeline, readPartitions, fileInfoList, 
analyzingResult.getMaxCompletionTime(), false);
+      fileSlices = fileIndex.filterFileSlices(allFileSlices);
     } else {
       if (cdcEnabled) {
         // case1: cdc change log enabled
@@ -197,7 +199,7 @@ public class IncrementalInputSplits implements Serializable 
{
           .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, 
instant, commitTimeline))
           .collect(Collectors.toList());
       readPartitions = getReadPartitions(metadataList);
-      if (readPartitions.size() == 0) {
+      if (readPartitions.isEmpty()) {
         LOG.warn("No partitions found for reading in user provided path.");
         return Result.EMPTY;
       }
@@ -210,24 +212,24 @@ public class IncrementalInputSplits implements 
Serializable {
         // reading from the earliest, scans the partitions and files directly.
         FileIndex fileIndex = getFileIndex(metaClient);
         readPartitions = new TreeSet<>(fileIndex.getOrBuildPartitionPaths());
-        if (readPartitions.size() == 0) {
+        if (readPartitions.isEmpty()) {
           LOG.warn("No partitions found for reading in user provided path.");
           return Result.EMPTY;
         }
         fileInfoList = fileIndex.getFilesInPartitions();
+        List<FileSlice> allFileSlices = getFileSlices(metaClient, 
commitTimeline, readPartitions, fileInfoList, 
analyzingResult.getMaxCompletionTime(), false);
+        fileSlices = fileIndex.filterFileSlices(allFileSlices);
       } else {
-        fileInfoList = files;
+        fileSlices = getFileSlices(metaClient, commitTimeline, readPartitions, 
files, analyzingResult.getMaxCompletionTime(), false);
       }
     }
 
-    if (fileInfoList.size() == 0) {
+    if (fileSlices.isEmpty()) {
       LOG.warn("No files found for reading in user provided path.");
       return Result.EMPTY;
     }
 
-    List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, 
commitTimeline,
-        fileInfoList, readPartitions, endInstant, 
analyzingResult.getMaxCompletionTime(), instantRange, false);
-
+    List<MergeOnReadInputSplit> inputSplits = getInputSplits(fileSlices, 
metaClient, endInstant, instantRange);
     return Result.instance(inputSplits, endInstant);
   }
 
@@ -278,19 +280,20 @@ public class IncrementalInputSplits implements 
Serializable {
       FileIndex fileIndex = getFileIndex(metaClient);
 
       Set<String> readPartitions = new 
TreeSet<>(fileIndex.getOrBuildPartitionPaths());
-      if (readPartitions.size() == 0) {
+      if (readPartitions.isEmpty()) {
         LOG.warn("No partitions found for reading under path: {}", path);
         return Result.EMPTY;
       }
 
       List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
-      if (pathInfoList.size() == 0) {
+      if (pathInfoList.isEmpty()) {
         LOG.warn("No files found for reading under path: {}", path);
         return Result.EMPTY;
       }
+      List<FileSlice> allFileSlices = getFileSlices(metaClient, 
commitTimeline, readPartitions, pathInfoList, offsetToIssue, false);
+      List<FileSlice> fileSlices = fileIndex.filterFileSlices(allFileSlices);
 
-      List<MergeOnReadInputSplit> inputSplits = getInputSplits(metaClient, 
commitTimeline,
-          pathInfoList, readPartitions, endInstant, offsetToIssue, null, 
false);
+      List<MergeOnReadInputSplit> inputSplits = getInputSplits(fileSlices, 
metaClient, endInstant, null);
 
       return Result.instance(inputSplits, endInstant, offsetToIssue);
     } else {
@@ -334,53 +337,56 @@ public class IncrementalInputSplits implements 
Serializable {
     List<HoodieCommitMetadata> metadataList = mergeList(archivedMetadataList, 
activeMetadataList);
 
     Set<String> readPartitions = getReadPartitions(metadataList);
-    if (readPartitions.size() == 0) {
+    if (readPartitions.isEmpty()) {
       LOG.warn("No partitions found for reading under path: {}", path);
       return Collections.emptyList();
     }
     List<StoragePathInfo> pathInfoList = WriteProfiles.getFilesFromMetadata(
         path, hadoopConf, metadataList, metaClient.getTableType());
+    List<FileSlice> fileSlices = getFileSlices(metaClient, commitTimeline, 
readPartitions, pathInfoList, queryContext.getMaxCompletionTime(), 
skipCompaction);
 
-    if (pathInfoList.size() == 0) {
+    if (fileSlices.isEmpty()) {
       LOG.warn("No files found for reading under path: {}", path);
       return Collections.emptyList();
     }
 
-    return getInputSplits(metaClient, commitTimeline,
-        pathInfoList, readPartitions, endInstant, 
queryContext.getMaxCompletionTime(), instantRange, skipCompaction);
+    return getInputSplits(fileSlices, metaClient, endInstant, instantRange);
   }
 
-  private List<MergeOnReadInputSplit> getInputSplits(
+  private List<FileSlice> getFileSlices(
       HoodieTableMetaClient metaClient,
       HoodieTimeline commitTimeline,
-      List<StoragePathInfo> pathInfoList,
       Set<String> readPartitions,
-      String endInstant,
+      List<StoragePathInfo> pathInfoList,
       String maxCompletionTime,
-      InstantRange instantRange,
       boolean skipBaseFiles) {
-    final HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, commitTimeline, pathInfoList);
+    try (HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, commitTimeline, pathInfoList)) {
+      return readPartitions.stream().flatMap(par -> skipBaseFiles ? 
fsView.getAllLogsMergedFileSliceBeforeOrOn(par, maxCompletionTime)
+          : fsView.getLatestMergedFileSlicesBeforeOrOn(par, 
maxCompletionTime)).collect(Collectors.toList());
+    }
+  }
+
+  private List<MergeOnReadInputSplit> getInputSplits(
+      List<FileSlice> fileSlices,
+      HoodieTableMetaClient metaClient,
+      String endInstant,
+      InstantRange instantRange) {
     final AtomicInteger cnt = new AtomicInteger(0);
     final String mergeType = this.conf.get(FlinkOptions.MERGE_TYPE);
-    return readPartitions.stream()
-        .map(relPartitionPath -> getFileSlices(fsView, relPartitionPath, 
maxCompletionTime, skipBaseFiles)
-            .map(fileSlice -> {
-              Option<List<String>> logPaths = 
Option.ofNullable(fileSlice.getLogFiles()
-                  .sorted(HoodieLogFile.getLogFileComparator())
-                  .map(logFile -> logFile.getPath().toString())
-                  .filter(logPath -> 
!logPath.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
-                  .collect(Collectors.toList()));
-              String basePath = 
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
-              // the latest commit is used as the limit of the log reader 
instant upper threshold,
-              // it must be at least the latest instant time of the file slice 
to avoid data loss.
-              String latestCommit = 
InstantComparison.minInstant(fileSlice.getLatestInstantTime(), endInstant);
-              return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-                  basePath, logPaths, latestCommit,
-                  metaClient.getBasePath().toString(), 
maxCompactionMemoryInBytes, mergeType, instantRange, fileSlice.getFileId());
-            }).collect(Collectors.toList()))
-        .flatMap(Collection::stream)
-        .sorted(Comparator.comparing(MergeOnReadInputSplit::getLatestCommit))
-        .collect(Collectors.toList());
+    return fileSlices.stream().map(fileSlice -> {
+      Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
+          .sorted(HoodieLogFile.getLogFileComparator())
+          .map(logFile -> logFile.getPath().toString())
+          .filter(logPath -> 
!logPath.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
+          .collect(Collectors.toList()));
+      String basePath = 
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+      // the latest commit is used as the limit of the log reader instant 
upper threshold,
+      // it must be at least the latest instant time of the file slice to 
avoid data loss.
+      String latestCommit = 
InstantComparison.minInstant(fileSlice.getLatestInstantTime(), endInstant);
+      return new MergeOnReadInputSplit(cnt.getAndAdd(1),
+          basePath, logPaths, latestCommit,
+          metaClient.getBasePath().toString(), maxCompactionMemoryInBytes, 
mergeType, instantRange, fileSlice.getFileId());
+    
}).sorted(Comparator.comparing(MergeOnReadInputSplit::getLatestCommit)).collect(Collectors.toList());
   }
 
   private List<MergeOnReadInputSplit> getCdcInputSplits(
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index c6c1b3d2b871..2fb852d90ed4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -23,8 +23,9 @@ import 
org.apache.hudi.adapter.InputFormatSourceFunctionAdapter;
 import org.apache.hudi.adapter.TableFunctionProviderAdapter;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.BaseFile;
-import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -111,7 +112,6 @@ import java.io.Serializable;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -397,30 +397,32 @@ public class HoodieTableSource implements
       throw new HoodieException("No files found for reading in user provided 
path.");
     }
 
-    HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient,
-        // file-slice after pending compaction-requested instant-time is also 
considered valid
-        
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(),
-        pathInfoList);
-    if (!fsView.getLastInstant().isPresent()) {
-      return Collections.emptyList();
+    String latestCommit;
+    List<FileSlice> allFileSlices;
+    // file-slice after pending compaction-requested instant-time is also 
considered valid
+    try (HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
+        metaClient, 
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(),
 pathInfoList)) {
+      if (!fsView.getLastInstant().isPresent()) {
+        return Collections.emptyList();
+      }
+      latestCommit = fsView.getLastInstant().get().requestedTime();
+      allFileSlices = relPartitionPaths.stream()
+          .flatMap(par -> fsView.getLatestMergedFileSlicesBeforeOrOn(par, 
latestCommit)).collect(Collectors.toList());
     }
-    String latestCommit = fsView.getLastInstant().get().requestedTime();
+    List<FileSlice> fileSlices = fileIndex.filterFileSlices(allFileSlices);
+
     final String mergeType = this.conf.get(FlinkOptions.MERGE_TYPE);
     final AtomicInteger cnt = new AtomicInteger(0);
     // generates one input split for each file group
-    return relPartitionPaths.stream()
-        .map(relPartitionPath -> 
fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
-            .map(fileSlice -> {
-              String basePath = 
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
-              Option<List<String>> logPaths = 
Option.ofNullable(fileSlice.getLogFiles()
-                  .sorted(HoodieLogFile.getLogFileComparator())
-                  .map(logFile -> logFile.getPath().toString())
-                  .collect(Collectors.toList()));
-              return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, 
logPaths, latestCommit,
-                  metaClient.getBasePath().toString(), 
maxCompactionMemoryInBytes, mergeType, null, fileSlice.getFileId());
-            }).collect(Collectors.toList()))
-        .flatMap(Collection::stream)
-        .collect(Collectors.toList());
+    return fileSlices.stream().map(fileSlice -> {
+      String basePath = 
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+      Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
+          .sorted(HoodieLogFile.getLogFileComparator())
+          .map(logFile -> logFile.getPath().toString())
+          .collect(Collectors.toList()));
+      return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, 
latestCommit,
+          metaClient.getBasePath().toString(), maxCompactionMemoryInBytes, 
mergeType, null, fileSlice.getFileId());
+    }).collect(Collectors.toList());
   }
 
   public InputFormat<RowData, ?> getInputFormat() {
@@ -574,15 +576,12 @@ public class HoodieTableSource implements
   }
 
   private InputFormat<RowData, ?> baseFileOnlyInputFormat() {
-    final List<StoragePathInfo> pathInfoList = getReadFiles();
-    if (pathInfoList.isEmpty()) {
+    final List<FileSlice> fileSlices = getBaseFileOnlyFileSlices();
+    if (fileSlices.isEmpty()) {
       return InputFormats.EMPTY_INPUT_FORMAT;
     }
 
-    HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient,
-        
metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants(), 
pathInfoList);
-    Path[] paths = fsView.getLatestBaseFiles()
-        .map(HoodieBaseFile::getPathInfo)
+    Path[] paths = fileSlices.stream().map(fileSlice -> 
fileSlice.getBaseFile().get().getPathInfo())
         .map(e -> new Path(e.getPath().toUri())).toArray(Path[]::new);
 
     if (paths.length == 0) {
@@ -679,12 +678,21 @@ public class HoodieTableSource implements
    * Get the reader paths with partition path expanded.
    */
   @VisibleForTesting
-  public List<StoragePathInfo> getReadFiles() {
+  public List<FileSlice> getBaseFileOnlyFileSlices() {
     List<String> relPartitionPaths = getReadPartitions();
     if (relPartitionPaths.isEmpty()) {
       return Collections.emptyList();
     }
-    return fileIndex.getFilesInPartitions();
+    List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
+    try (HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient,
+        
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants(),
 pathInfoList)) {
+
+      List<FileSlice> allFileSlices = relPartitionPaths.stream()
+          .flatMap(par -> fsView.getLatestBaseFiles(par)
+              .map(baseFile -> new FileSlice(new HoodieFileGroupId(par, 
baseFile.getFileId()), baseFile.getCommitTime(), baseFile, 
Collections.emptyList())))
+          .collect(Collectors.toList());
+      return fileIndex.filterFileSlices(allFileSlices);
+    }
   }
 
   @VisibleForTesting
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index cea36070557f..4d2aabd90fc0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -49,6 +49,7 @@ import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -603,10 +604,20 @@ public class HoodieCatalog extends AbstractCatalog {
 
   private void refreshTableProperties(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable) {
     Map<String, String> options = newCatalogTable.getOptions();
+    ResolvedCatalogTable resolvedTable = (ResolvedCatalogTable) 
newCatalogTable;
     final String avroSchema = AvroSchemaConverter.convertToSchema(
-        ((ResolvedCatalogTable) 
newCatalogTable).getResolvedSchema().toPhysicalRowDataType().getLogicalType(),
+        
resolvedTable.getResolvedSchema().toPhysicalRowDataType().getLogicalType(),
         
AvroSchemaUtils.getAvroRecordQualifiedName(tablePath.getObjectName())).toString();
     options.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), avroSchema);
+    java.util.Optional<UniqueConstraint> pkConstraintOpt = 
resolvedTable.getResolvedSchema().getPrimaryKey();
+    if (pkConstraintOpt.isPresent()) {
+      options.put(TableOptionProperties.PK_COLUMNS, String.join(",", 
pkConstraintOpt.get().getColumns()));
+      options.put(TableOptionProperties.PK_CONSTRAINT_NAME, 
pkConstraintOpt.get().getName());
+    }
+    if (resolvedTable.isPartitioned()) {
+      final String partitions = String.join(",", 
resolvedTable.getPartitionKeys());
+      options.put(TableOptionProperties.PARTITION_COLUMNS, partitions);
+    }
     String tablePathStr = inferTablePath(catalogPathStr, tablePath);
     try {
       TableOptionProperties.overwriteProperties(tablePathStr, hadoopConf, 
options);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
index a50f4a2a5243..fab10d7fd74c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
@@ -19,8 +19,11 @@
 package org.apache.hudi.source;
 
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.source.prune.ColumnStatsProbe;
@@ -82,8 +85,9 @@ public class TestFileIndex {
     conf.set(METADATA_ENABLED, true);
     conf.set(HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
     FileIndex fileIndex = FileIndex.builder().path(new 
StoragePath(tempFile.getAbsolutePath())).conf(conf)
-        
.rowType(TestConfigurations.ROW_TYPE).metaClient(StreamerUtil.createMetaClient(conf)).build();
+        .rowType(TestConfigurations.ROW_TYPE).metaClient(metaClient).build();
     List<String> partitionKeys = Collections.singletonList("partition");
     List<Map<String, String>> partitions =
         fileIndex.getPartitions(partitionKeys, 
PARTITION_DEFAULT_NAME.defaultValue(),
@@ -93,10 +97,9 @@ public class TestFileIndex {
         
.map(Map::values).flatMap(Collection::stream).sorted().collect(Collectors.joining(","));
     assertThat("should have 4 partitions", partitionPaths, 
is("par1,par2,par3,par4"));
 
-    List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
-    assertThat(pathInfoList.size(), is(4));
-    assertTrue(pathInfoList.stream().allMatch(fileInfo ->
-        
fileInfo.getPath().toString().endsWith(HoodieFileFormat.PARQUET.getFileExtension())));
+    List<FileSlice> fileSlices = getFilteredFileSlices(metaClient, fileIndex);
+    assertThat(fileSlices.size(), is(4));
+    assertTrue(fileSlices.stream().allMatch(fileSlice -> 
fileSlice.getBaseFile().isPresent() && fileSlice.getLogFiles().count() == 0));
   }
 
   @Test
@@ -106,17 +109,18 @@ public class TestFileIndex {
     conf.set(KEYGEN_CLASS_NAME, 
NonpartitionedAvroKeyGenerator.class.getName());
     conf.set(METADATA_ENABLED, true);
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
     FileIndex fileIndex = FileIndex.builder().path(new 
StoragePath(tempFile.getAbsolutePath())).conf(conf)
-        
.rowType(TestConfigurations.ROW_TYPE).metaClient(StreamerUtil.createMetaClient(conf)).build();
+        .rowType(TestConfigurations.ROW_TYPE).metaClient(metaClient).build();
     List<String> partitionKeys = Collections.singletonList("");
     List<Map<String, String>> partitions =
         fileIndex.getPartitions(partitionKeys, 
PARTITION_DEFAULT_NAME.defaultValue(), false);
     assertThat(partitions.size(), is(0));
 
-    List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
-    assertThat(pathInfoList.size(), is(1));
-    assertTrue(pathInfoList.get(0).getPath().toString()
-        .endsWith(HoodieFileFormat.PARQUET.getFileExtension()));
+    List<FileSlice> fileSlices = getFilteredFileSlices(metaClient, fileIndex);
+
+    assertThat(fileSlices.size(), is(1));
+    assertTrue(fileSlices.get(0).getBaseFile().isPresent() && 
fileSlices.get(0).getLogFiles().count() == 0);
   }
 
   @ParameterizedTest
@@ -130,9 +134,6 @@ public class TestFileIndex {
     List<Map<String, String>> partitions =
         fileIndex.getPartitions(partitionKeys, 
PARTITION_DEFAULT_NAME.defaultValue(), false);
     assertThat(partitions.size(), is(0));
-
-    List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
-    assertThat(pathInfoList.size(), is(0));
   }
 
   @Test
@@ -145,12 +146,13 @@ public class TestFileIndex {
 
     writeBigintDataset(conf);
 
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
     FileIndex fileIndex =
         FileIndex.builder()
             .path(new StoragePath(tempFile.getAbsolutePath()))
             .conf(conf)
             .rowType(TestConfigurations.ROW_TYPE_BIGINT)
-            .metaClient(StreamerUtil.createMetaClient(conf))
+            .metaClient(metaClient)
             
.columnStatsProbe(ColumnStatsProbe.newInstance(Collections.singletonList(CallExpression.permanent(
                 FunctionIdentifier.of("greaterThan"),
                 BuiltInFunctionDefinitions.GREATER_THAN,
@@ -162,8 +164,8 @@ public class TestFileIndex {
             .partitionPruner(null)
             .build();
 
-    List<StoragePathInfo> files = fileIndex.getFilesInPartitions();
-    assertThat(files.size(), is(2));
+    List<FileSlice> fileSlices = getFilteredFileSlices(metaClient, fileIndex);
+    assertThat(fileSlices.size(), is(2));
   }
 
   @ParameterizedTest
@@ -215,6 +217,22 @@ public class TestFileIndex {
     assertEquals(Arrays.asList("par3"), p);
   }
 
+  private List<FileSlice> getFilteredFileSlices(HoodieTableMetaClient 
metaClient, FileIndex fileIndex) {
+    List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
+    List<StoragePathInfo> pathInfoList = fileIndex.getFilesInPartitions();
+    HoodieTimeline timeline = metaClient.getCommitsAndCompactionTimeline();
+    List<FileSlice> allFileSlices;
+    try (HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, timeline, pathInfoList)) {
+      if (timeline.lastInstant().isPresent()) {
+        allFileSlices = relPartitionPaths.stream()
+            .flatMap(par -> fsView.getLatestMergedFileSlicesBeforeOrOn(par, 
timeline.lastInstant().get().requestedTime())).collect(Collectors.toList());
+      } else {
+        return Collections.emptyList();
+      }
+    }
+    return fileIndex.filterFileSlices(allFileSlices);
+  }
+
   private void writeBigintDataset(Configuration conf) throws Exception {
     List<RowData> dataset = Arrays.asList(
         insertRow(TestConfigurations.ROW_TYPE_BIGINT, 1L, 
StringData.fromString("Danny"), 23,
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index c89148173b10..b59b51c035c0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1927,6 +1927,54 @@ public class ITTestHoodieDataSource {
         + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testDataSkippingByFilteringFileSlice(HoodieTableType tableType) {
+    // Case: column stats for different files inside one file slice can be different,
+    // so if any file in the file slice satisfies the predicate based on column 
stats,
+    // then the file slice should be read.
+    // E.g., query predicate is age <> 25, base file contains: {key=k1, 
orderingVal=1, age=23},
+    // log file contains: {key=k1, orderingVal=2, age=25}, then the file slice 
should be read.
+    TableEnvironment tableEnv = batchTableEnv;
+    String path = tempFile.getAbsolutePath();
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, path)
+        .option(FlinkOptions.METADATA_ENABLED, true)
+        .option("hoodie.metadata.index.column.stats.enable", true)
+        .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.COMPACTION_DELTA_COMMITS, 1)
+        .option(FlinkOptions.COMPACTION_TASKS, 1)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    execInsertSql(tableEnv, TestSQL.INSERT_T1);
+
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1 where age <> 25 and 
`partition` = 'par1'").execute().collect());
+    assertRowsEquals(result1, "["
+        + "+I[id1, Danny, 23, 1970-01-01T00:00:01, par1], "
+        + "+I[id2, Stephen, 33, 1970-01-01T00:00:02, par1]]");
+
+    batchTableEnv.executeSql("drop table t1");
+
+    hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, path)
+        .option(FlinkOptions.METADATA_ENABLED, true)
+        .option("hoodie.metadata.index.column.stats.enable", true)
+        .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    final String INSERT_T2 = "insert into t1 values\n"
+        + "('id1','Danny',25,TIMESTAMP '1970-01-01 00:01:01','par1')\n";
+    execInsertSql(tableEnv, INSERT_T2);
+    result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from t1 where age <> 25 and 
`partition` = 'par1'").execute().collect());
+    assertRowsEquals(result1, "[+I[id2, Stephen, 33, 1970-01-01T00:00:02, 
par1]]");
+  }
+
   @Test
   void testParquetLogBlockDataSkipping() {
     TableEnvironment tableEnv = batchTableEnv;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index 6d35ac1f1adf..c2c36bec4fd1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -19,13 +19,13 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.source.ExpressionPredicates;
 import org.apache.hudi.source.prune.ColumnStatsProbe;
 import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
 import org.apache.hudi.util.SerializableSchema;
 import org.apache.hudi.util.StreamerUtil;
@@ -100,9 +100,9 @@ public class TestHoodieTableSource {
   void testGetReadPaths() throws Exception {
     beforeEach();
     HoodieTableSource tableSource = getEmptyStreamingSource();
-    List<StoragePathInfo> fileList = tableSource.getReadFiles();
-    assertNotNull(fileList);
-    assertThat(fileList.size(), is(4));
+    List<FileSlice> fileSlices = tableSource.getBaseFileOnlyFileSlices();
+    assertNotNull(fileSlices);
+    assertThat(fileSlices.size(), is(4));
     // apply partition pruning
     FieldReferenceExpression partRef = new 
FieldReferenceExpression("partition", DataTypes.STRING(), 4, 4);
     ValueLiteralExpression partLiteral = new ValueLiteralExpression("par1", 
DataTypes.STRING().notNull());
@@ -113,9 +113,9 @@ public class TestHoodieTableSource {
     HoodieTableSource tableSource2 = getEmptyStreamingSource();
     tableSource2.applyFilters(Arrays.asList(partFilter));
 
-    List<StoragePathInfo> fileList2 = tableSource2.getReadFiles();
-    assertNotNull(fileList2);
-    assertThat(fileList2.size(), is(1));
+    List<FileSlice> fileSlices2 = tableSource2.getBaseFileOnlyFileSlices();
+    assertNotNull(fileSlices2);
+    assertThat(fileSlices2.size(), is(1));
   }
 
   @Test
@@ -203,8 +203,8 @@ public class TestHoodieTableSource {
     int numBuckets = (int)FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.defaultValue();
 
     assertThat(tableSource1.getDataBucketFunc().get().apply(numBuckets), 
is(1));
-    List<StoragePathInfo> fileList = tableSource1.getReadFiles();
-    assertThat("Files should be pruned by bucket id 1", fileList.size(), 
CoreMatchers.is(2));
+    List<FileSlice> fileSlices = tableSource1.getBaseFileOnlyFileSlices();
+    assertThat("Files should be pruned by bucket id 1", fileSlices.size(), 
CoreMatchers.is(2));
 
     // test multiple primary keys filtering
     Configuration conf2 = conf1.clone();
@@ -218,8 +218,8 @@ public class TestHoodieTableSource {
         createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), 
"id1"),
         createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(), 
"Danny")));
     assertThat(tableSource2.getDataBucketFunc().get().apply(numBuckets), 
is(3));
-    List<StoragePathInfo> fileList2 = tableSource2.getReadFiles();
-    assertThat("Files should be pruned by bucket id 3", fileList2.size(), 
CoreMatchers.is(3));
+    List<FileSlice> fileSlices2 = tableSource2.getBaseFileOnlyFileSlices();
+    assertThat("Files should be pruned by bucket id 3", fileSlices2.size(), 
CoreMatchers.is(3));
 
     // apply the filters in different order and test again.
     tableSource2.reset();
@@ -227,7 +227,7 @@ public class TestHoodieTableSource {
         createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(), 
"Danny"),
         createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), 
"id1")));
     assertThat(tableSource2.getDataBucketFunc().get().apply(numBuckets), 
is(3));
-    assertThat("Files should be pruned by bucket id 3", 
tableSource2.getReadFiles().size(),
+    assertThat("Files should be pruned by bucket id 3", 
tableSource2.getBaseFileOnlyFileSlices().size(),
         CoreMatchers.is(3));
 
     // test partial primary keys filtering
@@ -242,8 +242,8 @@ public class TestHoodieTableSource {
         createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), 
"id1")));
 
     assertTrue(tableSource3.getDataBucketFunc().isEmpty());
-    List<StoragePathInfo> fileList3 = tableSource3.getReadFiles();
-    assertThat("Partial pk filtering does not prune any files", 
fileList3.size(),
+    List<FileSlice> fileSlices3 = tableSource3.getBaseFileOnlyFileSlices();
+    assertThat("Partial pk filtering does not prune any files", 
fileSlices3.size(),
         CoreMatchers.is(7));
 
     // test single primary keys filtering together with non-primary key 
predicate
@@ -257,8 +257,8 @@ public class TestHoodieTableSource {
         createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(), 
"Danny")));
 
     assertThat(tableSource4.getDataBucketFunc().get().apply(numBuckets), 
is(1));
-    List<StoragePathInfo> fileList4 = tableSource4.getReadFiles();
-    assertThat("Files should be pruned by bucket id 1", fileList4.size(), 
CoreMatchers.is(2));
+    List<FileSlice> fileSlices4 = tableSource4.getBaseFileOnlyFileSlices();
+    assertThat("Files should be pruned by bucket id 1", fileSlices4.size(), 
CoreMatchers.is(2));
   }
 
   @ParameterizedTest
@@ -282,8 +282,8 @@ public class TestHoodieTableSource {
             LocalDateTime.ofInstant(Instant.ofEpochMilli(1), 
ZoneId.of("UTC")))));
 
     assertThat(tableSource1.getDataBucketFunc().get().apply(numBuckets), 
is(logicalTimestamp ? 1 : 0));
-    List<StoragePathInfo> fileList = tableSource1.getReadFiles();
-    assertThat("Files should be pruned", fileList.size(), CoreMatchers.is(1));
+    List<FileSlice> fileSlices = tableSource1.getBaseFileOnlyFileSlices();
+    assertThat("Files should be pruned", fileSlices.size(), 
CoreMatchers.is(1));
 
     // test date filtering
     Configuration conf2 = conf1.clone();
@@ -298,8 +298,8 @@ public class TestHoodieTableSource {
         createLitEquivalenceExpr(f2, 1, DataTypes.DATE().notNull(), 
LocalDate.ofEpochDay(1))));
 
     assertThat(tableSource2.getDataBucketFunc().get().apply(numBuckets), 
is(1));
-    List<StoragePathInfo> fileList2 = tableSource2.getReadFiles();
-    assertThat("Files should be pruned", fileList2.size(), CoreMatchers.is(1));
+    List<FileSlice> fileSlices2 = tableSource2.getBaseFileOnlyFileSlices();
+    assertThat("Files should be pruned", fileSlices2.size(), 
CoreMatchers.is(1));
 
     // test decimal filtering
     Configuration conf3 = conf1.clone();
@@ -315,8 +315,8 @@ public class TestHoodieTableSource {
             new BigDecimal("1.11"))));
 
     assertThat(tableSource3.getDataBucketFunc().get().apply(numBuckets), 
is(0));
-    List<StoragePathInfo> fileList3 = tableSource3.getReadFiles();
-    assertThat("Files should be pruned", fileList3.size(), CoreMatchers.is(1));
+    List<FileSlice> fileSlices3 = tableSource3.getBaseFileOnlyFileSlices();
+    assertThat("Files should be pruned", fileSlices3.size(), 
CoreMatchers.is(1));
   }
 
   @Test
@@ -373,8 +373,8 @@ public class TestHoodieTableSource {
         createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), 
"id1")));
 
     assertThat(tableSource1.getDataBucketFunc().get().apply(4), is(1));
-    List<StoragePathInfo> fileList = tableSource1.getReadFiles();
-    assertThat("Files should be pruned by bucket id 1", fileList.size(), 
CoreMatchers.is(2));
+    List<FileSlice> fileSlices = tableSource1.getBaseFileOnlyFileSlices();
+    assertThat("Files should be pruned by bucket id 1", fileSlices.size(), 
CoreMatchers.is(2));
   }
 
   /**
@@ -403,8 +403,8 @@ public class TestHoodieTableSource {
         createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), 
"id1"),
         createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(), 
"Danny")));
     assertThat(tableSource.getDataBucketFunc().get().apply(4), is(3));
-    List<StoragePathInfo> fileList = tableSource.getReadFiles();
-    assertThat("Files should be pruned by bucket id 3", fileList.size(), 
CoreMatchers.is(3));
+    List<FileSlice> fileSlices = tableSource.getBaseFileOnlyFileSlices();
+    assertThat("Files should be pruned by bucket id 3", fileSlices.size(), 
CoreMatchers.is(3));
   }
 
   /**
@@ -433,8 +433,8 @@ public class TestHoodieTableSource {
         createLitEquivalenceExpr("uuid", 0, DataTypes.STRING().notNull(), 
"id1")));
 
     assertTrue(tableSource.getDataBucketFunc().isEmpty());
-    List<StoragePathInfo> fileList = tableSource.getReadFiles();
-    assertThat("Partial pk filtering does not prune any files", 
fileList.size(),
+    List<FileSlice> fileSlices = tableSource.getBaseFileOnlyFileSlices();
+    assertThat("Partial pk filtering does not prune any files", 
fileSlices.size(),
         CoreMatchers.is(7));
   }
 
@@ -463,8 +463,8 @@ public class TestHoodieTableSource {
         createLitEquivalenceExpr("name", 1, DataTypes.STRING().notNull(), 
"Danny")));
 
     assertThat(tableSource.getDataBucketFunc().get().apply(4), is(1));
-    List<StoragePathInfo> fileList = tableSource.getReadFiles();
-    assertThat("Files should be pruned by bucket id 1", fileList.size(), 
CoreMatchers.is(2));
+    List<FileSlice> fileSlices = tableSource.getBaseFileOnlyFileSlices();
+    assertThat("Files should be pruned by bucket id 1", fileSlices.size(), 
CoreMatchers.is(2));
   }
 
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index 61f7710b8775..ffc6d97170f2 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -60,6 +60,7 @@ import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.DefaultCatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -456,6 +457,30 @@ public class TestHoodieCatalog {
         () -> catalog.dropTable(new ObjectPath(TEST_DEFAULT_DATABASE, 
"non_exist"), false));
   }
 
+  @Test
+  public void testAlterTable() throws Exception {
+    ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
+    // create table
+    catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true);
+
+    DefaultCatalogTable oldTable = (DefaultCatalogTable) 
catalog.getTable(tablePath);
+    // same as old table
+    ResolvedCatalogTable newTable = new ResolvedCatalogTable(
+        CatalogUtils.createCatalogTable(
+            
Schema.newBuilder().fromResolvedSchema(CREATE_TABLE_SCHEMA).build(),
+            oldTable.getPartitionKeys(),
+            oldTable.getOptions(),
+            "test"),
+        CREATE_TABLE_SCHEMA
+    );
+    catalog.alterTable(tablePath, newTable, true);
+
+    // validate primary key property is not missing
+    DefaultCatalogTable table = (DefaultCatalogTable) 
catalog.getTable(tablePath);
+    assertTrue(table.getUnresolvedSchema().getPrimaryKey().isPresent());
+    assertTrue(table.isPartitioned());
+  }
+
   @Test
   public void testDropPartition() throws Exception {
     ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");

Reply via email to