This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 50ab8c79dfe [HUDI-8196] Support pruning based on partition stats index 
in Hudi Flink (#12132)
50ab8c79dfe is described below

commit 50ab8c79dfefc4c13fe297c14d9d7dced58a4593
Author: Shuo Cheng <[email protected]>
AuthorDate: Sat Oct 26 10:17:34 2024 +0800

    [HUDI-8196] Support pruning based on partition stats index in Hudi Flink 
(#12132)
    
    * add a new interface ColumnStatsIndex;
    * refactor PartitionPruner to apply builder pattern.
    
    ---------
    
    Co-authored-by: danny0405 <[email protected]>
---
 .../hudi/sink/bootstrap/BootstrapOperator.java     |   2 +-
 .../java/org/apache/hudi/source/FileIndex.java     | 112 +++--------
 .../{DataPruner.java => ColumnStatsProbe.java}     |  10 +-
 .../apache/hudi/source/prune/PartitionPruners.java | 169 ++++++++++++++--
 .../apache/hudi/source/stats/ColumnStatsIndex.java |  57 ++++++
 .../hudi/source/stats/ColumnStatsSchemas.java      |  83 ++++++++
 ...ColumnStatsIndices.java => FileStatsIndex.java} | 218 +++++++++++++--------
 .../hudi/source/stats/PartitionStatsIndex.java     |  77 ++++++++
 .../org/apache/hudi/table/HoodieTableSource.java   |  54 +++--
 .../java/org/apache/hudi/util/StreamerUtil.java    |  13 ++
 .../hudi/source/TestExpressionEvaluators.java      |   2 +-
 .../java/org/apache/hudi/source/TestFileIndex.java |  60 +++++-
 .../hudi/source/TestIncrementalInputSplits.java    |  90 +++++++--
 ...stDataPruner.java => TestColumnStatsProbe.java} |   4 +-
 ...StatsIndices.java => TestColumnStatsIndex.java} |  53 ++++-
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  52 +++++
 .../apache/hudi/table/TestHoodieTableSource.java   |  55 +++++-
 .../apache/hudi/table/format/TestInputFormat.java  |   8 +-
 18 files changed, 884 insertions(+), 235 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 1e59ee8199e..86c6ee39f2c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -71,8 +71,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import static java.util.stream.Collectors.toList;
-import static org.apache.hudi.source.FileIndex.metadataConfig;
 import static org.apache.hudi.util.StreamerUtil.isValidFile;
+import static org.apache.hudi.util.StreamerUtil.metadataConfig;
 
 /**
  * The operator to load index from existing hoodieTable.
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index e4975c644a3..a8ff13fe621 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -21,30 +21,25 @@ package org.apache.hudi.source;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.index.bucket.BucketIdentifier;
-import org.apache.hudi.source.prune.DataPruner;
+import org.apache.hudi.source.prune.ColumnStatsProbe;
 import org.apache.hudi.source.prune.PartitionPruners;
 import org.apache.hudi.source.prune.PrimaryKeyPruners;
-import org.apache.hudi.source.stats.ColumnStatsIndices;
+import org.apache.hudi.source.stats.FileStatsIndex;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
-import org.apache.hudi.util.DataTypeUtils;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -54,7 +49,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -69,24 +63,30 @@ public class FileIndex implements Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(FileIndex.class);
 
   private final StoragePath path;
-  private final RowType rowType;
   private final boolean tableExists;
   private final HoodieMetadataConfig metadataConfig;
   private final org.apache.hadoop.conf.Configuration hadoopConf;
   private final PartitionPruners.PartitionPruner partitionPruner; // for 
partition pruning
-  private final DataPruner dataPruner;                            // for data 
skipping
+  private final ColumnStatsProbe colStatsProbe;                   // for 
probing column stats
   private final int dataBucket;                                   // for 
bucket pruning
   private List<String> partitionPaths;                            // cache of 
partition paths
-
-  private FileIndex(StoragePath path, Configuration conf, RowType rowType, 
DataPruner dataPruner, PartitionPruners.PartitionPruner partitionPruner, int 
dataBucket) {
+  private final FileStatsIndex fileStatsIndex;                    // for data 
skipping
+
+  private FileIndex(
+      StoragePath path,
+      Configuration conf,
+      RowType rowType,
+      ColumnStatsProbe colStatsProbe,
+      PartitionPruners.PartitionPruner partitionPruner,
+      int dataBucket) {
     this.path = path;
-    this.rowType = rowType;
     this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
     this.tableExists = StreamerUtil.tableExists(path.toString(), hadoopConf);
-    this.metadataConfig = metadataConfig(conf);
-    this.dataPruner = 
isDataSkippingFeasible(conf.getBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED))
 ? dataPruner : null;
+    this.metadataConfig = StreamerUtil.metadataConfig(conf);
+    this.colStatsProbe = 
isDataSkippingFeasible(conf.get(FlinkOptions.READ_DATA_SKIPPING_ENABLED)) ? 
colStatsProbe : null;
     this.partitionPruner = partitionPruner;
     this.dataBucket = dataBucket;
+    this.fileStatsIndex = new FileStatsIndex(path.toString(), rowType, 
metadataConfig);
   }
 
   /**
@@ -154,6 +154,9 @@ public class FileIndex implements Serializable {
     }
     String[] partitions =
         getOrBuildPartitionPaths().stream().map(p -> fullPartitionPath(path, 
p)).toArray(String[]::new);
+    if (partitions.length < 1) {
+      return Collections.emptyList();
+    }
     List<StoragePathInfo> allFiles = FSUtils.getFilesInPartitions(
             new HoodieFlinkEngineContext(hadoopConf),
             new HoodieHadoopStorage(path, 
HadoopFSUtils.getStorageConf(hadoopConf)), metadataConfig, path.toString(), 
partitions)
@@ -177,7 +180,8 @@ public class FileIndex implements Serializable {
     }
 
     // data skipping
-    Set<String> candidateFiles = candidateFilesInMetadataTable(allFiles);
+    Set<String> candidateFiles = fileStatsIndex.computeCandidateFiles(
+        colStatsProbe, allFiles.stream().map(f -> 
f.getPath().getName()).collect(Collectors.toList()));
     if (candidateFiles == null) {
       // no need to filter by col stats or error occurs.
       return allFiles;
@@ -215,65 +219,6 @@ public class FileIndex implements Serializable {
   //  Utilities
   // -------------------------------------------------------------------------
 
-  /**
-   * Computes pruned list of candidate base-files' names based on provided 
list of data filters.
-   * conditions, by leveraging Metadata Table's Column Statistics index 
(hereon referred as ColStats for brevity)
-   * bearing "min", "max", "num_nulls" statistics for all columns.
-   *
-   * <p>NOTE: This method has to return complete set of candidate files, since 
only provided candidates will
-   * ultimately be scanned as part of query execution. Hence, this method has 
to maintain the
-   * invariant of conservatively including every base-file's name, that is NOT 
referenced in its index.
-   *
-   * <p>The {@code filters} must all be simple.
-   *
-   * @return set of pruned (data-skipped) candidate base-files' names
-   */
-  @Nullable
-  private Set<String> candidateFilesInMetadataTable(List<StoragePathInfo> 
allFileStatus) {
-    if (dataPruner == null) {
-      return null;
-    }
-    try {
-      String[] referencedCols = dataPruner.getReferencedCols();
-      final List<RowData> colStats =
-          ColumnStatsIndices.readColumnStatsIndex(path.toString(), 
metadataConfig, referencedCols);
-      final Pair<List<RowData>, String[]> colStatsTable =
-          ColumnStatsIndices.transposeColumnStatsIndex(colStats, 
referencedCols, rowType);
-      List<RowData> transposedColStats = colStatsTable.getLeft();
-      String[] queryCols = colStatsTable.getRight();
-      if (queryCols.length == 0) {
-        // the indexed columns have no intersection with the referenced 
columns, returns early
-        return null;
-      }
-      RowType.RowField[] queryFields = DataTypeUtils.projectRowFields(rowType, 
queryCols);
-
-      Set<String> allIndexedFileNames = transposedColStats.stream().parallel()
-          .map(row -> row.getString(0).toString())
-          .collect(Collectors.toSet());
-      Set<String> candidateFileNames = transposedColStats.stream().parallel()
-          .filter(row -> dataPruner.test(row, queryFields))
-          .map(row -> row.getString(0).toString())
-          .collect(Collectors.toSet());
-
-      // NOTE: Col-Stats Index isn't guaranteed to have complete set of 
statistics for every
-      //       base-file: since it's bound to clustering, which could occur 
asynchronously
-      //       at arbitrary point in time, and is not likely to be touching 
all the base files.
-      //
-      //       To close that gap, we manually compute the difference b/w all 
indexed (by col-stats-index)
-      //       files and all outstanding base-files, and make sure that all 
base files not
-      //       represented w/in the index are included in the output of this 
method
-      Set<String> nonIndexedFileNames = allFileStatus.stream()
-          .map(fileStatus -> 
fileStatus.getPath().getName()).collect(Collectors.toSet());
-      nonIndexedFileNames.removeAll(allIndexedFileNames);
-
-      candidateFileNames.addAll(nonIndexedFileNames);
-      return candidateFileNames;
-    } catch (Throwable throwable) {
-      LOG.warn("Read column stats for data skipping error", throwable);
-      return null;
-    }
-  }
-
   /**
    * Returns all the relative partition paths.
    *
@@ -296,15 +241,6 @@ public class FileIndex implements Serializable {
     return this.partitionPaths;
   }
 
-  public static HoodieMetadataConfig 
metadataConfig(org.apache.flink.configuration.Configuration conf) {
-    Properties properties = new Properties();
-
-    // set up metadata.enabled=true in table DDL to enable metadata listing
-    properties.put(HoodieMetadataConfig.ENABLE.key(), 
conf.getBoolean(FlinkOptions.METADATA_ENABLED));
-
-    return 
HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
-  }
-
   private boolean isDataSkippingFeasible(boolean dataSkippingEnabled) {
     // NOTE: Data Skipping is only effective when it references columns that 
are indexed w/in
     //       the Column Stats Index (CSI). Following cases could not be 
effectively handled by Data Skipping:
@@ -348,7 +284,7 @@ public class FileIndex implements Serializable {
     private StoragePath path;
     private Configuration conf;
     private RowType rowType;
-    private DataPruner dataPruner;
+    private ColumnStatsProbe columnStatsProbe;
     private PartitionPruners.PartitionPruner partitionPruner;
     private int dataBucket = PrimaryKeyPruners.BUCKET_ID_NO_PRUNING;
 
@@ -370,8 +306,8 @@ public class FileIndex implements Serializable {
       return this;
     }
 
-    public Builder dataPruner(DataPruner dataPruner) {
-      this.dataPruner = dataPruner;
+    public Builder columnStatsProbe(ColumnStatsProbe columnStatsProbe) {
+      this.columnStatsProbe = columnStatsProbe;
       return this;
     }
 
@@ -387,7 +323,7 @@ public class FileIndex implements Serializable {
 
     public FileIndex build() {
       return new FileIndex(Objects.requireNonNull(path), 
Objects.requireNonNull(conf), Objects.requireNonNull(rowType),
-          dataPruner, partitionPruner, dataBucket);
+          columnStatsProbe, partitionPruner, dataBucket);
     }
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/DataPruner.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/ColumnStatsProbe.java
similarity index 93%
rename from 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/DataPruner.java
rename to 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/ColumnStatsProbe.java
index 25257c83f64..a4b48194002 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/DataPruner.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/ColumnStatsProbe.java
@@ -39,15 +39,15 @@ import java.util.Map;
 import static org.apache.hudi.source.ExpressionEvaluators.fromExpression;
 
 /**
- * Utility to do data skipping.
+ * Utility for filtering the column stats metadata payloads.
  */
