This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 70f44efe298 [HUDI-7945] Fix partition pruning using PARTITION_STATS 
index in Spark (#11556)
70f44efe298 is described below

commit 70f44efe298771fcef9d029820a9b431e1ff165c
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Jul 4 03:39:21 2024 -0700

    [HUDI-7945] Fix partition pruning using PARTITION_STATS index in Spark 
(#11556)
    
    * [HUDI-7945] Fix file pruning using PARTITION_STATS index in Spark
    
    * Enhance test preparation
---
 .../org/apache/hudi/BaseHoodieTableFileIndex.java  | 15 +++--
 .../common/model/HoodieColumnRangeMetadata.java    | 18 +++---
 .../apache/hudi/common/util/FileFormatUtils.java   | 48 +++------------
 .../hudi/metadata/HoodieMetadataPayload.java       |  5 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  4 +-
 .../apache/hudi/common/util/TestBaseFileUtils.java | 20 ++++---
 .../org/apache/hudi/common/util/ParquetUtils.java  | 18 +-----
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  |  8 +--
 .../scala/org/apache/hudi/HoodieFileIndex.scala    | 70 +++++++++++++++++-----
 .../apache/hudi/PartitionStatsIndexSupport.scala   | 50 +++++++++++++++-
 .../org/apache/hudi/SparkBaseIndexSupport.scala    |  6 ++
 .../TestPartitionStatsIndexWithSql.scala           | 18 ++++--
 .../functional/TestRecordLevelIndexWithSQL.scala   |  3 +-
 .../spark/sql/hudi/dml/TestDataSkippingQuery.scala |  2 -
 14 files changed, 167 insertions(+), 118 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java 
b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 83ba64734a1..9cdf1adf971 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -303,10 +303,7 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable {
 
     // Convert partition's path into partition descriptor
     return matchedPartitionPaths.stream()
-        .map(partitionPath -> {
-          Object[] partitionColumnValues = 
parsePartitionColumnValues(partitionColumns, partitionPath);
-          return new PartitionPath(partitionPath, partitionColumnValues);
-        })
+        .map(this::convertToPartitionPath)
         .collect(Collectors.toList());
   }
 
@@ -330,10 +327,7 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable {
 
     // Convert partition's path into partition descriptor
     return matchedPartitionPaths.stream()
-        .map(partitionPath -> {
-          Object[] partitionColumnValues = 
parsePartitionColumnValues(partitionColumns, partitionPath);
-          return new PartitionPath(partitionPath, partitionColumnValues);
-        })
+        .map(this::convertToPartitionPath)
         .collect(Collectors.toList());
   }
 
@@ -494,6 +488,11 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable {
     return (partitionColumns.length > 0 && canParsePartitionValues()) || 
HoodieTableMetadata.isMetadataTable(basePath);
   }
 
+  protected PartitionPath convertToPartitionPath(String partitionPath) {
+    Object[] partitionColumnValues = 
parsePartitionColumnValues(partitionColumns, partitionPath);
+    return new PartitionPath(partitionPath, partitionColumnValues);
+  }
+
   private static long fileSliceSize(FileSlice fileSlice) {
     long logFileSize = fileSlice.getLogFiles().map(HoodieLogFile::getFileSize)
         .filter(s -> s > 0)
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
index 8afe08a37c8..1c39acb3c8c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.common.model;
 
+import org.apache.hudi.common.util.ValidationUtils;
+
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
@@ -153,13 +155,15 @@ public class HoodieColumnRangeMetadata<T extends 
Comparable> implements Serializ
   /**
    * Merges the given two column range metadata.
    */
-  public static HoodieColumnRangeMetadata<Comparable> merge(
-      HoodieColumnRangeMetadata<Comparable> left,
-      HoodieColumnRangeMetadata<Comparable> right) {
+  public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> merge(
+      HoodieColumnRangeMetadata<T> left,
+      HoodieColumnRangeMetadata<T> right) {
+    
ValidationUtils.checkArgument(left.getColumnName().equals(right.getColumnName()),
+        "Column names should be the same for merging column ranges");
     String filePath = left.getFilePath();
     String columnName = left.getColumnName();
-    Comparable min = minVal(left.getMinValue(), right.getMinValue());
-    Comparable max = maxVal(left.getMaxValue(), right.getMaxValue());
+    T min = minVal(left.getMinValue(), right.getMinValue());
+    T max = maxVal(left.getMaxValue(), right.getMaxValue());
     long nullCount = left.getNullCount() + right.getNullCount();
     long valueCount = left.getValueCount() + right.getValueCount();
     long totalSize = left.getTotalSize() + right.getTotalSize();
@@ -167,7 +171,7 @@ public class HoodieColumnRangeMetadata<T extends 
Comparable> implements Serializ
     return create(filePath, columnName, min, max, nullCount, valueCount, 
totalSize, totalUncompressedSize);
   }
 
-  private static Comparable minVal(Comparable val1, Comparable val2) {
+  private static <T extends Comparable<T>> T minVal(T val1, T val2) {
     if (val1 == null) {
       return val2;
     }
@@ -177,7 +181,7 @@ public class HoodieColumnRangeMetadata<T extends 
Comparable> implements Serializ
     return val1.compareTo(val2) < 0 ? val1 : val2;
   }
 
-  private static Comparable maxVal(Comparable val1, Comparable val2) {
+  private static <T extends Comparable<T>> T maxVal(T val1, T val2) {
     if (val1 == null) {
       return val2;
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
index 1790ce8675b..2aea1a64d31 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
@@ -53,52 +53,18 @@ public abstract class FileFormatUtils {
   /**
    * Aggregate column range statistics across files in a partition.
    *
+   * @param relativePartitionPath relative partition path for the column range 
stats
    * @param fileColumnRanges List of column range statistics for each file in 
a partition
    */
-  public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
getColumnRangeInPartition(@Nonnull List<HoodieColumnRangeMetadata<T>> 
fileColumnRanges) {
+  public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
getColumnRangeInPartition(String relativePartitionPath,
+                                                                               
                  @Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges) 
{
     ValidationUtils.checkArgument(!fileColumnRanges.isEmpty(), 
"fileColumnRanges should not be empty.");
     // There are multiple files. Compute min(file_mins) and max(file_maxs)
     return fileColumnRanges.stream()
-        .reduce(FileFormatUtils::mergeRanges).orElseThrow(() -> new 
HoodieException("MergingColumnRanges failed."));
-  }
-
-  private static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> 
mergeRanges(HoodieColumnRangeMetadata<T> one,
-                                                                               
     HoodieColumnRangeMetadata<T> another) {
-    
ValidationUtils.checkArgument(one.getColumnName().equals(another.getColumnName()),
-        "Column names should be the same for merging column ranges");
-    final T minValue = getMinValueForColumnRanges(one, another);
-    final T maxValue = getMaxValueForColumnRanges(one, another);
-
-    return HoodieColumnRangeMetadata.create(
-        null, one.getColumnName(), minValue, maxValue,
-        one.getNullCount() + another.getNullCount(),
-        one.getValueCount() + another.getValueCount(),
-        one.getTotalSize() + another.getTotalSize(),
-        one.getTotalUncompressedSize() + another.getTotalUncompressedSize());
-  }
-
-  public static <T extends Comparable<T>> T 
getMaxValueForColumnRanges(HoodieColumnRangeMetadata<T> one, 
HoodieColumnRangeMetadata<T> another) {
-    final T maxValue;
-    if (one.getMaxValue() != null && another.getMaxValue() != null) {
-      maxValue = one.getMaxValue().compareTo(another.getMaxValue()) < 0 ? 
another.getMaxValue() : one.getMaxValue();
-    } else if (one.getMaxValue() == null) {
-      maxValue = another.getMaxValue();
-    } else {
-      maxValue = one.getMaxValue();
-    }
-    return maxValue;
-  }
-
-  public static <T extends Comparable<T>> T 
getMinValueForColumnRanges(HoodieColumnRangeMetadata<T> one, 
HoodieColumnRangeMetadata<T> another) {
-    final T minValue;
-    if (one.getMinValue() != null && another.getMinValue() != null) {
-      minValue = one.getMinValue().compareTo(another.getMinValue()) < 0 ? 
one.getMinValue() : another.getMinValue();
-    } else if (one.getMinValue() == null) {
-      minValue = another.getMinValue();
-    } else {
-      minValue = one.getMinValue();
-    }
-    return minValue;
+        .map(e -> HoodieColumnRangeMetadata.create(
+            relativePartitionPath, e.getColumnName(), e.getMinValue(), 
e.getMaxValue(),
+            e.getNullCount(), e.getValueCount(), e.getTotalSize(), 
e.getTotalUncompressedSize()))
+        .reduce(HoodieColumnRangeMetadata::merge).orElseThrow(() -> new 
HoodieException("MergingColumnRanges failed."));
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 36ed57c87f5..bbdf6be5a65 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -68,7 +68,6 @@ import java.util.stream.Stream;
 
 import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
 import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
-import static org.apache.hudi.common.util.StringUtils.nonEmpty;
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -687,11 +686,9 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
     return columnRangeMetadataList.stream().map(columnRangeMetadata -> {
       HoodieKey key = new HoodieKey(getPartitionStatsIndexKey(partitionPath, 
columnRangeMetadata.getColumnName()),
           MetadataPartitionType.PARTITION_STATS.getPartitionPath());
-      String fileName = nonEmpty(columnRangeMetadata.getFilePath()) ? new 
StoragePath(columnRangeMetadata.getFilePath()).getName() : null;
-
       HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(),
           HoodieMetadataColumnStats.newBuilder()
-              .setFileName(fileName)
+              .setFileName(columnRangeMetadata.getFilePath())
               .setColumnName(columnRangeMetadata.getColumnName())
               
.setMinValue(wrapValueIntoAvro(columnRangeMetadata.getMinValue()))
               
.setMaxValue(wrapValueIntoAvro(columnRangeMetadata.getMaxValue()))
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 624525da71a..eb95c8e27fc 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -2043,7 +2043,7 @@ public class HoodieTableMetadataUtil {
           
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName, 
toList())); // Group by column name
       // Step 3: Aggregate Column Ranges
       Stream<HoodieColumnRangeMetadata<Comparable>> 
partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream()
-          .map(entry -> 
FileFormatUtils.getColumnRangeInPartition(entry.getValue()));
+          .map(entry -> 
FileFormatUtils.getColumnRangeInPartition(partitionPath, entry.getValue()));
       return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath, 
partitionStatsRangeMetadata.collect(toList()), false).iterator();
     });
   }
@@ -2107,7 +2107,7 @@ public class HoodieTableMetadataUtil {
             
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName, 
toList())); // Group by column name
         // Step 3: Aggregate Column Ranges
         Stream<HoodieColumnRangeMetadata<Comparable>> 
partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream()
-            .map(entry -> 
FileFormatUtils.getColumnRangeInPartition(entry.getValue()));
+            .map(entry -> 
FileFormatUtils.getColumnRangeInPartition(partitionName, entry.getValue()));
         return 
HoodieMetadataPayload.createPartitionStatsRecords(partitionName, 
partitionStatsRangeMetadata.collect(toList()), false).iterator();
       });
     } catch (Exception e) {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
index 3be4ff9b43c..e73aaebc4ed 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
@@ -30,18 +30,22 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class TestBaseFileUtils {
+  private static final String PARTITION_PATH = "partition";
+  private static final String COLUMN_NAME = "columnName";
 
   @Test
   public void testGetColumnRangeInPartition() {
     // Step 1: Set Up Test Data
     HoodieColumnRangeMetadata<Comparable> fileColumnRange1 = 
HoodieColumnRangeMetadata.<Comparable>create(
-        "path/to/file1", "columnName", 1, 5, 0, 10, 100, 200);
+        "path/to/file1", COLUMN_NAME, 1, 5, 0, 10, 100, 200);
     HoodieColumnRangeMetadata<Comparable> fileColumnRange2 = 
HoodieColumnRangeMetadata.<Comparable>create(
-        "path/to/file2", "columnName", 3, 8, 1, 15, 120, 250);
+        "path/to/file2", COLUMN_NAME, 3, 8, 1, 15, 120, 250);
     List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = 
Arrays.asList(fileColumnRange1, fileColumnRange2);
     // Step 2: Call the Method
-    HoodieColumnRangeMetadata<Comparable> result = 
FileFormatUtils.getColumnRangeInPartition(fileColumnRanges);
+    HoodieColumnRangeMetadata<Comparable> result = 
FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges);
     // Step 3: Assertions
+    assertEquals(PARTITION_PATH, result.getFilePath());
+    assertEquals(COLUMN_NAME, result.getColumnName());
     assertEquals(Integer.valueOf(1), new 
Integer(result.getMinValue().toString()));
     assertEquals(Integer.valueOf(8), new 
Integer(result.getMaxValue().toString()));
     assertEquals(1, result.getNullCount());
@@ -54,14 +58,16 @@ public class TestBaseFileUtils {
   public void testGetColumnRangeInPartitionWithNullMinMax() {
     // Step 1: Set Up Test Data
     HoodieColumnRangeMetadata<Comparable> fileColumnRange1 = 
HoodieColumnRangeMetadata.<Comparable>create(
-        "path/to/file1", "columnName", 1, null, 0, 10, 100, 200);
+        "path/to/file1", COLUMN_NAME, 1, null, 0, 10, 100, 200);
     HoodieColumnRangeMetadata<Comparable> fileColumnRange2 = 
HoodieColumnRangeMetadata.<Comparable>create(
-        "path/to/file2", "columnName", null, 8, 1, 15, 120, 250);
+        "path/to/file2", COLUMN_NAME, null, 8, 1, 15, 120, 250);
 
     List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = 
Arrays.asList(fileColumnRange1, fileColumnRange2);
     // Step 2: Call the Method
-    HoodieColumnRangeMetadata<Comparable> result = 
FileFormatUtils.getColumnRangeInPartition(fileColumnRanges);
+    HoodieColumnRangeMetadata<Comparable> result = 
FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges);
     // Step 3: Assertions
+    assertEquals(PARTITION_PATH, result.getFilePath());
+    assertEquals(COLUMN_NAME, result.getColumnName());
     assertEquals(Integer.valueOf(1), new 
Integer(result.getMinValue().toString()));
     assertEquals(Integer.valueOf(8), new 
Integer(result.getMaxValue().toString()));
     assertEquals(1, result.getNullCount());
@@ -79,6 +85,6 @@ public class TestBaseFileUtils {
         "path/to/file2", "columnName2", null, 8, 1, 15, 120, 250);
     List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges = 
Arrays.asList(fileColumnRange1, fileColumnRange2);
     // Step 2: Call the Method
-    assertThrows(IllegalArgumentException.class, () -> 
FileFormatUtils.getColumnRangeInPartition(fileColumnRanges));
+    assertThrows(IllegalArgumentException.class, () -> 
FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges));
   }
 }
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index e504622e8dc..aad5f3b09e4 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -430,23 +430,7 @@ public class ParquetUtils extends FileFormatUtils {
     // there are multiple blocks. Compute min(block_mins) and max(block_maxs)
     return blockRanges.stream()
         .sequential()
-        .reduce(this::combineRanges).get();
-  }
-
-  private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> combineRanges(
-      HoodieColumnRangeMetadata<T> one,
-      HoodieColumnRangeMetadata<T> another
-  ) {
-    final T minValue = getMinValueForColumnRanges(one, another);
-    final T maxValue = getMaxValueForColumnRanges(one, another);
-
-    return HoodieColumnRangeMetadata.create(
-        one.getFilePath(),
-        one.getColumnName(), minValue, maxValue,
-        one.getNullCount() + another.getNullCount(),
-        one.getValueCount() + another.getValueCount(),
-        one.getTotalSize() + another.getTotalSize(),
-        one.getTotalUncompressedSize() + another.getTotalUncompressedSize());
+        .reduce(HoodieColumnRangeMetadata::merge).get();
   }
 
   private static Comparable<?> convertToNativeJavaType(PrimitiveType 
primitiveType, Comparable<?> val) {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 0f31d7b3e1d..c3b5228d195 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -63,7 +63,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
 
   // NOTE: Since [[metadataConfig]] is transient this has to be eagerly 
persisted, before this will be passed
   //       on to the executor
-  private val inMemoryProjectionThreshold = 
metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold
+  protected val inMemoryProjectionThreshold = 
metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold
 
   private lazy val indexedColumns: Set[String] = {
     val customIndexedColumns = 
metadataConfig.getColumnsEnabledForColumnStatsIndex
@@ -86,12 +86,6 @@ class ColumnStatsIndexSupport(spark: SparkSession,
                                          shouldPushDownFilesFilter: Boolean
                                         ): Option[Set[String]] = {
     if (isIndexAvailable && queryFilters.nonEmpty && 
queryReferencedColumns.nonEmpty) {
-      // NOTE: Since executing on-cluster via Spark API has its own 
non-trivial amount of overhead,
-      //       it's most often preferential to fetch Column Stats Index w/in 
the same process (usually driver),
-      //       w/o resorting to on-cluster execution.
-      //       For that we use a simple-heuristic to determine whether we 
should read and process CSI in-memory or
-      //       on-cluster: total number of rows of the expected projected 
portion of the index has to be below the
-      //       threshold (of 100k records)
       val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, 
inMemoryProjectionThreshold)
       val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
       // NOTE: If partition pruning doesn't prune any files, then there's no 
need to apply file filters
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index e987ae47fc7..ccd9c597c51 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -17,6 +17,8 @@
 
 package org.apache.hudi
 
+import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
+import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, 
PRECOMBINE_FIELD, RECORDKEY_FIELD}
 import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, 
collectReferencedColumns, convertFilterForTimestampKeyGenerator, 
getConfigProperties}
 import org.apache.hudi.HoodieSparkConfUtils.getConfigValue
 import 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT,
 TIMESTAMP_OUTPUT_DATE_FORMAT}
