This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 70f44efe298 [HUDI-7945] Fix partition pruning using PARTITION_STATS
index in Spark (#11556)
70f44efe298 is described below
commit 70f44efe298771fcef9d029820a9b431e1ff165c
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Jul 4 03:39:21 2024 -0700
[HUDI-7945] Fix partition pruning using PARTITION_STATS index in Spark
(#11556)
* [HUDI-7945] Fix file pruning using PARTITION_STATS index in Spark
* Enhance test preparation
---
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 15 +++--
.../common/model/HoodieColumnRangeMetadata.java | 18 +++---
.../apache/hudi/common/util/FileFormatUtils.java | 48 +++------------
.../hudi/metadata/HoodieMetadataPayload.java | 5 +-
.../hudi/metadata/HoodieTableMetadataUtil.java | 4 +-
.../apache/hudi/common/util/TestBaseFileUtils.java | 20 ++++---
.../org/apache/hudi/common/util/ParquetUtils.java | 18 +-----
.../org/apache/hudi/ColumnStatsIndexSupport.scala | 8 +--
.../scala/org/apache/hudi/HoodieFileIndex.scala | 70 +++++++++++++++++-----
.../apache/hudi/PartitionStatsIndexSupport.scala | 50 +++++++++++++++-
.../org/apache/hudi/SparkBaseIndexSupport.scala | 6 ++
.../TestPartitionStatsIndexWithSql.scala | 18 ++++--
.../functional/TestRecordLevelIndexWithSQL.scala | 3 +-
.../spark/sql/hudi/dml/TestDataSkippingQuery.scala | 2 -
14 files changed, 167 insertions(+), 118 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 83ba64734a1..9cdf1adf971 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -303,10 +303,7 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
// Convert partition's path into partition descriptor
return matchedPartitionPaths.stream()
- .map(partitionPath -> {
- Object[] partitionColumnValues =
parsePartitionColumnValues(partitionColumns, partitionPath);
- return new PartitionPath(partitionPath, partitionColumnValues);
- })
+ .map(this::convertToPartitionPath)
.collect(Collectors.toList());
}
@@ -330,10 +327,7 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
// Convert partition's path into partition descriptor
return matchedPartitionPaths.stream()
- .map(partitionPath -> {
- Object[] partitionColumnValues =
parsePartitionColumnValues(partitionColumns, partitionPath);
- return new PartitionPath(partitionPath, partitionColumnValues);
- })
+ .map(this::convertToPartitionPath)
.collect(Collectors.toList());
}
@@ -494,6 +488,11 @@ public abstract class BaseHoodieTableFileIndex implements
AutoCloseable {
return (partitionColumns.length > 0 && canParsePartitionValues()) ||
HoodieTableMetadata.isMetadataTable(basePath);
}
+ protected PartitionPath convertToPartitionPath(String partitionPath) {
+ Object[] partitionColumnValues =
parsePartitionColumnValues(partitionColumns, partitionPath);
+ return new PartitionPath(partitionPath, partitionColumnValues);
+ }
+
private static long fileSliceSize(FileSlice fileSlice) {
long logFileSize = fileSlice.getLogFiles().map(HoodieLogFile::getFileSize)
.filter(s -> s > 0)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
index 8afe08a37c8..1c39acb3c8c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
@@ -18,6 +18,8 @@
package org.apache.hudi.common.model;
+import org.apache.hudi.common.util.ValidationUtils;
+
import javax.annotation.Nullable;
import java.io.Serializable;
@@ -153,13 +155,15 @@ public class HoodieColumnRangeMetadata<T extends
Comparable> implements Serializ
/**
* Merges the given two column range metadata.
*/
- public static HoodieColumnRangeMetadata<Comparable> merge(
- HoodieColumnRangeMetadata<Comparable> left,
- HoodieColumnRangeMetadata<Comparable> right) {
+ public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T> merge(
+ HoodieColumnRangeMetadata<T> left,
+ HoodieColumnRangeMetadata<T> right) {
+
ValidationUtils.checkArgument(left.getColumnName().equals(right.getColumnName()),
+ "Column names should be the same for merging column ranges");
String filePath = left.getFilePath();
String columnName = left.getColumnName();
- Comparable min = minVal(left.getMinValue(), right.getMinValue());
- Comparable max = maxVal(left.getMaxValue(), right.getMaxValue());
+ T min = minVal(left.getMinValue(), right.getMinValue());
+ T max = maxVal(left.getMaxValue(), right.getMaxValue());
long nullCount = left.getNullCount() + right.getNullCount();
long valueCount = left.getValueCount() + right.getValueCount();
long totalSize = left.getTotalSize() + right.getTotalSize();
@@ -167,7 +171,7 @@ public class HoodieColumnRangeMetadata<T extends
Comparable> implements Serializ
return create(filePath, columnName, min, max, nullCount, valueCount,
totalSize, totalUncompressedSize);
}
- private static Comparable minVal(Comparable val1, Comparable val2) {
+ private static <T extends Comparable<T>> T minVal(T val1, T val2) {
if (val1 == null) {
return val2;
}
@@ -177,7 +181,7 @@ public class HoodieColumnRangeMetadata<T extends
Comparable> implements Serializ
return val1.compareTo(val2) < 0 ? val1 : val2;
}
- private static Comparable maxVal(Comparable val1, Comparable val2) {
+ private static <T extends Comparable<T>> T maxVal(T val1, T val2) {
if (val1 == null) {
return val2;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
index 1790ce8675b..2aea1a64d31 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FileFormatUtils.java
@@ -53,52 +53,18 @@ public abstract class FileFormatUtils {
/**
* Aggregate column range statistics across files in a partition.
*
+ * @param relativePartitionPath relative partition path for the column range
stats
* @param fileColumnRanges List of column range statistics for each file in
a partition
*/
- public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T>
getColumnRangeInPartition(@Nonnull List<HoodieColumnRangeMetadata<T>>
fileColumnRanges) {
+ public static <T extends Comparable<T>> HoodieColumnRangeMetadata<T>
getColumnRangeInPartition(String relativePartitionPath,
+
@Nonnull List<HoodieColumnRangeMetadata<T>> fileColumnRanges)
{
ValidationUtils.checkArgument(!fileColumnRanges.isEmpty(),
"fileColumnRanges should not be empty.");
// There are multiple files. Compute min(file_mins) and max(file_maxs)
return fileColumnRanges.stream()
- .reduce(FileFormatUtils::mergeRanges).orElseThrow(() -> new
HoodieException("MergingColumnRanges failed."));
- }
-
- private static <T extends Comparable<T>> HoodieColumnRangeMetadata<T>
mergeRanges(HoodieColumnRangeMetadata<T> one,
-
HoodieColumnRangeMetadata<T> another) {
-
ValidationUtils.checkArgument(one.getColumnName().equals(another.getColumnName()),
- "Column names should be the same for merging column ranges");
- final T minValue = getMinValueForColumnRanges(one, another);
- final T maxValue = getMaxValueForColumnRanges(one, another);
-
- return HoodieColumnRangeMetadata.create(
- null, one.getColumnName(), minValue, maxValue,
- one.getNullCount() + another.getNullCount(),
- one.getValueCount() + another.getValueCount(),
- one.getTotalSize() + another.getTotalSize(),
- one.getTotalUncompressedSize() + another.getTotalUncompressedSize());
- }
-
- public static <T extends Comparable<T>> T
getMaxValueForColumnRanges(HoodieColumnRangeMetadata<T> one,
HoodieColumnRangeMetadata<T> another) {
- final T maxValue;
- if (one.getMaxValue() != null && another.getMaxValue() != null) {
- maxValue = one.getMaxValue().compareTo(another.getMaxValue()) < 0 ?
another.getMaxValue() : one.getMaxValue();
- } else if (one.getMaxValue() == null) {
- maxValue = another.getMaxValue();
- } else {
- maxValue = one.getMaxValue();
- }
- return maxValue;
- }
-
- public static <T extends Comparable<T>> T
getMinValueForColumnRanges(HoodieColumnRangeMetadata<T> one,
HoodieColumnRangeMetadata<T> another) {
- final T minValue;
- if (one.getMinValue() != null && another.getMinValue() != null) {
- minValue = one.getMinValue().compareTo(another.getMinValue()) < 0 ?
one.getMinValue() : another.getMinValue();
- } else if (one.getMinValue() == null) {
- minValue = another.getMinValue();
- } else {
- minValue = one.getMinValue();
- }
- return minValue;
+ .map(e -> HoodieColumnRangeMetadata.create(
+ relativePartitionPath, e.getColumnName(), e.getMinValue(),
e.getMaxValue(),
+ e.getNullCount(), e.getValueCount(), e.getTotalSize(),
e.getTotalUncompressedSize()))
+ .reduce(HoodieColumnRangeMetadata::merge).orElseThrow(() -> new
HoodieException("MergingColumnRanges failed."));
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 36ed57c87f5..bbdf6be5a65 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -68,7 +68,6 @@ import java.util.stream.Stream;
import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
-import static org.apache.hudi.common.util.StringUtils.nonEmpty;
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -687,11 +686,9 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
return columnRangeMetadataList.stream().map(columnRangeMetadata -> {
HoodieKey key = new HoodieKey(getPartitionStatsIndexKey(partitionPath,
columnRangeMetadata.getColumnName()),
MetadataPartitionType.PARTITION_STATS.getPartitionPath());
- String fileName = nonEmpty(columnRangeMetadata.getFilePath()) ? new
StoragePath(columnRangeMetadata.getFilePath()).getName() : null;
-
HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(),
HoodieMetadataColumnStats.newBuilder()
- .setFileName(fileName)
+ .setFileName(columnRangeMetadata.getFilePath())
.setColumnName(columnRangeMetadata.getColumnName())
.setMinValue(wrapValueIntoAvro(columnRangeMetadata.getMinValue()))
.setMaxValue(wrapValueIntoAvro(columnRangeMetadata.getMaxValue()))
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 624525da71a..eb95c8e27fc 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -2043,7 +2043,7 @@ public class HoodieTableMetadataUtil {
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName,
toList())); // Group by column name
// Step 3: Aggregate Column Ranges
Stream<HoodieColumnRangeMetadata<Comparable>>
partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream()
- .map(entry ->
FileFormatUtils.getColumnRangeInPartition(entry.getValue()));
+ .map(entry ->
FileFormatUtils.getColumnRangeInPartition(partitionPath, entry.getValue()));
return HoodieMetadataPayload.createPartitionStatsRecords(partitionPath,
partitionStatsRangeMetadata.collect(toList()), false).iterator();
});
}
@@ -2107,7 +2107,7 @@ public class HoodieTableMetadataUtil {
.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName,
toList())); // Group by column name
// Step 3: Aggregate Column Ranges
Stream<HoodieColumnRangeMetadata<Comparable>>
partitionStatsRangeMetadata = columnMetadataMap.entrySet().stream()
- .map(entry ->
FileFormatUtils.getColumnRangeInPartition(entry.getValue()));
+ .map(entry ->
FileFormatUtils.getColumnRangeInPartition(partitionName, entry.getValue()));
return
HoodieMetadataPayload.createPartitionStatsRecords(partitionName,
partitionStatsRangeMetadata.collect(toList()), false).iterator();
});
} catch (Exception e) {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
index 3be4ff9b43c..e73aaebc4ed 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBaseFileUtils.java
@@ -30,18 +30,22 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class TestBaseFileUtils {
+ private static final String PARTITION_PATH = "partition";
+ private static final String COLUMN_NAME = "columnName";
@Test
public void testGetColumnRangeInPartition() {
// Step 1: Set Up Test Data
HoodieColumnRangeMetadata<Comparable> fileColumnRange1 =
HoodieColumnRangeMetadata.<Comparable>create(
- "path/to/file1", "columnName", 1, 5, 0, 10, 100, 200);
+ "path/to/file1", COLUMN_NAME, 1, 5, 0, 10, 100, 200);
HoodieColumnRangeMetadata<Comparable> fileColumnRange2 =
HoodieColumnRangeMetadata.<Comparable>create(
- "path/to/file2", "columnName", 3, 8, 1, 15, 120, 250);
+ "path/to/file2", COLUMN_NAME, 3, 8, 1, 15, 120, 250);
List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges =
Arrays.asList(fileColumnRange1, fileColumnRange2);
// Step 2: Call the Method
- HoodieColumnRangeMetadata<Comparable> result =
FileFormatUtils.getColumnRangeInPartition(fileColumnRanges);
+ HoodieColumnRangeMetadata<Comparable> result =
FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges);
// Step 3: Assertions
+ assertEquals(PARTITION_PATH, result.getFilePath());
+ assertEquals(COLUMN_NAME, result.getColumnName());
assertEquals(Integer.valueOf(1), new
Integer(result.getMinValue().toString()));
assertEquals(Integer.valueOf(8), new
Integer(result.getMaxValue().toString()));
assertEquals(1, result.getNullCount());
@@ -54,14 +58,16 @@ public class TestBaseFileUtils {
public void testGetColumnRangeInPartitionWithNullMinMax() {
// Step 1: Set Up Test Data
HoodieColumnRangeMetadata<Comparable> fileColumnRange1 =
HoodieColumnRangeMetadata.<Comparable>create(
- "path/to/file1", "columnName", 1, null, 0, 10, 100, 200);
+ "path/to/file1", COLUMN_NAME, 1, null, 0, 10, 100, 200);
HoodieColumnRangeMetadata<Comparable> fileColumnRange2 =
HoodieColumnRangeMetadata.<Comparable>create(
- "path/to/file2", "columnName", null, 8, 1, 15, 120, 250);
+ "path/to/file2", COLUMN_NAME, null, 8, 1, 15, 120, 250);
List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges =
Arrays.asList(fileColumnRange1, fileColumnRange2);
// Step 2: Call the Method
- HoodieColumnRangeMetadata<Comparable> result =
FileFormatUtils.getColumnRangeInPartition(fileColumnRanges);
+ HoodieColumnRangeMetadata<Comparable> result =
FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges);
// Step 3: Assertions
+ assertEquals(PARTITION_PATH, result.getFilePath());
+ assertEquals(COLUMN_NAME, result.getColumnName());
assertEquals(Integer.valueOf(1), new
Integer(result.getMinValue().toString()));
assertEquals(Integer.valueOf(8), new
Integer(result.getMaxValue().toString()));
assertEquals(1, result.getNullCount());
@@ -79,6 +85,6 @@ public class TestBaseFileUtils {
"path/to/file2", "columnName2", null, 8, 1, 15, 120, 250);
List<HoodieColumnRangeMetadata<Comparable>> fileColumnRanges =
Arrays.asList(fileColumnRange1, fileColumnRange2);
// Step 2: Call the Method
- assertThrows(IllegalArgumentException.class, () ->
FileFormatUtils.getColumnRangeInPartition(fileColumnRanges));
+ assertThrows(IllegalArgumentException.class, () ->
FileFormatUtils.getColumnRangeInPartition(PARTITION_PATH, fileColumnRanges));
}
}
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index e504622e8dc..aad5f3b09e4 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -430,23 +430,7 @@ public class ParquetUtils extends FileFormatUtils {
// there are multiple blocks. Compute min(block_mins) and max(block_maxs)
return blockRanges.stream()
.sequential()
- .reduce(this::combineRanges).get();
- }
-
- private <T extends Comparable<T>> HoodieColumnRangeMetadata<T> combineRanges(
- HoodieColumnRangeMetadata<T> one,
- HoodieColumnRangeMetadata<T> another
- ) {
- final T minValue = getMinValueForColumnRanges(one, another);
- final T maxValue = getMaxValueForColumnRanges(one, another);
-
- return HoodieColumnRangeMetadata.create(
- one.getFilePath(),
- one.getColumnName(), minValue, maxValue,
- one.getNullCount() + another.getNullCount(),
- one.getValueCount() + another.getValueCount(),
- one.getTotalSize() + another.getTotalSize(),
- one.getTotalUncompressedSize() + another.getTotalUncompressedSize());
+ .reduce(HoodieColumnRangeMetadata::merge).get();
}
private static Comparable<?> convertToNativeJavaType(PrimitiveType
primitiveType, Comparable<?> val) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 0f31d7b3e1d..c3b5228d195 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -63,7 +63,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
// NOTE: Since [[metadataConfig]] is transient this has to be eagerly
persisted, before this will be passed
// on to the executor
- private val inMemoryProjectionThreshold =
metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold
+ protected val inMemoryProjectionThreshold =
metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold
private lazy val indexedColumns: Set[String] = {
val customIndexedColumns =
metadataConfig.getColumnsEnabledForColumnStatsIndex
@@ -86,12 +86,6 @@ class ColumnStatsIndexSupport(spark: SparkSession,
shouldPushDownFilesFilter: Boolean
): Option[Set[String]] = {
if (isIndexAvailable && queryFilters.nonEmpty &&
queryReferencedColumns.nonEmpty) {
- // NOTE: Since executing on-cluster via Spark API has its own
non-trivial amount of overhead,
- // it's most often preferential to fetch Column Stats Index w/in
the same process (usually driver),
- // w/o resorting to on-cluster execution.
- // For that we use a simple-heuristic to determine whether we
should read and process CSI in-memory or
- // on-cluster: total number of rows of the expected projected
portion of the index has to be below the
- // threshold (of 100k records)
val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns,
inMemoryProjectionThreshold)
val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
// NOTE: If partition pruning doesn't prune any files, then there's no
need to apply file filters
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index e987ae47fc7..ccd9c597c51 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -17,6 +17,8 @@
package org.apache.hudi
+import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
+import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD,
PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode,
collectReferencedColumns, convertFilterForTimestampKeyGenerator,
getConfigProperties}
import org.apache.hudi.HoodieSparkConfUtils.getConfigValue
import
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT,
TIMESTAMP_OUTPUT_DATE_FORMAT}
@@ -28,8 +30,8 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator,
TimestampBasedKeyGenerator}
import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
import org.apache.hudi.util.JFunction
+
import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD,
PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -43,6 +45,7 @@ import org.apache.spark.unsafe.types.UTF8String
import java.text.SimpleDateFormat
import java.util.stream.Collectors
import javax.annotation.concurrent.NotThreadSafe
+
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}
@@ -103,7 +106,6 @@ case class HoodieFileIndex(spark: SparkSession,
new RecordLevelIndexSupport(spark, metadataConfig, metaClient),
new BucketIndexSupport(spark, metadataConfig, metaClient),
new SecondaryIndexSupport(spark, metadataConfig, metaClient),
- new PartitionStatsIndexSupport(spark, schema, metadataConfig, metaClient),
new FunctionalIndexSupport(spark, metadataConfig, metaClient),
new BloomFiltersIndexSupport(spark, metadataConfig, metaClient),
new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
@@ -223,7 +225,8 @@ case class HoodieFileIndex(spark: SparkSession,
def filterFileSlices(dataFilters: Seq[Expression], partitionFilters:
Seq[Expression])
: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])] = {
- val prunedPartitionsAndFileSlices =
getFileSlicesForPrunedPartitions(partitionFilters)
+ val (isPruned, prunedPartitionsAndFileSlices) =
+ prunePartitionsAndGetFileSlices(dataFilters, partitionFilters)
// If there are no data filters, return all the file slices.
// If there are no file slices, return empty list.
@@ -235,9 +238,8 @@ case class HoodieFileIndex(spark: SparkSession,
// - Col-Stats Index is present
// - Record-level Index is present
// - List of predicates (filters) is present
- val shouldPushDownFilesFilter = !partitionFilters.isEmpty
val candidateFilesNamesOpt: Option[Set[String]] =
- lookupCandidateFilesInMetadataTable(dataFilters,
prunedPartitionsAndFileSlices, shouldPushDownFilesFilter) match {
+ lookupCandidateFilesInMetadataTable(dataFilters,
prunedPartitionsAndFileSlices, isPruned) match {
case Success(opt) => opt
case Failure(e) =>
logError("Failed to lookup candidate files in File Index", e)
@@ -288,17 +290,53 @@ case class HoodieFileIndex(spark: SparkSession,
}
}
- def getFileSlicesForPrunedPartitions(partitionFilters: Seq[Expression]) :
Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])] = {
- // Prune the partition path by the partition filters
- // NOTE: Non-partitioned tables are assumed to consist from a single
partition
- // encompassing the whole table
- val prunedPartitions = if (shouldEmbedFileSlices) {
-
listMatchingPartitionPaths(convertFilterForTimestampKeyGenerator(metaClient,
partitionFilters))
- } else {
- listMatchingPartitionPaths(partitionFilters)
- }
- getInputFileSlices(prunedPartitions: _*).asScala.map(
- { case (partition, fileSlices) => (Option.apply(partition),
fileSlices.asScala.toSeq) }).toSeq
+ /**
+ * Prunes table partitions to list if possible.
+ *
+ * @param dataFilters filters based on data columns
+ * @param partitionFilters filters based on partition columns
+ * @return a pair of elements, with the first element indicating whether the
partition pruning
+ * is applied, and the second element as a list of partition paths
and file slices
+ */
+ def prunePartitionsAndGetFileSlices(dataFilters: Seq[Expression],
+ partitionFilters: Seq[Expression]):
+ (Boolean, Seq[(Option[BaseHoodieTableFileIndex.PartitionPath],
Seq[FileSlice])]) = {
+ val isPartitionedTable = getPartitionColumns.length > 0
+ val prunedPartitionsTuple: (Boolean, Seq[PartitionPath]) =
+ if (isPartitionedTable && partitionFilters.nonEmpty) {
+ // For partitioned table and partition filters, prune the partitions
by the partition filters
+ if (shouldEmbedFileSlices) {
+ (true,
listMatchingPartitionPaths(convertFilterForTimestampKeyGenerator(metaClient,
partitionFilters)))
+ } else {
+ (true, listMatchingPartitionPaths(partitionFilters))
+ }
+ } else if (isPartitionedTable && isDataSkippingEnabled) {
+ // For partitioned table and no partition filters, if data skipping is
enabled,
+ // try using the PARTITION_STATS index to prune the partitions
+ lazy val filterReferencedColumns = collectReferencedColumns(spark,
dataFilters, schema)
+ val prunedPartitionPaths = new PartitionStatsIndexSupport(spark,
schema, metadataConfig, metaClient)
+ .prunePartitions(this, dataFilters, filterReferencedColumns)
+ if (prunedPartitionPaths.nonEmpty) {
+ try {
+ (true, prunedPartitionPaths.get.map(e =>
convertToPartitionPath(e)).toSeq)
+ } catch {
+ // If the partition values cannot be parsed by
[[convertToPartitionPath]],
+ // fall back to listing all partitions
+ case _: HoodieException => (false,
listMatchingPartitionPaths(Seq.empty))
+ }
+ } else {
+ // Cannot use partition stats index (not available) for pruning
partitions,
+ // fall back to listing all partitions
+ (false, listMatchingPartitionPaths(Seq.empty))
+ }
+ } else {
+ // Listing all partitions for non-partitioned table,
+ // or partitioned table without partition filter or data skipping or
PARTITION_STATS index
+ (false, listMatchingPartitionPaths(Seq.empty))
+ }
+
+ (prunedPartitionsTuple._1, getInputFileSlices(prunedPartitionsTuple._2:
_*).asScala.map(
+ { case (partition, fileSlices) => (Option.apply(partition),
fileSlices.asScala.toSeq) }).toSeq)
}
/**
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
index 51b9f8eaaeb..437bfd63a8b 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala
@@ -23,15 +23,18 @@ import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.model.{HoodieMetadataColumnStats,
HoodieMetadataRecord}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.data.HoodieData
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.hash.ColumnIndexID
import org.apache.hudi.metadata.{HoodieMetadataPayload,
HoodieTableMetadataUtil}
import org.apache.hudi.util.JFunction
+
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+import
org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Column, SparkSession}
import scala.collection.JavaConverters._
@@ -49,6 +52,15 @@ class PartitionStatsIndexSupport(spark: SparkSession,
metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS)
}
+ override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+ queryFilters: Seq[Expression],
+ queryReferencedColumns: Seq[String],
+ prunedPartitionsAndFileSlices:
Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+ shouldPushDownFilesFilter: Boolean
+ ): Option[Set[String]] = {
+ throw new UnsupportedOperationException("This method is not supported by
PartitionStatsIndexSupport")
+ }
+
override def loadColumnStatsIndexRecords(targetColumns: Seq[String],
shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
checkState(targetColumns.nonEmpty)
val encodedTargetColumnNames = targetColumns.map(colName => new
ColumnIndexID(colName).asBase64EncodedString())
@@ -66,6 +78,40 @@ class PartitionStatsIndexSupport(spark: SparkSession,
columnStatsRecords
}
+
+ def prunePartitions(fileIndex: HoodieFileIndex,
+ queryFilters: Seq[Expression],
+ queryReferencedColumns: Seq[String]):
Option[Set[String]] = {
+ if (isIndexAvailable && queryFilters.nonEmpty &&
queryReferencedColumns.nonEmpty) {
+ val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns,
inMemoryProjectionThreshold)
+ loadTransposed(queryReferencedColumns, readInMemory, Option.empty) {
+ transposedPartitionStatsDF => {
+ val allPartitions =
transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+ .collect()
+ .map(_.getString(0))
+ .toSet
+ if (allPartitions.nonEmpty) {
+          // PARTITION_STATS index exists for all or some columns in the
filters
+ // NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has covered
the case where the
+ // column in a filter does not have the stats available, by
making sure such a
+ // filter does not prune any partition.
+ val indexSchema = transposedPartitionStatsDF.schema
+ val indexFilter =
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_,
indexSchema)).reduce(And)
+ Some(transposedPartitionStatsDF.where(new Column(indexFilter))
+ .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+ .collect()
+ .map(_.getString(0))
+ .toSet)
+ } else {
+ // PARTITION_STATS index does not exist for any column in the
filters, skip the pruning
+ Option.empty
+ }
+ }
+ }
+ } else {
+ Option.empty
+ }
+ }
}
object PartitionStatsIndexSupport {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
index d9396433571..2371e4b066e 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
@@ -118,6 +118,12 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
* or b) executing it on-cluster via Spark [[Dataset]] and [[RDD]] APIs
*/
protected def shouldReadInMemory(fileIndex: HoodieFileIndex,
queryReferencedColumns: Seq[String], inMemoryProjectionThreshold: Integer):
Boolean = {
+ // NOTE: Since executing on-cluster via Spark API has its own non-trivial
amount of overhead,
+ // it's most often preferential to fetch index w/in the same process
(usually driver),
+ // w/o resorting to on-cluster execution.
+ // For that we use a simple-heuristic to determine whether we should
read and process the index in-memory or
+ // on-cluster: total number of rows of the expected projected
portion of the index has to be below the
+ // threshold (of 100k records)
Option(metadataConfig.getColumnStatsIndexProcessingModeOverride) match {
case Some(mode) =>
mode ==
HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
index 1daa2349264..c8f9029889f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
@@ -28,6 +28,7 @@ import org.apache.hudi.metadata.HoodieMetadataFileSystemView
import org.apache.hudi.metadata.MetadataPartitionType.PARTITION_STATS
import org.apache.hudi.util.JFunction
import org.apache.hudi.{DataSourceReadOptions, HoodieFileIndex}
+
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression, GreaterThan, LessThan, Literal}
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
@@ -152,8 +153,9 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
|location '$tablePath'
""".stripMargin
)
- // set small file limit to 0 so that each insert creates a new file
+ // set small file limit to 0 and parquet max file size to 1 so that
each insert creates a new file
spark.sql("set hoodie.parquet.small.file.limit=0")
+ spark.sql("set hoodie.parquet.max.file.size=1")
// insert data in below pattern so that multiple records for 'texas'
and 'california' partition are in same file
spark.sql(
s"""
@@ -166,7 +168,10 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
spark.sql(
s"""
| insert into $tableName
- | values
(1695516137,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-C','driver-P','houston','texas'),
(1695332066,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O','austin','texas')
+ | values
+ |
(1695516137,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-C','driver-P','houston','texas'),
+ |
(1695332066,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O','austin','texas'),
+ |
(1695516138,'e3cf430c-889d-4015-bc98-59bdce1e530d','rider-C','driver-P','houston','texas')
| """.stripMargin
)
@@ -177,6 +182,10 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
.build()
assertResult(tableName)(metaClient.getTableConfig.getTableName)
assertTrue(metaClient.getTableConfig.getMetadataPartitions.contains(PARTITION_STATS.getPartitionPath))
+ val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" ->
metaClient.getBasePath.toString))
+ val partitionFiles = fileIndex.listFiles(Seq.empty, Seq.empty)
+ // Make sure there are partition(s) with a single file and multiple
files
+ assertTrue(partitionFiles.exists(p => p.files.size == 1) &&
partitionFiles.exists(p => p.files.size > 1))
// Test pruning
spark.sql("set hoodie.metadata.enable=true")
@@ -210,6 +219,7 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
Seq("334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A",
"san_francisco", "california"),
Seq("7a84095f-737f-40bc-b62f-6b69664712d2", "rider-B", "new york
city", "new york"),
Seq("e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-C", "houston",
"texas"),
+ Seq("e3cf430c-889d-4015-bc98-59bdce1e530d", "rider-C", "houston",
"texas"),
Seq("3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04", "rider-D", "princeton",
"new jersey"),
Seq("1dced545-862b-4ceb-8b43-d2a568f6616b", "rider-E", "austin",
"texas"),
Seq("e96c4396-3fad-413a-a942-4cb36106d721", "rider-F", "sunnyvale",
"california")
@@ -236,8 +246,8 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
|create table $tableName (
| id int,
| name string,
- | ts long,
- | price int
+ | price int,
+ | ts long
|) using hudi
|partitioned by (ts)
|tblproperties (
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
index 926a2e958ab..448b2a97d91 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndexWithSQL.scala
@@ -216,11 +216,12 @@ class TestRecordLevelIndexWithSQL extends
RecordLevelIndexTestBase {
val fileIndex = new HoodieFileIndex(sparkSession, metaClient,
Option.empty, Map("glob.paths" -> globbedPaths), includeLogFiles =
includeLogFiles)
val selectedPartition = "2016/03/15"
val partitionFilter: Expression = EqualTo(AttributeReference("partition",
StringType)(), Literal(selectedPartition))
- val prunedPaths =
fileIndex.getFileSlicesForPrunedPartitions(Seq(partitionFilter))
+ val (isPruned, prunedPaths) =
fileIndex.prunePartitionsAndGetFileSlices(Seq.empty, Seq(partitionFilter))
val storagePaths =
RecordLevelIndexSupport.getPrunedStoragePaths(prunedPaths, fileIndex)
// verify pruned paths contain the selected partition and the size of the
pruned file paths
// when includeLogFiles is set to true, there are two storages paths -
base file and log file
// every partition contains only one file slice
+ assertTrue(isPruned)
assertEquals(if (includeLogFiles) 2 else 1, storagePaths.size)
assertTrue(storagePaths.forall(path =>
path.toString.contains(selectedPartition)))
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
index c318d5e449b..4be2532fb25 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
@@ -102,8 +102,6 @@ class TestDataSkippingQuery extends HoodieSparkSqlTestBase {
checkAnswer(s"select id, name, price, ts, dt from $tableName where
attributes.color = 'red'")(
Seq(1, "a1", 10.0, 1000, "2021-01-05")
)
- // TODO add this fallback param, cause by PartitionStatsIndexSupport,
cause by HUDI-7144,may be fix by HUDI-7903
- spark.sql("set hoodie.fileIndex.dataSkippingFailureMode = fallback")
// Check the case where the WHERE condition only includes columns
supported by column stats
checkAnswer(s"select id, name, price, ts, dt from $tableName where
name='a1'")(
Seq(1, "a1", 10.0, 1000, "2021-01-05")