-public class DataPruner implements Serializable {
+public class ColumnStatsProbe implements Serializable {
   private static final long serialVersionUID = 1L;
 
   private final String[] referencedCols;
   private final List<ExpressionEvaluators.Evaluator> evaluators;
 
-  private DataPruner(String[] referencedCols, 
List<ExpressionEvaluators.Evaluator> evaluators) {
+  private ColumnStatsProbe(String[] referencedCols, 
List<ExpressionEvaluators.Evaluator> evaluators) {
     this.referencedCols = referencedCols;
     this.evaluators = evaluators;
   }
@@ -74,7 +74,7 @@ public class DataPruner implements Serializable {
   }
 
   @Nullable
-  public static DataPruner newInstance(List<ResolvedExpression> filters) {
+  public static ColumnStatsProbe newInstance(List<ResolvedExpression> filters) 
{
     if (filters.isEmpty()) {
       return null;
     }
@@ -83,7 +83,7 @@ public class DataPruner implements Serializable {
       return null;
     }
     List<ExpressionEvaluators.Evaluator> evaluators = fromExpression(filters);
-    return new DataPruner(referencedCols, evaluators);
+    return new ColumnStatsProbe(referencedCols, evaluators);
   }
 
   public static Map<String, ColumnStats> convertColumnStats(RowData indexRow, 
RowType.RowField[] queryFields) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java
index 3f6338896d6..955f471df31 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java
@@ -18,23 +18,31 @@
 
 package org.apache.hudi.source.prune;
 
-import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.source.ExpressionEvaluators.Evaluator;
 import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.source.stats.PartitionStatsIndex;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
 
 import java.io.Serializable;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Tools to prune partitions.
@@ -125,21 +133,154 @@ public class PartitionPruners {
     }
   }
 
-  public static PartitionPruner getInstance(
-      List<ExpressionEvaluators.Evaluator> partitionEvaluators,
-      List<String> partitionKeys,
-      List<DataType> partitionTypes,
-      String defaultParName,
-      boolean hivePartition) {
-    ValidationUtils.checkState(!partitionEvaluators.isEmpty());
-    return new DynamicPartitionPruner(partitionEvaluators, partitionKeys, 
partitionTypes, defaultParName, hivePartition);
+  /**
+   * ColumnStats partition pruner for hoodie table source which enables 
partition stats index.
+   *
+   * <p>Note: The data of new partitions created after the job starts could be 
read if they match the
+   * filter conditions.
+   */
+  public static class ColumnStatsPartitionPruner implements PartitionPruner {
+    private static final long serialVersionUID = 1L;
+    private final ColumnStatsProbe probe;
+    private final PartitionStatsIndex partitionStatsIndex;
+
+    public ColumnStatsPartitionPruner(
+        RowType rowType,
+        String basePath,
+        HoodieMetadataConfig metadataConfig,
+        ColumnStatsProbe probe) {
+      this.probe = probe;
+      this.partitionStatsIndex = new PartitionStatsIndex(basePath, rowType, 
metadataConfig);
+    }
+
+    @Override
+    public Set<String> filter(Collection<String> partitions) {
+      Set<String> candidatePartitions = 
partitionStatsIndex.computeCandidatePartitions(probe, new 
ArrayList<>(partitions));
+      if (candidatePartitions == null) {
+        return new HashSet<>(partitions);
+      }
+      return 
partitions.stream().filter(candidatePartitions::contains).collect(Collectors.toSet());
+    }
+  }
+
+  /**
+   * Chained partition pruner for hoodie table source combining multiple 
partition pruners with predicate '&'.
+   */
+  public static class ChainedPartitionPruner implements PartitionPruner {
+    private static final long serialVersionUID = 1L;
+    private final List<PartitionPruner> pruners;
+
+    public ChainedPartitionPruner(List<PartitionPruner> pruners) {
+      this.pruners = pruners;
+    }
+
+    @Override
+    public Set<String> filter(Collection<String> partitions) {
+      for (PartitionPruner pruner: pruners) {
+        partitions = pruner.filter(partitions);
+      }
+      return new HashSet<>(partitions);
+    }
   }
 
-  public static PartitionPruner getInstance(Collection<String> 
candidatePartitions) {
-    return new StaticPartitionPruner(candidatePartitions);
+  public static Builder builder() {
+    return new Builder();
   }
 
-  public static PartitionPruner getInstance(String... candidatePartitions) {
-    return new StaticPartitionPruner(Arrays.asList(candidatePartitions));
+  public static class Builder {
+    private RowType rowType;
+    private String basePath;
+    private Configuration conf;
+    private ColumnStatsProbe probe;
+    private List<ExpressionEvaluators.Evaluator> partitionEvaluators;
+    private List<String> partitionKeys;
+    private List<DataType> partitionTypes;
+    private String defaultParName;
+    private boolean hivePartition;
+    private Collection<String> candidatePartitions;
+
+    private Builder() {
+    }
+
+    public Builder rowType(RowType rowType) {
+      this.rowType = rowType;
+      return this;
+    }
+
+    public Builder basePath(String basePath) {
+      this.basePath = basePath;
+      return this;
+    }
+
+    public Builder conf(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    public Builder columnStatsProbe(ColumnStatsProbe probe) {
+      this.probe = probe;
+      return this;
+    }
+
+    public Builder partitionEvaluators(List<Evaluator> partitionEvaluators) {
+      this.partitionEvaluators = partitionEvaluators;
+      return this;
+    }
+
+    public Builder partitionKeys(List<String> partitionKeys) {
+      this.partitionKeys = partitionKeys;
+      return this;
+    }
+
+    public Builder partitionTypes(List<DataType> partitionTypes) {
+      this.partitionTypes = partitionTypes;
+      return this;
+    }
+
+    public Builder defaultParName(String defaultParName) {
+      this.defaultParName = defaultParName;
+      return this;
+    }
+
+    public Builder hivePartition(boolean hivePartition) {
+      this.hivePartition = hivePartition;
+      return this;
+    }
+
+    public Builder candidatePartitions(Collection<String> candidatePartitions) 
{
+      this.candidatePartitions = candidatePartitions;
+      return this;
+    }
+
+    public PartitionPruner build() {
+      PartitionPruner staticPruner = null;
+      if (candidatePartitions != null && !candidatePartitions.isEmpty()) {
+        staticPruner = new StaticPartitionPruner(candidatePartitions);
+      }
+      PartitionPruner dynamicPruner = null;
+      if (partitionEvaluators != null && !partitionEvaluators.isEmpty()) {
+        dynamicPruner = new DynamicPartitionPruner(partitionEvaluators, 
Objects.requireNonNull(partitionKeys),
+            Objects.requireNonNull(partitionTypes), 
Objects.requireNonNull(defaultParName),
+            hivePartition);
+      }
+      PartitionPruner columnStatsPruner = null;
+      if (probe != null
+          && conf.get(FlinkOptions.READ_DATA_SKIPPING_ENABLED)
+          && conf.get(FlinkOptions.METADATA_ENABLED)) {
+        columnStatsPruner = new 
ColumnStatsPartitionPruner(Objects.requireNonNull(rowType), 
Objects.requireNonNull(basePath),
+            StreamerUtil.metadataConfig(Objects.requireNonNull(conf)), probe);
+      }
+      List<PartitionPruner> partitionPruners =
+          Stream.of(staticPruner, dynamicPruner, columnStatsPruner)
+              .filter(Objects::nonNull)
+              .collect(Collectors.toList());
+      if (partitionPruners.isEmpty()) {
+        return null;
+      }
+      if (partitionPruners.size() < 2) {
+        return partitionPruners.get(0);
+      }
+      return new ChainedPartitionPruner(partitionPruners);
+    }
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
new file mode 100644
index 00000000000..1d73e167e33
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.stats;
+
+import org.apache.hudi.source.prune.ColumnStatsProbe;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Base support that leverages Metadata Table's indexes, such as Column Stats 
Index
+ * and Partition Stats Index, to prune files and partitions.
+ */
+public interface ColumnStatsIndex extends Serializable {
+
+  /**
+   * Returns the partition name of the index.
+   */
+  String getIndexPartitionName();
+
+  /**
+   * Computes the filtered files with given candidates.
+   *
+   * @param columnStatsProbe The utility to filter the column stats metadata.
+   * @param allFile          The file name list of the candidate files.
+   *
+   * @return The set of filtered file names
+   */
+  Set<String> computeCandidateFiles(ColumnStatsProbe columnStatsProbe, 
List<String> allFile);
+
+  /**
+   * Computes the filtered partition paths with given candidates.
+   *
+   * @param columnStatsProbe The utility to filter the column stats metadata.
+   * @param allPartitions    The <strong>relative</strong> partition path list 
of the candidate partitions.
+   *
+   * @return The set of filtered relative partition paths
+   */
+  Set<String> computeCandidatePartitions(ColumnStatsProbe columnStatsProbe, 
List<String> allPartitions);
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java
new file mode 100644
index 00000000000..1188c241990
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.stats;
+
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.util.AvroSchemaConverter;
+
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.stream.Stream;
+
+/**
+ * Utility class for column stats schema.
+ */
+public class ColumnStatsSchemas {
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+  public static final DataType METADATA_DATA_TYPE = getMetadataDataType();
+  public static final DataType COL_STATS_DATA_TYPE = getColStatsDataType();
+  public static final int[] COL_STATS_TARGET_POS = getColStatsTargetPos();
+
+  // the column schema:
+  // |- file_name: string
+  // |- min_val: row
+  // |- max_val: row
+  // |- null_cnt: long
+  // |- val_cnt: long
+  // |- column_name: string
+  public static final int ORD_FILE_NAME = 0;
+  public static final int ORD_MIN_VAL = 1;
+  public static final int ORD_MAX_VAL = 2;
+  public static final int ORD_NULL_CNT = 3;
+  public static final int ORD_VAL_CNT = 4;
+  public static final int ORD_COL_NAME = 5;
+
+  private static DataType getMetadataDataType() {
+    return AvroSchemaConverter.convertToDataType(HoodieMetadataRecord.SCHEMA$);
+  }
+
+  private static DataType getColStatsDataType() {
+    int pos = 
HoodieMetadataRecord.SCHEMA$.getField(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).pos();
+    return METADATA_DATA_TYPE.getChildren().get(pos);
+  }
+
+  // the column schema:
+  // |- file_name: string
+  // |- min_val: row
+  // |- max_val: row
+  // |- null_cnt: long
+  // |- val_cnt: long
+  // |- column_name: string
+  private static int[] getColStatsTargetPos() {
+    RowType colStatsRowType = (RowType) COL_STATS_DATA_TYPE.getLogicalType();
+    return Stream.of(
+            HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
+            HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
+            HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
+            HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT,
+            HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT,
+            HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
+        .mapToInt(colStatsRowType::getFieldIndex)
+        .toArray();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
similarity index 67%
rename from 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
rename to 
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
index 674e58dce6b..0b3ffeff7d5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.collection.Tuple3;
 import org.apache.hudi.common.util.hash.ColumnIndexID;
@@ -31,9 +32,10 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.source.prune.ColumnStatsProbe;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
-import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.DataTypeUtils;
 import org.apache.hudi.util.FlinkClientUtil;
 import org.apache.hudi.util.RowDataProjection;
 
@@ -41,9 +43,12 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -55,44 +60,130 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static 
org.apache.hudi.source.stats.ColumnStatsSchemas.COL_STATS_DATA_TYPE;
+import static 
org.apache.hudi.source.stats.ColumnStatsSchemas.COL_STATS_TARGET_POS;
+import static 
org.apache.hudi.source.stats.ColumnStatsSchemas.METADATA_DATA_TYPE;
+import static org.apache.hudi.source.stats.ColumnStatsSchemas.ORD_COL_NAME;
+import static org.apache.hudi.source.stats.ColumnStatsSchemas.ORD_FILE_NAME;
+import static org.apache.hudi.source.stats.ColumnStatsSchemas.ORD_MAX_VAL;
+import static org.apache.hudi.source.stats.ColumnStatsSchemas.ORD_MIN_VAL;
+import static org.apache.hudi.source.stats.ColumnStatsSchemas.ORD_NULL_CNT;
+import static org.apache.hudi.source.stats.ColumnStatsSchemas.ORD_VAL_CNT;
 
 /**
- * Utilities for abstracting away heavy-lifting of interactions with Metadata 
Table's Column Stats Index,
+ * An index support implementation that leverages Column Stats Index to prune 
files,
+ * including utilities for abstracting away heavy-lifting of interactions with 
the index,
  * providing convenient interfaces to read it, transpose, etc.
  */
-public class ColumnStatsIndices {
-  private static final DataType METADATA_DATA_TYPE = getMetadataDataType();
-  private static final DataType COL_STATS_DATA_TYPE = getColStatsDataType();
-  private static final int[] COL_STATS_TARGET_POS = getColStatsTargetPos();
-
-  // the column schema:
-  // |- file_name: string
-  // |- min_val: row
-  // |- max_val: row
-  // |- null_cnt: long
-  // |- val_cnt: long
-  // |- column_name: string
-  private static final int ORD_FILE_NAME = 0;
-  private static final int ORD_MIN_VAL = 1;
-  private static final int ORD_MAX_VAL = 2;
-  private static final int ORD_NULL_CNT = 3;
-  private static final int ORD_VAL_CNT = 4;
-  private static final int ORD_COL_NAME = 5;
-
-  private ColumnStatsIndices() {
+public class FileStatsIndex implements ColumnStatsIndex {
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileStatsIndex.class);
+  private final RowType rowType;
+  private final String basePath;
+  private final HoodieMetadataConfig metadataConfig;
+  private HoodieTableMetadata metadataTable;
+
+  public FileStatsIndex(
+      String basePath,
+      RowType rowType,
+      HoodieMetadataConfig metadataConfig) {
+    this.basePath = basePath;
+    this.rowType = rowType;
+    this.metadataConfig = metadataConfig;
   }
 
-  public static List<RowData> readColumnStatsIndex(String basePath, 
HoodieMetadataConfig metadataConfig, String[] targetColumns) {
-    // NOTE: If specific columns have been provided, we can considerably trim 
down amount of data fetched
-    //       by only fetching Column Stats Index records pertaining to the 
requested columns.
-    //       Otherwise, we fall back to read whole Column Stats Index
-    ValidationUtils.checkArgument(targetColumns.length > 0,
-        "Column stats is only valid when push down filters have referenced 
columns");
-    final List<RowData> metadataRows = readColumnStatsIndexByColumns(basePath, 
targetColumns, metadataConfig);
-    return projectNestedColStatsColumns(metadataRows);
+  @Override
+  public String getIndexPartitionName() {
+    return HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+  }
+
+  public HoodieTableMetadata getMetadataTable() {
+    // initialize the metadata table lazily
+    if (this.metadataTable == null) {
+      this.metadataTable = HoodieTableMetadata.create(
+          HoodieFlinkEngineContext.DEFAULT,
+          new HoodieHadoopStorage(basePath, FlinkClientUtil.getHadoopConf()),
+          metadataConfig,
+          basePath);
+    }
+    return this.metadataTable;
+  }
+
+  @Override
+  public Set<String> computeCandidateFiles(ColumnStatsProbe probe, 
List<String> allFiles) {
+    if (probe == null) {
+      return null;
+    }
+    try {
+      String[] targetColumns = probe.getReferencedCols();
+      final List<RowData> statsRows = 
readColumnStatsIndexByColumns(targetColumns);
+      return candidatesInMetadataTable(probe, statsRows, allFiles);
+    } catch (Throwable t) {
+      LOG.warn("Read {} for data skipping error", getIndexPartitionName(), t);
+      return null;
+    }
+  }
+
+  @Override
+  public Set<String> computeCandidatePartitions(ColumnStatsProbe probe, 
List<String> allPartitions) {
+    throw new UnsupportedOperationException("This method is not supported by " 
+ this.getClass().getSimpleName());
+  }
+
+  /**
+   * Computes pruned list of candidates' names based on provided list of data filter
+   * conditions, by leveraging Metadata Table's Column Statistics index (hereon referred as ColStats for brevity)
+   * bearing "min", "max", "num_nulls" statistics for all columns.
+   *
+   * <p>NOTE: This method has to return complete set of the candidates, since 
only provided candidates will
+   * ultimately be scanned as part of query execution. Hence, this method has 
to maintain the
+   * invariant of conservatively including every candidate's name that is NOT referenced in its index.
+   *
+   * <p>The {@code filters} must all be simple.
+   *
+   * @param probe         The column stats probe built from push-down filters.
+   * @param indexRows     The raw column stats records.
+   * @param oriCandidates The original candidates to be pruned.
+   *
+   * @return set of pruned (data-skipped) candidate names
+   */
+  protected Set<String> candidatesInMetadataTable(
+      @Nullable ColumnStatsProbe probe,
+      List<RowData> indexRows,
+      List<String> oriCandidates) {
+    if (probe == null) {
+      return null;
+    }
+    String[] referencedCols = probe.getReferencedCols();
+    final Pair<List<RowData>, String[]> colStatsTable =
+        transposeColumnStatsIndex(indexRows, referencedCols);
+    List<RowData> transposedColStats = colStatsTable.getLeft();
+    String[] queryCols = colStatsTable.getRight();
+    if (queryCols.length == 0) {
+      // the indexed columns have no intersection with the referenced columns, 
returns early
+      return null;
+    }
+    RowType.RowField[] queryFields = DataTypeUtils.projectRowFields(rowType, 
queryCols);
+
+    Set<String> allIndexedFiles = transposedColStats.stream().parallel()
+        .map(row -> row.getString(0).toString())
+        .collect(Collectors.toSet());
+    Set<String> candidateFiles = transposedColStats.stream().parallel()
+        .filter(row -> probe.test(row, queryFields))
+        .map(row -> row.getString(0).toString())
+        .collect(Collectors.toSet());
+
+    // NOTE: Col-Stats Index isn't guaranteed to have complete set of 
statistics for every
+    //       base-file: since it's bound to clustering, which could occur 
asynchronously
+    //       at arbitrary point in time, and is not likely to be touching all 
the base files.
+    //
+    //       To close that gap, we manually compute the difference b/w all 
indexed (by col-stats-index)
+    //       files and all outstanding base-files, and make sure that all base 
files not
+    //       represented w/in the index are included in the output of this 
method
+    oriCandidates.removeAll(allIndexedFiles);
+    candidateFiles.addAll(oriCandidates);
+    return candidateFiles;
   }
 
   private static List<RowData> projectNestedColStatsColumns(List<RowData> 
rows) {
@@ -140,12 +231,12 @@ public class ColumnStatsIndices {
    *
    * @param colStats     RowData list bearing raw Column Stats Index table
    * @param queryColumns target columns to be included into the final table
-   * @param tableSchema  schema of the source data table
    * @return reshaped table according to the format outlined above
    */
-  public static Pair<List<RowData>, String[]> 
transposeColumnStatsIndex(List<RowData> colStats, String[] queryColumns, 
RowType tableSchema) {
+  @VisibleForTesting
+  public Pair<List<RowData>, String[]> transposeColumnStatsIndex(List<RowData> 
colStats, String[] queryColumns) {
 
-    Map<String, LogicalType> tableFieldTypeMap = 
tableSchema.getFields().stream()
+    Map<String, LogicalType> tableFieldTypeMap = rowType.getFields().stream()
         .collect(Collectors.toMap(RowType.RowField::getName, 
RowType.RowField::getType));
 
     // NOTE: We have to collect list of indexed columns to make sure we 
properly align the rows
@@ -243,7 +334,6 @@ public class ColumnStatsIndices {
     // |- null_cnt: long
     // |- val_cnt: long
     // |- column_name: string
-
     GenericRowData unpackedRow = new GenericRowData(row.getArity());
     unpackedRow.setField(0, row.getString(0));
     unpackedRow.setField(1, minValue);
@@ -278,28 +368,27 @@ public class ColumnStatsIndices {
     return converter.convert(rawVal);
   }
 
-  private static List<RowData> readColumnStatsIndexByColumns(
-      String basePath,
-      String[] targetColumns,
-      HoodieMetadataConfig metadataConfig) {
+  @VisibleForTesting
+  public List<RowData> readColumnStatsIndexByColumns(String[] targetColumns) {
+    // NOTE: If specific columns have been provided, we can considerably trim 
down amount of data fetched
+    //       by only fetching Column Stats Index records pertaining to the 
requested columns.
+    //       Otherwise, we fall back to read whole Column Stats Index
+    ValidationUtils.checkArgument(targetColumns.length > 0,
+        "Column stats is only valid when push down filters have referenced 
columns");
 
-    // Read Metadata Table's Column Stats Index into Flink's RowData list by
-    //    - Fetching the records from CSI by key-prefixes (encoded column 
names)
+    // Read Metadata Table's column stats into Flink's RowData list by
+    //    - Fetching the records by key-prefixes (encoded column names)
     //    - Deserializing fetched records into [[RowData]]s
-    HoodieTableMetadata metadataTable = HoodieTableMetadata.create(
-        HoodieFlinkEngineContext.DEFAULT, new HoodieHadoopStorage(basePath, 
FlinkClientUtil.getHadoopConf()),
-        metadataConfig, basePath);
-
     // TODO encoding should be done internally w/in HoodieBackedTableMetadata
     List<String> encodedTargetColumnNames = Arrays.stream(targetColumns)
         .map(colName -> new 
ColumnIndexID(colName).asBase64EncodedString()).collect(Collectors.toList());
 
     HoodieData<HoodieRecord<HoodieMetadataPayload>> records =
-        metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames, 
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, false);
+        getMetadataTable().getRecordsByKeyPrefixes(encodedTargetColumnNames, 
getIndexPartitionName(), false);
 
     org.apache.hudi.util.AvroToRowDataConverters.AvroToRowDataConverter 
converter =
         AvroToRowDataConverters.createRowConverter((RowType) 
METADATA_DATA_TYPE.getLogicalType());
-    return records.collectAsList().stream().parallel().map(record -> {
+    List<RowData> rows = 
records.collectAsList().stream().parallel().map(record -> {
           // schema and props are ignored for generating metadata record from 
the payload
           // instead, the underlying file system, or bloom filter, or columns 
stats metadata (part of payload) are directly used
           GenericRecord genericRecord;
@@ -311,37 +400,6 @@ public class ColumnStatsIndices {
           return (RowData) converter.convert(genericRecord);
         }
     ).collect(Collectors.toList());
-  }
-
-  // -------------------------------------------------------------------------
-  //  Utilities
-  // -------------------------------------------------------------------------
-  private static DataType getMetadataDataType() {
-    return AvroSchemaConverter.convertToDataType(HoodieMetadataRecord.SCHEMA$);
-  }
-
-  private static DataType getColStatsDataType() {
-    int pos = 
HoodieMetadataRecord.SCHEMA$.getField(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).pos();
-    return METADATA_DATA_TYPE.getChildren().get(pos);
-  }
-
-  // the column schema:
-  // |- file_name: string
-  // |- min_val: row
-  // |- max_val: row
-  // |- null_cnt: long
-  // |- val_cnt: long
-  // |- column_name: string
-  private static int[] getColStatsTargetPos() {
-    RowType colStatsRowType = (RowType) COL_STATS_DATA_TYPE.getLogicalType();
-    return Stream.of(
-            HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
-            HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
-            HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
-            HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT,
-            HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT,
-            HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
-        .mapToInt(colStatsRowType::getFieldIndex)
-        .toArray();
+    return projectNestedColStatsColumns(rows);
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/PartitionStatsIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/PartitionStatsIndex.java
new file mode 100644
index 00000000000..e2f6431e77d
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/PartitionStatsIndex.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.stats;
+
+import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.source.prune.ColumnStatsProbe;
+
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * An index support implementation that leverages Partition Stats Index to 
prune partitions.
+ */
+public class PartitionStatsIndex extends FileStatsIndex {
+  private static final long serialVersionUID = 1L;
+
+  public PartitionStatsIndex(
+      String basePath,
+      RowType tableRowType,
+      HoodieMetadataConfig metadataConfig) {
+    super(basePath, tableRowType, metadataConfig);
+  }
+
+  @Override
+  public String getIndexPartitionName() {
+    return HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS;
+  }
+
+  @Override
+  public Set<String> computeCandidateFiles(ColumnStatsProbe probe, 
List<String> allFiles) {
+    throw new UnsupportedOperationException("This method is not supported by " 
+ this.getClass().getSimpleName());
+  }
+
+  /**
+   * NOTE: The stats payload stored in Metadata table for Partition Stats Index
+   * is {@link HoodieMetadataColumnStats}, with schema:
+   *
+   * <pre>
+   *   |- partition_name: string
+   *   |- min_val: row
+   *   |- max_val: row
+   *   |- null_cnt: long
+   *   |- val_cnt: long
+   *   |- column_name: string
+   * </pre>
+   * Thus, the loading/transposing and candidates computing logic can be 
reused.
+   *
+   * @param probe         Column stats probe constructed from pushed down 
column filters.
+   * @param allPartitions All partitions to be pruned by partition stats.
+   *
+   * @return the candidate partitions pruned by partition stats.
+   */
+  @Override
+  public Set<String> computeCandidatePartitions(ColumnStatsProbe probe, 
List<String> allPartitions) {
+    return super.computeCandidateFiles(probe, allPartitions);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index d1c7d7292d8..18710678bf5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -46,7 +46,7 @@ import org.apache.hudi.source.FileIndex;
 import org.apache.hudi.source.IncrementalInputSplits;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
 import org.apache.hudi.source.StreamReadOperator;
-import org.apache.hudi.source.prune.DataPruner;
+import org.apache.hudi.source.prune.ColumnStatsProbe;
 import org.apache.hudi.source.prune.PartitionPruners;
 import org.apache.hudi.source.prune.PrimaryKeyPruners;
 import 
org.apache.hudi.source.rebalance.partitioner.StreamReadAppendPartitioner;
@@ -154,7 +154,7 @@ public class HoodieTableSource implements
   private int[] requiredPos;
   private long limit;
   private List<Predicate> predicates;
-  private DataPruner dataPruner;
+  private ColumnStatsProbe columnStatsProbe;
   private PartitionPruners.PartitionPruner partitionPruner;
   private int dataBucket;
   private transient FileIndex fileIndex;
@@ -175,7 +175,7 @@ public class HoodieTableSource implements
       String defaultPartName,
       Configuration conf,
       @Nullable List<Predicate> predicates,
-      @Nullable DataPruner dataPruner,
+      @Nullable ColumnStatsProbe columnStatsProbe,
       @Nullable PartitionPruners.PartitionPruner partitionPruner,
       int dataBucket,
       @Nullable int[] requiredPos,
@@ -189,7 +189,7 @@ public class HoodieTableSource implements
     this.defaultPartName = defaultPartName;
     this.conf = conf;
     this.predicates = 
Optional.ofNullable(predicates).orElse(Collections.emptyList());
-    this.dataPruner = dataPruner;
+    this.columnStatsProbe = columnStatsProbe;
     this.partitionPruner = partitionPruner;
     this.dataBucket = dataBucket;
     this.requiredPos = Optional.ofNullable(requiredPos).orElseGet(() -> 
IntStream.range(0, this.tableRowType.getFieldCount()).toArray());
@@ -266,7 +266,7 @@ public class HoodieTableSource implements
   @Override
   public DynamicTableSource copy() {
     return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
-        conf, predicates, dataPruner, partitionPruner, dataBucket, 
requiredPos, limit, metaClient, internalSchemaManager);
+        conf, predicates, columnStatsProbe, partitionPruner, dataBucket, 
requiredPos, limit, metaClient, internalSchemaManager);
   }
 
   @Override
@@ -279,8 +279,8 @@ public class HoodieTableSource implements
     List<ResolvedExpression> simpleFilters = 
filterSimpleCallExpression(filters);
     Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> splitFilters = 
splitExprByPartitionCall(simpleFilters, this.partitionKeys, this.tableRowType);
     this.predicates = ExpressionPredicates.fromExpression(splitFilters.f0);
-    this.dataPruner = DataPruner.newInstance(splitFilters.f0);
-    this.partitionPruner = cratePartitionPruner(splitFilters.f1);
+    this.columnStatsProbe = ColumnStatsProbe.newInstance(splitFilters.f0);
+    this.partitionPruner = createPartitionPruner(splitFilters.f1, 
columnStatsProbe);
     this.dataBucket = getDataBucket(splitFilters.f0);
     // refuse all the filters now
     return SupportsFilterPushDown.Result.of(new ArrayList<>(splitFilters.f1), 
new ArrayList<>(filters));
@@ -341,8 +341,8 @@ public class HoodieTableSource implements
   }
 
   @Nullable
-  private PartitionPruners.PartitionPruner 
cratePartitionPruner(List<ResolvedExpression> partitionFilters) {
-    if (partitionFilters.isEmpty()) {
+  private PartitionPruners.PartitionPruner 
createPartitionPruner(List<ResolvedExpression> partitionFilters, 
ColumnStatsProbe columnStatsProbe) {
+    if (!isPartitioned() || partitionFilters.isEmpty() && columnStatsProbe == 
null) {
       return null;
     }
     StringJoiner joiner = new StringJoiner(" and ");
@@ -353,9 +353,20 @@ public class HoodieTableSource implements
             this.schema.getColumn(name).orElseThrow(() -> new 
HoodieValidationException("Field " + name + " does not exist")))
         .map(SerializableSchema.Column::getDataType)
         .collect(Collectors.toList());
-    String defaultParName = 
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
-    boolean hivePartition = 
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING);
-    return PartitionPruners.getInstance(evaluators, this.partitionKeys, 
partitionTypes, defaultParName, hivePartition);
+    String defaultParName = conf.get(FlinkOptions.PARTITION_DEFAULT_NAME);
+    boolean hivePartition = conf.get(FlinkOptions.HIVE_STYLE_PARTITIONING);
+
+    return PartitionPruners.builder()
+        .basePath(path.toString())
+        .rowType(tableRowType)
+        .conf(conf)
+        .columnStatsProbe(columnStatsProbe)
+        .partitionEvaluators(evaluators)
+        .partitionKeys(partitionKeys)
+        .partitionTypes(partitionTypes)
+        .defaultParName(defaultParName)
+        .hivePartition(hivePartition)
+        .build();
   }
 
   private int getDataBucket(List<ResolvedExpression> dataFilters) {
@@ -602,7 +613,7 @@ public class HoodieTableSource implements
           .path(this.path)
           .conf(this.conf)
           .rowType(this.tableRowType)
-          .dataPruner(this.dataPruner)
+          .columnStatsProbe(this.columnStatsProbe)
           .partitionPruner(this.partitionPruner)
           .dataBucket(this.dataBucket)
           .build();
@@ -624,6 +635,10 @@ public class HoodieTableSource implements
     return keyIndices;
   }
 
+  private boolean isPartitioned() {
+    return !this.partitionKeys.isEmpty() && 
this.partitionKeys.stream().noneMatch(String::isEmpty);
+  }
+
   @VisibleForTesting
   public Schema getTableAvroSchema() {
     try {
@@ -660,22 +675,27 @@ public class HoodieTableSource implements
    */
   @VisibleForTesting
   public List<StoragePathInfo> getReadFiles() {
-    FileIndex fileIndex = getOrBuildFileIndex();
-    List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
+    List<String> relPartitionPaths = getReadPartitions();
     if (relPartitionPaths.isEmpty()) {
       return Collections.emptyList();
     }
     return fileIndex.getFilesInPartitions();
   }
 
+  @VisibleForTesting
+  public List<String> getReadPartitions() {
+    FileIndex fileIndex = getOrBuildFileIndex();
+    return fileIndex.getOrBuildPartitionPaths();
+  }
+
   @VisibleForTesting
   public List<Predicate> getPredicates() {
     return predicates;
   }
 
   @VisibleForTesting
-  public DataPruner getDataPruner() {
-    return dataPruner;
+  public ColumnStatsProbe getColumnStatsProbe() {
+    return columnStatsProbe;
   }
 
   @VisibleForTesting
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 46ecf7ef8d2..eadf42ca051 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -20,6 +20,7 @@ package org.apache.hudi.util;
 
 import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
@@ -551,4 +552,16 @@ public class StreamerUtil {
           FlinkOptions.KEYGEN_CLASS_NAME.key(), 
ComplexAvroKeyGenerator.class.getName());
     }
   }
+
+  /**
+   * @return HoodieMetadataConfig constructed from flink configuration.
+   */
+  public static HoodieMetadataConfig 
metadataConfig(org.apache.flink.configuration.Configuration conf) {
+    Properties properties = new Properties();
+
+    // set up metadata.enabled=true in table DDL to enable metadata listing
+    properties.put(HoodieMetadataConfig.ENABLE.key(), 
conf.getBoolean(FlinkOptions.METADATA_ENABLED));
+
+    return 
HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionEvaluators.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionEvaluators.java
index e3dc0a836c9..5a8d4ea00d9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionEvaluators.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionEvaluators.java
@@ -44,7 +44,7 @@ import java.util.Map;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.source.ExpressionEvaluators.fromExpression;
-import static org.apache.hudi.source.prune.DataPruner.convertColumnStats;
+import static org.apache.hudi.source.prune.ColumnStatsProbe.convertColumnStats;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
index 20a5cb34fdf..a2d5e10ec4a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
@@ -20,9 +20,11 @@ package org.apache.hudi.source;
 
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
-import org.apache.hudi.source.prune.DataPruner;
+import org.apache.hudi.source.prune.ColumnStatsProbe;
+import org.apache.hudi.source.prune.PartitionPruners;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.utils.TestConfigurations;
@@ -41,6 +43,7 @@ import org.apache.flink.table.functions.FunctionIdentifier;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
@@ -56,9 +59,12 @@ import static 
org.apache.hudi.configuration.FlinkOptions.KEYGEN_CLASS_NAME;
 import static org.apache.hudi.configuration.FlinkOptions.METADATA_ENABLED;
 import static 
org.apache.hudi.configuration.FlinkOptions.PARTITION_DEFAULT_NAME;
 import static org.apache.hudi.configuration.FlinkOptions.PARTITION_PATH_FIELD;
+import static 
org.apache.hudi.configuration.FlinkOptions.READ_DATA_SKIPPING_ENABLED;
+import static org.apache.hudi.configuration.FlinkOptions.TABLE_TYPE;
 import static org.apache.hudi.utils.TestData.insertRow;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -133,7 +139,7 @@ public class TestFileIndex {
     Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath(), 
TestConfigurations.ROW_DATA_TYPE_BIGINT);
     conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
     conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
-    conf.setBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
+    conf.setBoolean(READ_DATA_SKIPPING_ENABLED, true);
     
conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), 
true);
 
     writeBigintDataset(conf);
@@ -142,7 +148,7 @@ public class TestFileIndex {
         FileIndex.builder()
             .path(new StoragePath(tempFile.getAbsolutePath()))
             .conf(conf).rowType(TestConfigurations.ROW_TYPE_BIGINT)
-            .dataPruner(DataPruner.newInstance(Collections.singletonList(new 
CallExpression(
+            
.columnStatsProbe(ColumnStatsProbe.newInstance(Collections.singletonList(new 
CallExpression(
                 FunctionIdentifier.of("greaterThan"),
                 BuiltInFunctionDefinitions.GREATER_THAN,
                 Arrays.asList(
@@ -157,6 +163,54 @@ public class TestFileIndex {
     assertThat(files.size(), is(2));
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testFileListingWithPartitionStatsPruning(HoodieTableType tableType) 
throws Exception {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(READ_DATA_SKIPPING_ENABLED, true);
+    conf.set(METADATA_ENABLED, true);
+    conf.set(TABLE_TYPE, tableType.name());
+    
conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(),
 true);
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      // enable CSI for MOR table to collect col stats for delta write stats,
+      // which will be used to construct partition stats then.
+      
conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), 
true);
+    }
+
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    // uuid > 'id5' and age < 30, only column stats of 'par3' matches the 
filter.
+    ColumnStatsProbe columnStatsProbe =
+        ColumnStatsProbe.newInstance(Arrays.asList(
+            new CallExpression(
+                FunctionIdentifier.of("greaterThan"),
+                BuiltInFunctionDefinitions.GREATER_THAN,
+                Arrays.asList(
+                    new FieldReferenceExpression("uuid", DataTypes.STRING(), 
0, 0),
+                    new ValueLiteralExpression("id5", 
DataTypes.STRING().notNull())
+                ),
+                DataTypes.BOOLEAN()),
+            new CallExpression(
+                FunctionIdentifier.of("lessThan"),
+                BuiltInFunctionDefinitions.LESS_THAN,
+                Arrays.asList(
+                    new FieldReferenceExpression("age", DataTypes.INT(), 2, 2),
+                    new ValueLiteralExpression(30, DataTypes.INT().notNull())
+                ),
+                DataTypes.BOOLEAN())));
+
+    FileIndex fileIndex =
+        FileIndex.builder()
+            .path(new StoragePath(tempFile.getAbsolutePath()))
+            .conf(conf)
+            .rowType(TestConfigurations.ROW_TYPE)
+            
.partitionPruner(PartitionPruners.builder().rowType(TestConfigurations.ROW_TYPE).basePath(tempFile.getAbsolutePath()).conf(conf).columnStatsProbe(columnStatsProbe).build())
+            .build();
+
+    List<String> p = fileIndex.getOrBuildPartitionPaths();
+    assertEquals(Arrays.asList("par3"), p);
+  }
+
   private void writeBigintDataset(Configuration conf) throws Exception {
     List<RowData> dataset = Arrays.asList(
         insertRow(TestConfigurations.ROW_TYPE_BIGINT, 1L, 
StringData.fromString("Danny"), 23,
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index db645f3936e..20fe5b1fe95 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.source;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -35,6 +36,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
+import org.apache.hudi.source.prune.ColumnStatsProbe;
 import org.apache.hudi.source.prune.PartitionPruners;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.utils.TestConfigurations;
@@ -44,12 +46,16 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionIdentifier;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
@@ -331,12 +337,14 @@ public class TestIncrementalInputSplits extends 
HoodieCommonTestHarness {
     
testData.addAll(TestData.DATA_SET_INSERT.stream().collect(Collectors.toList()));
     
testData.addAll(TestData.DATA_SET_INSERT_PARTITION_IS_NULL.stream().collect(Collectors.toList()));
     TestData.writeData(testData, conf);
-    PartitionPruners.PartitionPruner partitionPruner = 
PartitionPruners.getInstance(
-        Collections.singletonList(partitionEvaluator),
-        Collections.singletonList("partition"),
-        Collections.singletonList(DataTypes.STRING()),
-        PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH,
-        false);
+    PartitionPruners.PartitionPruner partitionPruner =
+        PartitionPruners.builder()
+            .partitionEvaluators(Collections.singletonList(partitionEvaluator))
+            .partitionKeys(Collections.singletonList("partition"))
+            .partitionTypes(Collections.singletonList(DataTypes.STRING()))
+            .defaultParName(PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH)
+            .hivePartition(false)
+            .build();
     IncrementalInputSplits iis = IncrementalInputSplits.builder()
         .conf(conf)
         .path(new Path(basePath))
@@ -348,6 +356,55 @@ public class TestIncrementalInputSplits extends 
HoodieCommonTestHarness {
     assertEquals(expectedPartitions, partitions);
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testInputSplitsWithPartitionStatsPruner(HoodieTableType tableType) 
throws Exception {
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+    conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
+    conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
+    
conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(),
 true);
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      // enable CSI for MOR table to collect col stats for delta write stats,
+      // which will then be used to construct partition stats.
+      
conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), 
true);
+    }
+    metaClient = HoodieTestUtils.init(basePath, tableType);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    // uuid > 'id5' and age < 30: only the column stats of 'par3' match the filter.
+    ColumnStatsProbe columnStatsProbe =
+        ColumnStatsProbe.newInstance(Arrays.asList(
+            new CallExpression(
+                FunctionIdentifier.of("greaterThan"),
+                BuiltInFunctionDefinitions.GREATER_THAN,
+                Arrays.asList(
+                    new FieldReferenceExpression("uuid", DataTypes.STRING(), 
0, 0),
+                    new ValueLiteralExpression("id5", 
DataTypes.STRING().notNull())
+                ),
+                DataTypes.BOOLEAN()),
+            new CallExpression(
+                FunctionIdentifier.of("lessThan"),
+                BuiltInFunctionDefinitions.LESS_THAN,
+                Arrays.asList(
+                    new FieldReferenceExpression("age", DataTypes.INT(), 2, 2),
+                    new ValueLiteralExpression(30, DataTypes.INT().notNull())
+                ),
+                DataTypes.BOOLEAN())));
+
+    PartitionPruners.PartitionPruner partitionPruner =
+        
PartitionPruners.builder().rowType(TestConfigurations.ROW_TYPE).basePath(basePath).conf(conf).columnStatsProbe(columnStatsProbe).build();
+    IncrementalInputSplits iis = IncrementalInputSplits.builder()
+        .conf(conf)
+        .path(new Path(basePath))
+        .rowType(TestConfigurations.ROW_TYPE)
+        .partitionPruner(partitionPruner)
+        .build();
+    IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null, 
false);
+    List<String> partitions = getFilteredPartitions(result);
+    assertEquals(Arrays.asList("par3"), partitions);
+  }
+
   @Test
   void testInputSplitsWithSpeedLimit() throws Exception {
     metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
@@ -474,11 +531,22 @@ public class TestIncrementalInputSplits extends 
HoodieCommonTestHarness {
   }
 
   private List<String> getFilteredPartitions(IncrementalInputSplits.Result 
result) {
-    return result.getInputSplits().stream().map(split -> {
-      Option<String> basePath = split.getBasePath();
-      String[] pathParts = basePath.get().split("/");
-      return pathParts[pathParts.length - 2];
-    }).collect(Collectors.toList());
+    List<String> partitions = new ArrayList<>();
+    result.getInputSplits().forEach(split -> {
+      split.getBasePath().map(path -> {
+        String[] pathParts = path.split("/");
+        partitions.add(pathParts[pathParts.length - 2]);
+        return null;
+      });
+      split.getLogPaths().map(paths -> {
+        paths.forEach(path -> {
+          String[] pathParts = path.split("/");
+          partitions.add(pathParts[pathParts.length - 2]);
+        });
+        return null;
+      });
+    });
+    return partitions;
   }
 
   private Integer intervalBetween2Instants(HoodieTimeline timeline, String 
instant1, String instant2) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/prune/TestDataPruner.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/prune/TestColumnStatsProbe.java
similarity index 96%
rename from 
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/prune/TestDataPruner.java
rename to 
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/prune/TestColumnStatsProbe.java
index 9ef91fe3c5a..01e302acc5f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/prune/TestDataPruner.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/prune/TestColumnStatsProbe.java
@@ -37,7 +37,7 @@ import java.util.stream.Stream;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 
-class TestDataPruner {
+class TestColumnStatsProbe {
 
   @ParameterizedTest
   @MethodSource("testTypes")
@@ -45,7 +45,7 @@ class TestDataPruner {
     DataType rowDataType = getDataRowDataType(dataType);
     DataType indexRowDataType = getIndexRowDataType(dataType);
 
-    Map<String, ColumnStats> stats1 = DataPruner.convertColumnStats(
+    Map<String, ColumnStats> stats1 = ColumnStatsProbe.convertColumnStats(
         getIndexRow(indexRowDataType, minValue, maxValue),
         getDataFields(rowDataType)
     );
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestColumnStatsIndices.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestColumnStatsIndex.java
similarity index 57%
rename from 
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestColumnStatsIndices.java
rename to 
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestColumnStatsIndex.java
index 837f4192486..15656d0fa0a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestColumnStatsIndices.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestColumnStatsIndex.java
@@ -38,15 +38,56 @@ import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /**
- * Test cases for {@link ColumnStatsIndices}.
+ * Test cases for {@link ColumnStatsIndex}.
  */
-public class TestColumnStatsIndices {
+public class TestColumnStatsIndex {
   @TempDir
   File tempFile;
 
+  @Test
+  void testReadPartitionStatsIndex() throws Exception {
+    final String path = tempFile.getAbsolutePath();
+    Configuration conf = TestConfigurations.getDefaultConf(path);
+    conf.set(FlinkOptions.METADATA_ENABLED, true);
+    conf.setString("hoodie.metadata.index.partition.stats.enable", "true");
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+        .enable(true)
+        .withMetadataIndexColumnStats(true)
+        .build();
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    String[] queryColumns = {"uuid", "age"};
+    PartitionStatsIndex indexSupport = new PartitionStatsIndex(path, 
TestConfigurations.ROW_TYPE, metadataConfig);
+    List<RowData> indexRows = 
indexSupport.readColumnStatsIndexByColumns(queryColumns);
+    List<String> results = 
indexRows.stream().map(Object::toString).sorted(String::compareTo).collect(Collectors.toList());
+    List<String> expected = Arrays.asList(
+        "+I(par1,+I(23),+I(33),0,2,age)",
+        "+I(par1,+I(id1),+I(id2),0,2,uuid)",
+        "+I(par2,+I(31),+I(53),0,2,age)",
+        "+I(par2,+I(id3),+I(id4),0,2,uuid)",
+        "+I(par3,+I(18),+I(20),0,2,age)",
+        "+I(par3,+I(id5),+I(id6),0,2,uuid)",
+        "+I(par4,+I(44),+I(56),0,2,age)",
+        "+I(par4,+I(id7),+I(id8),0,2,uuid)");
+    assertEquals(expected, results);
+
+    Pair<List<RowData>, String[]> transposedIndexTable = 
indexSupport.transposeColumnStatsIndex(indexRows, queryColumns);
+    List<String> transposed = 
transposedIndexTable.getLeft().stream().map(Object::toString).sorted(String::compareTo).collect(Collectors.toList());
+    assertThat(transposed.size(), is(4));
+    assertArrayEquals(new String[] {"age", "uuid"}, 
transposedIndexTable.getRight());
+    List<String> expected1 = Arrays.asList(
+        "+I(par1,2,23,33,0,id1,id2,0)",
+        "+I(par2,2,31,53,0,id3,id4,0)",
+        "+I(par3,2,18,20,0,id5,id6,0)",
+        "+I(par4,2,44,56,0,id7,id8,0)");
+    assertEquals(expected1, transposed);
+  }
+
   @Test
   void testTransposeColumnStatsIndex() throws Exception {
     final String path = tempFile.getAbsolutePath();
@@ -63,9 +104,9 @@ public class TestColumnStatsIndices {
 
     // explicit query columns
     String[] queryColumns1 = {"uuid", "age"};
-    List<RowData> indexRows1 = ColumnStatsIndices.readColumnStatsIndex(path, 
metadataConfig, queryColumns1);
-    Pair<List<RowData>, String[]> transposedIndexTable1 = ColumnStatsIndices
-        .transposeColumnStatsIndex(indexRows1, queryColumns1, 
TestConfigurations.ROW_TYPE);
+    FileStatsIndex indexSupport = new FileStatsIndex(path, 
TestConfigurations.ROW_TYPE, metadataConfig);
+    List<RowData> indexRows1 = 
indexSupport.readColumnStatsIndexByColumns(queryColumns1);
+    Pair<List<RowData>, String[]> transposedIndexTable1 = 
indexSupport.transposeColumnStatsIndex(indexRows1, queryColumns1);
     assertThat("The schema columns should sort by natural order",
         Arrays.toString(transposedIndexTable1.getRight()), is("[age, uuid]"));
     List<RowData> transposed1 = 
filterOutFileNames(transposedIndexTable1.getLeft());
@@ -79,7 +120,7 @@ public class TestColumnStatsIndices {
 
     // no query columns, only for tests
     assertThrows(IllegalArgumentException.class,
-        () -> ColumnStatsIndices.readColumnStatsIndex(path, metadataConfig, 
new String[0]));
+        () -> indexSupport.readColumnStatsIndexByColumns(new String[0]));
   }
 
   private static List<RowData> filterOutFileNames(List<RowData> indexRows) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 77b66ff974e..eacb0b5e8a5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
@@ -507,6 +508,57 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result, expected, true);
   }
 
+  @ParameterizedTest
+  @MethodSource("tableTypeAndBooleanTrueFalseParams")
+  void testReadWithPartitionStatsPruning(HoodieTableType tableType, boolean 
hiveStylePartitioning) throws Exception {
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.METADATA_ENABLED, true)
+        .option(FlinkOptions.READ_AS_STREAMING, true)
+        
.option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), true)
+        .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    
conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(),
 true);
+    
conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), 
true);
+    conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
+    conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
+    conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
+    // write one commit
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    List<String> sqls =
+        Arrays.asList(
+            // no filter
+            "select * from t1",
+            // filter by partition stats pruner only
+            "select * from t1 where uuid > 'id5' and age > 15",
+            // filter by partition stats pruner and dynamic partition pruner
+            "select * from t1 where uuid > 'id5' and age > 15 and `partition` 
> 'par3'");
+    List<String> expectResults =
+        Arrays.asList(
+            "[+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], "
+                + "+I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1], "
+                + "+I[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], "
+                + "+I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], "
+                + "+I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], "
+                + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
+                + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
+                + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]",
+            "[+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
+                + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
+                + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]",
+            "[+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
+                + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]");
+    for (int i = 0; i < sqls.size(); i++) {
+      List<Row> result = execSelectSql(streamTableEnv, sqls.get(i), 10);
+      assertRowsEquals(result, expectResults.get(i));
+    }
+  }
+
   @ParameterizedTest
   @MethodSource("tableTypeAndBooleanTrueFalseParams")
   void testStreamReadFilterByPartition(HoodieTableType tableType, boolean 
hiveStylePartitioning) throws Exception {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index 1e3550aee27..899b3b15667 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -18,10 +18,11 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.source.ExpressionPredicates;
-import org.apache.hudi.source.prune.DataPruner;
+import org.apache.hudi.source.prune.ColumnStatsProbe;
 import org.apache.hudi.source.prune.PrimaryKeyPruners;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
@@ -48,6 +49,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.function.ThrowingSupplier;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +66,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.hudi.keygen.constant.KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED;
 import static org.hamcrest.CoreMatchers.instanceOf;
@@ -163,8 +167,21 @@ public class TestHoodieTableSource {
     List<ResolvedExpression> expectedFilters = 
Collections.singletonList(filterExpr);
     tableSource.applyFilters(expectedFilters);
     HoodieTableSource copiedSource = (HoodieTableSource) tableSource.copy();
-    DataPruner dataPruner = copiedSource.getDataPruner();
-    assertNotNull(dataPruner);
+    ColumnStatsProbe columnStatsProbe = copiedSource.getColumnStatsProbe();
+    assertNotNull(columnStatsProbe);
+  }
+
+  @ParameterizedTest
+  @MethodSource("filtersAndResults")
+  void testDataSkippingWithPartitionStatsPruning(List<ResolvedExpression> 
filters, List<String> expectedPartitions) throws Exception {
+    final String path = tempFile.getAbsolutePath();
+    conf = TestConfigurations.getDefaultConf(path);
+    
conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(),
 true);
+    conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    HoodieTableSource hoodieTableSource = createHoodieTableSource(conf);
+    hoodieTableSource.applyFilters(filters);
+    assertEquals(expectedPartitions, hoodieTableSource.getReadPartitions());
   }
 
   @ParameterizedTest
@@ -325,6 +342,38 @@ public class TestHoodieTableSource {
     
assertEquals(ExpressionPredicates.fromExpression(expectedFilters).toString(), 
actualPredicates);
   }
 
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  private static Stream<Arguments> filtersAndResults() {
+    CallExpression filter1 =
+        new CallExpression(
+            BuiltInFunctionDefinitions.GREATER_THAN,
+            Arrays.asList(
+                new FieldReferenceExpression("uuid", DataTypes.STRING(), 0, 0),
+                new ValueLiteralExpression("id5", 
DataTypes.STRING().notNull())),
+            DataTypes.BOOLEAN());
+
+    CallExpression filter2 =
+        new CallExpression(
+            BuiltInFunctionDefinitions.LESS_THAN,
+            Arrays.asList(
+                new FieldReferenceExpression("partition", DataTypes.STRING(), 
4, 4),
+                new ValueLiteralExpression("par4", 
DataTypes.STRING().notNull())),
+            DataTypes.BOOLEAN());
+
+    Object[][] data = new Object[][] {
+        // pruned by partition stats pruner only.
+        {Arrays.asList(filter1), Arrays.asList("par3", "par4")},
+        // pruned by dynamic partition pruner only.
+        {Arrays.asList(filter2), Arrays.asList("par1", "par2", "par3")},
+        // pruned by dynamic pruner and stats pruner.
+        {Arrays.asList(filter1, filter2), Arrays.asList("par3")},
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
   private HoodieTableSource getEmptyStreamingSource() {
     final String path = tempFile.getAbsolutePath();
     conf = TestConfigurations.getDefaultConf(path);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 37c2c704a1c..ee43902ddd7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -398,7 +398,7 @@ public class TestInputFormat {
         .rowType(TestConfigurations.ROW_TYPE)
         .conf(conf)
         .path(FilePathUtils.toFlinkPath(metaClient.getBasePath()))
-        .partitionPruner(PartitionPruners.getInstance("par1", "par2", "par3", 
"par4"))
+        
.partitionPruner(PartitionPruners.builder().candidatePartitions(Arrays.asList("par1",
 "par2", "par3", "par4")).build())
         .skipCompaction(false)
         .build();
 
@@ -437,7 +437,7 @@ public class TestInputFormat {
         .rowType(TestConfigurations.ROW_TYPE)
         .conf(conf)
         .path(FilePathUtils.toFlinkPath(metaClient.getBasePath()))
-        .partitionPruner(PartitionPruners.getInstance("par1", "par2", "par3", 
"par4"))
+        
.partitionPruner(PartitionPruners.builder().candidatePartitions(Arrays.asList("par1",
 "par2", "par3", "par4")).build())
         .skipCompaction(true)
         .build();
 
@@ -502,7 +502,7 @@ public class TestInputFormat {
         .rowType(TestConfigurations.ROW_TYPE)
         .conf(conf)
         .path(FilePathUtils.toFlinkPath(metaClient.getBasePath()))
-        .partitionPruner(PartitionPruners.getInstance("par1", "par2", "par3", 
"par4"))
+        
.partitionPruner(PartitionPruners.builder().candidatePartitions(Arrays.asList("par1",
 "par2", "par3", "par4")).build())
         .skipClustering(true)
         .build();
 
@@ -659,7 +659,7 @@ public class TestInputFormat {
         .rowType(TestConfigurations.ROW_TYPE)
         .conf(conf)
         .path(FilePathUtils.toFlinkPath(metaClient.getBasePath()))
-        .partitionPruner(PartitionPruners.getInstance("par1", "par2", "par3", 
"par4"))
+        
.partitionPruner(PartitionPruners.builder().candidatePartitions(Arrays.asList("par1",
 "par2", "par3", "par4")).build())
         .build();
 
     // default read the latest commit

Reply via email to