@@ -28,8 +30,8 @@ import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, 
TimestampBasedKeyGenerator}
 import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
 import org.apache.hudi.util.JFunction
+
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, 
PRECOMBINE_FIELD, RECORDKEY_FIELD}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -43,6 +45,7 @@ import org.apache.spark.unsafe.types.UTF8String
 import java.text.SimpleDateFormat
 import java.util.stream.Collectors
 import javax.annotation.concurrent.NotThreadSafe
+
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 import scala.util.{Failure, Success, Try}
@@ -103,7 +106,6 @@ case class HoodieFileIndex(spark: SparkSession,
     new RecordLevelIndexSupport(spark, metadataConfig, metaClient),
     new BucketIndexSupport(spark, metadataConfig, metaClient),
     new SecondaryIndexSupport(spark, metadataConfig, metaClient),
-    new PartitionStatsIndexSupport(spark, schema, metadataConfig, metaClient),
     new FunctionalIndexSupport(spark, metadataConfig, metaClient),
     new BloomFiltersIndexSupport(spark, metadataConfig, metaClient),
     new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
@@ -223,7 +225,8 @@ case class HoodieFileIndex(spark: SparkSession,
   def filterFileSlices(dataFilters: Seq[Expression], partitionFilters: 
Seq[Expression])
   : Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])] = {
 
-    val prunedPartitionsAndFileSlices = 
getFileSlicesForPrunedPartitions(partitionFilters)
+    val (isPruned, prunedPartitionsAndFileSlices) =
+      prunePartitionsAndGetFileSlices(dataFilters, partitionFilters)
 
     // If there are no data filters, return all the file slices.
     // If there are no file slices, return empty list.
@@ -235,9 +238,8 @@ case class HoodieFileIndex(spark: SparkSession,
       //    - Col-Stats Index is present
       //    - Record-level Index is present
       //    - List of predicates (filters) is present
-      val shouldPushDownFilesFilter = !partitionFilters.isEmpty
       val candidateFilesNamesOpt: Option[Set[String]] =
-      lookupCandidateFilesInMetadataTable(dataFilters, 
prunedPartitionsAndFileSlices, shouldPushDownFilesFilter) match {
+        lookupCandidateFilesInMetadataTable(dataFilters, 
prunedPartitionsAndFileSlices, isPruned) match {
         case Success(opt) => opt
         case Failure(e) =>
           logError("Failed to lookup candidate files in File Index", e)
@@ -288,17 +290,53 @@ case class HoodieFileIndex(spark: SparkSession,
     }
   }
 
-  def getFileSlicesForPrunedPartitions(partitionFilters: Seq[Expression]) : 
Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])] = {
-    // Prune the partition path by the partition filters
-    // NOTE: Non-partitioned tables are assumed to consist from a single 
partition
-    //       encompassing the whole table
-    val prunedPartitions = if (shouldEmbedFileSlices) {
-      
listMatchingPartitionPaths(convertFilterForTimestampKeyGenerator(metaClient, 
partitionFilters))
-    } else {
-      listMatchingPartitionPaths(partitionFilters)
-    }
-    getInputFileSlices(prunedPartitions: _*).asScala.map(
-      { case (partition, fileSlices) => (Option.apply(partition), 
fileSlices.asScala.toSeq) }).toSeq
+  /**
+   * Prunes table partitions to list if possible.
+   *
+   * @param dataFilters      filters based on data columns
+   * @param partitionFilters filters based on partition columns
+   * @return a pair of elements, with the first element indicating whether the 
partition pruning
+   *         is applied, and the second element as a list of partition paths 
and file slices
+   */
+  def prunePartitionsAndGetFileSlices(dataFilters: Seq[Expression],
+                                      partitionFilters: Seq[Expression]):
+  (Boolean, Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], 
Seq[FileSlice])]) = {
+    val isPartitionedTable = getPartitionColumns.length > 0
+    val prunedPartitionsTuple: (Boolean, Seq[PartitionPath]) =
+      if (isPartitionedTable && partitionFilters.nonEmpty) {
+        // For partitioned table and partition filters, prune the partitions 
by the partition filters
+        if (shouldEmbedFileSlices) {
+          (true, 
listMatchingPartitionPaths(convertFilterForTimestampKeyGenerator(metaClient, 
partitionFilters)))
+        } else {
+          (true, listMatchingPartitionPaths(partitionFilters))
+        }
+      } else if (isPartitionedTable && isDataSkippingEnabled) {
+        // For partitioned table and no partition filters, if data skipping is 
enabled,
+        // try using the PARTITION_STATS index to prune the partitions
+        lazy val filterReferencedColumns = collectReferencedColumns(spark, 
dataFilters, schema)
+        val prunedPartitionPaths = new PartitionStatsIndexSupport(spark, 
schema, metadataConfig, metaClient)
+          .prunePartitions(this, dataFilters, filterReferencedColumns)
+        if (prunedPartitionPaths.nonEmpty) {
+          try {
+            (true, prunedPartitionPaths.get.map(e => 
convertToPartitionPath(e)).toSeq)
+          } catch {
+            // If the partition values cannot be parsed by 
[[convertToPartitionPath]],
+            // fall back to listing all partitions
+            case _: HoodieException => (false, 
listMatchingPartitionPaths(Seq.empty))
+          }
+        } else {
+          // Cannot use partition stats index (not available) for pruning 
partitions,
+          // fall back to listing all partitions
+          (false, listMatchingPartitionPaths(Seq.empty))
+        }
+      } else {
+        // Listing all partitions for non-partitioned table,
+        // or partitioned table without partition filter or data skipping or 
PARTITION_STATS index
+        (false, listMatchingPartitionPaths(Seq.empty))
+      }
+
+    (prunedPartitionsTuple._1, getInputFileSlices(prunedPartitionsTuple._2: 
_*).asScala.map(
+      { case (partition, fileSlices) => (Option.apply(partition), 
fileSlices.asScala.toSeq) }).toSeq)
   }
 
   /**
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
index 51b9f8eaaeb..437bfd63a8b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
@@ -23,15 +23,18 @@ import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, 
HoodieMetadataRecord}
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.data.HoodieData
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.hash.ColumnIndexID
 import org.apache.hudi.metadata.{HoodieMetadataPayload, 
HoodieTableMetadataUtil}
 import org.apache.hudi.util.JFunction
+
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+import 
org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Column, SparkSession}
 
 import scala.collection.JavaConverters._
 
@@ -49,6 +52,15 @@ class PartitionStatsIndexSupport(spark: SparkSession,
       
metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS)
   }
 
+  override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+                                         queryFilters: Seq[Expression],
+                                         queryReferencedColumns: Seq[String],
+                                         prunedPartitionsAndFileSlices: 
Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                         shouldPushDownFilesFilter: Boolean
+                                        ): Option[Set[String]] = {
+    throw new UnsupportedOperationException("This method is not supported by 
PartitionStatsIndexSupport")
+  }
+
   override def loadColumnStatsIndexRecords(targetColumns: Seq[String], 
shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
     checkState(targetColumns.nonEmpty)
     val encodedTargetColumnNames = targetColumns.map(colName => new 
ColumnIndexID(colName).asBase64EncodedString())
@@ -66,6 +78,40 @@ class PartitionStatsIndexSupport(spark: SparkSession,
 
     columnStatsRecords
   }
+
+  def prunePartitions(fileIndex: HoodieFileIndex,
+                      queryFilters: Seq[Expression],
+                      queryReferencedColumns: Seq[String]): 
Option[Set[String]] = {
+    if (isIndexAvailable && queryFilters.nonEmpty && 
queryReferencedColumns.nonEmpty) {
+      val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, 
inMemoryProjectionThreshold)
+      loadTransposed(queryReferencedColumns, readInMemory, Option.empty) {
+        transposedPartitionStatsDF => {
+          val allPartitions = 
transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+            .collect()
+            .map(_.getString(0))
+            .toSet
+          if (allPartitions.nonEmpty) {
+            // PARTITION_STATS index exist for all or some columns in the 
filters
+            // NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has covered 
the case where the
+            //       column in a filter does not have the stats available, by 
making sure such a
+            //       filter does not prune any partition.
+            val indexSchema = transposedPartitionStatsDF.schema
+            val indexFilter = 
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, 
indexSchema)).reduce(And)
+            Some(transposedPartitionStatsDF.where(new Column(indexFilter))
+              .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+              .collect()
+              .map(_.getString(0))
+              .toSet)
+          } else {
+            // PARTITION_STATS index does not exist for any column in the 
filters, skip the pruning
+            Option.empty
+          }
+        }
+      }
+    } else {
+      Option.empty
+    }
+  }
 }
 
 object PartitionStatsIndexSupport {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
index d9396433571..2371e4b066e 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
@@ -118,6 +118,12 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
    * or b) executing it on-cluster via Spark [[Dataset]] and [[RDD]] APIs
    */
   protected def shouldReadInMemory(fileIndex: HoodieFileIndex, 
queryReferencedColumns: Seq[String], inMemoryProjectionThreshold: Integer): 
Boolean = {
+    // NOTE: Since executing on-cluster via Spark API has its own non-trivial 
amount of overhead,
+    //       it's most often preferential to fetch index w/in the same process 
(usually driver),
+    //       w/o resorting to on-cluster execution.
+    //       For that we use a simple-heuristic to determine whether we should 
read and process the index in-memory or
+    //       on-cluster: total number of rows of the expected projected 
portion of the index has to be below the
+    //       threshold (of 100k records)
     Option(metadataConfig.getColumnStatsIndexProcessingModeOverride) match {
       case Some(mode) =>
         mode == 
HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
index 1daa2349264..c8f9029889f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
@@ -28,6 +28,7 @@ import org.apache.hudi.metadata.HoodieMetadataFileSystemView
 import org.apache.hudi.metadata.MetadataPartitionType.PARTITION_STATS
 import org.apache.hudi.util.JFunction
 import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex}
+
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, GreaterThan, LessThan, Literal}
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
@@ -152,8 +153,9 @@ class TestPartitionStatsIndexWithSql extends 
HoodieSparkSqlTestBase {
              |location '$tablePath'
          """.stripMargin
         )
-        // set small file limit to 0 so that each insert creates a new file
+        // set small file limit to 0 and parquet max file size to 1 so that 
each insert creates a new file
         spark.sql("set hoodie.parquet.small.file.limit=0")
+        spark.sql("set hoodie.parquet.max.file.size=1")
         // insert data in below pattern so that multiple records for 'texas' 
and 'california' partition are in same file
         spark.sql(
           s"""
@@ -166,7 +168,10 @@ class TestPartitionStatsIndexWithSql extends 
HoodieSparkSqlTestBase {
         spark.sql(
           s"""
              | insert into $tableName
-             | values 
(1695516137,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-C','driver-P','houston','texas'),
 
(1695332066,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O','austin','texas')
+             | values
+             | 
(1695516137,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-C','driver-P','houston','texas'),
+             | 
(1695332066,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O','austin','texas'),
+             | 
(1695516138,'e3cf430c-889d-4015-bc98-59bdce1e530d','rider-C','driver-P','houston','texas')
              | """.stripMargin
         )
 
@@ -177,6 +182,10 @@ class TestPartitionStatsIndexWithSql extends 
HoodieSparkSqlTestBase {
           .build()
         assertResult(tableName)(metaClient.getTableConfig.getTableName)
         
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(PARTITION_STATS.getPartitionPath))
+        val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> 
metaClient.getBasePath.toString))
+        val partitionFiles = fileIndex.listFiles(Seq.empty, Seq.empty)
+        // Make sure there are partition(s) with a single file and multiple 
files
+        assertTrue(partitionFiles.exists(p => p.files.size == 1) && 
partitionFiles.exists(p => p.files.size > 1))
 
         // Test pruning
         spark.sql("set hoodie.metadata.enable=true")
@@ -210,6 +219,7 @@ class TestPartitionStatsIndexWithSql extends 
HoodieSparkSqlTestBase {
           Seq("334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", 
"san_francisco", "california"),
           Seq("7a84095f-737f-40bc-b62f-6b69664712d2", "rider-B", "new york 
city", "new york"),
           Seq("e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-C", "houston", 
"texas"),
+          Seq("e3cf430c-889d-4015-bc98-59bdce1e530d", "rider-C", "houston", 
"texas"),
           Seq("3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04", "rider-D", "princeton", 
"new jersey"),
           Seq("1dced545-862b-4ceb-8b43-d2a568f6616b", "rider-E", "austin", 
"texas"),
           Seq("e96c4396-3fad-413a-a942-4cb36106d721", "rider-F", "sunnyvale", 
"california")
@@ -236,8 +246,8 @@ class TestPartitionStatsIndexWithSql extends 
HoodieSparkSqlTestBase {
              |create table $tableName (
              |  id int,
              |  name string,
-             |  ts long,
-             |  price int
+             |  price int,
+             |  ts long
              |) using hudi
              |partitioned by (ts)
              |tblproperties (
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
index 926a2e958ab..448b2a97d91 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
@@ -216,11 +216,12 @@ class TestRecordLevelIndexWithSQL extends 
RecordLevelIndexTestBase {
     val fileIndex = new HoodieFileIndex(sparkSession, metaClient, 
Option.empty, Map("glob.paths" -> globbedPaths), includeLogFiles = 
includeLogFiles)
     val selectedPartition = "2016/03/15"
     val partitionFilter: Expression = EqualTo(AttributeReference("partition", 
StringType)(), Literal(selectedPartition))
-    val prunedPaths = 
fileIndex.getFileSlicesForPrunedPartitions(Seq(partitionFilter))
+    val (isPruned, prunedPaths) = 
fileIndex.prunePartitionsAndGetFileSlices(Seq.empty, Seq(partitionFilter))
     val storagePaths = 
RecordLevelIndexSupport.getPrunedStoragePaths(prunedPaths, fileIndex)
     // verify pruned paths contain the selected partition and the size of the 
pruned file paths
     // when includeLogFiles is set to true, there are two storages paths - 
base file and log file
     // every partition contains only one file slice
+    assertTrue(isPruned)
     assertEquals(if (includeLogFiles) 2 else 1, storagePaths.size)
     assertTrue(storagePaths.forall(path => 
path.toString.contains(selectedPartition)))
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
index c318d5e449b..4be2532fb25 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
@@ -102,8 +102,6 @@ class TestDataSkippingQuery extends HoodieSparkSqlTestBase {
       checkAnswer(s"select id, name, price, ts, dt from $tableName where 
attributes.color = 'red'")(
         Seq(1, "a1", 10.0, 1000, "2021-01-05")
       )
-      // TODO add this fallback param, cause by PartitionStatsIndexSupport, 
cause by HUDI-7144,may be fix by HUDI-7903
-      spark.sql("set hoodie.fileIndex.dataSkippingFailureMode = fallback")
       // Check the case where the WHERE condition only includes columns 
supported by column stats
       checkAnswer(s"select id, name, price, ts, dt from $tableName where 
name='a1'")(
         Seq(1, "a1", 10.0, 1000, "2021-01-05")


Reply via email to