This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 50ab8c79dfe [HUDI-8196] Support pruning based on partition stats index
in Hudi Flink (#12132)
50ab8c79dfe is described below
commit 50ab8c79dfefc4c13fe297c14d9d7dced58a4593
Author: Shuo Cheng <[email protected]>
AuthorDate: Sat Oct 26 10:17:34 2024 +0800
[HUDI-8196] Support pruning based on partition stats index in Hudi Flink
(#12132)
* add a new interface ColumnStatsIndex;
* refactor PartitionPruner to apply builder pattern.
---------
Co-authored-by: danny0405 <[email protected]>
---
.../hudi/sink/bootstrap/BootstrapOperator.java | 2 +-
.../java/org/apache/hudi/source/FileIndex.java | 112 +++--------
.../{DataPruner.java => ColumnStatsProbe.java} | 10 +-
.../apache/hudi/source/prune/PartitionPruners.java | 169 ++++++++++++++--
.../apache/hudi/source/stats/ColumnStatsIndex.java | 57 ++++++
.../hudi/source/stats/ColumnStatsSchemas.java | 83 ++++++++
...ColumnStatsIndices.java => FileStatsIndex.java} | 218 +++++++++++++--------
.../hudi/source/stats/PartitionStatsIndex.java | 77 ++++++++
.../org/apache/hudi/table/HoodieTableSource.java | 54 +++--
.../java/org/apache/hudi/util/StreamerUtil.java | 13 ++
.../hudi/source/TestExpressionEvaluators.java | 2 +-
.../java/org/apache/hudi/source/TestFileIndex.java | 60 +++++-
.../hudi/source/TestIncrementalInputSplits.java | 90 +++++++--
...stDataPruner.java => TestColumnStatsProbe.java} | 4 +-
...StatsIndices.java => TestColumnStatsIndex.java} | 53 ++++-
.../apache/hudi/table/ITTestHoodieDataSource.java | 52 +++++
.../apache/hudi/table/TestHoodieTableSource.java | 55 +++++-
.../apache/hudi/table/format/TestInputFormat.java | 8 +-
18 files changed, 884 insertions(+), 235 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
index 1e59ee8199e..86c6ee39f2c 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
@@ -71,8 +71,8 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import static java.util.stream.Collectors.toList;
-import static org.apache.hudi.source.FileIndex.metadataConfig;
import static org.apache.hudi.util.StreamerUtil.isValidFile;
+import static org.apache.hudi.util.StreamerUtil.metadataConfig;
/**
* The operator to load index from existing hoodieTable.
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index e4975c644a3..a8ff13fe621 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -21,30 +21,25 @@ package org.apache.hudi.source;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.index.bucket.BucketIdentifier;
-import org.apache.hudi.source.prune.DataPruner;
+import org.apache.hudi.source.prune.ColumnStatsProbe;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.source.prune.PrimaryKeyPruners;
-import org.apache.hudi.source.stats.ColumnStatsIndices;
+import org.apache.hudi.source.stats.FileStatsIndex;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
-import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -54,7 +49,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -69,24 +63,30 @@ public class FileIndex implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(FileIndex.class);
private final StoragePath path;
- private final RowType rowType;
private final boolean tableExists;
private final HoodieMetadataConfig metadataConfig;
private final org.apache.hadoop.conf.Configuration hadoopConf;
private final PartitionPruners.PartitionPruner partitionPruner; // for
partition pruning
- private final DataPruner dataPruner; // for data
skipping
+ private final ColumnStatsProbe colStatsProbe; // for
probing column stats
private final int dataBucket; // for
bucket pruning
private List<String> partitionPaths; // cache of
partition paths
-
- private FileIndex(StoragePath path, Configuration conf, RowType rowType,
DataPruner dataPruner, PartitionPruners.PartitionPruner partitionPruner, int
dataBucket) {
+ private final FileStatsIndex fileStatsIndex; // for data
skipping
+
+ private FileIndex(
+ StoragePath path,
+ Configuration conf,
+ RowType rowType,
+ ColumnStatsProbe colStatsProbe,
+ PartitionPruners.PartitionPruner partitionPruner,
+ int dataBucket) {
this.path = path;
- this.rowType = rowType;
this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
this.tableExists = StreamerUtil.tableExists(path.toString(), hadoopConf);
- this.metadataConfig = metadataConfig(conf);
- this.dataPruner =
isDataSkippingFeasible(conf.getBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED))
? dataPruner : null;
+ this.metadataConfig = StreamerUtil.metadataConfig(conf);
+ this.colStatsProbe =
isDataSkippingFeasible(conf.get(FlinkOptions.READ_DATA_SKIPPING_ENABLED)) ?
colStatsProbe : null;
this.partitionPruner = partitionPruner;
this.dataBucket = dataBucket;
+ this.fileStatsIndex = new FileStatsIndex(path.toString(), rowType,
metadataConfig);
}
/**
@@ -154,6 +154,9 @@ public class FileIndex implements Serializable {
}
String[] partitions =
getOrBuildPartitionPaths().stream().map(p -> fullPartitionPath(path,
p)).toArray(String[]::new);
+ if (partitions.length < 1) {
+ return Collections.emptyList();
+ }
List<StoragePathInfo> allFiles = FSUtils.getFilesInPartitions(
new HoodieFlinkEngineContext(hadoopConf),
new HoodieHadoopStorage(path,
HadoopFSUtils.getStorageConf(hadoopConf)), metadataConfig, path.toString(),
partitions)
@@ -177,7 +180,8 @@ public class FileIndex implements Serializable {
}
// data skipping
- Set<String> candidateFiles = candidateFilesInMetadataTable(allFiles);
+ Set<String> candidateFiles = fileStatsIndex.computeCandidateFiles(
+ colStatsProbe, allFiles.stream().map(f ->
f.getPath().getName()).collect(Collectors.toList()));
if (candidateFiles == null) {
// no need to filter by col stats or error occurs.
return allFiles;
@@ -215,65 +219,6 @@ public class FileIndex implements Serializable {
// Utilities
// -------------------------------------------------------------------------
- /**
- * Computes pruned list of candidate base-files' names based on provided
list of data filters.
- * conditions, by leveraging Metadata Table's Column Statistics index
(hereon referred as ColStats for brevity)
- * bearing "min", "max", "num_nulls" statistics for all columns.
- *
- * <p>NOTE: This method has to return complete set of candidate files, since
only provided candidates will
- * ultimately be scanned as part of query execution. Hence, this method has
to maintain the
- * invariant of conservatively including every base-file's name, that is NOT
referenced in its index.
- *
- * <p>The {@code filters} must all be simple.
- *
- * @return set of pruned (data-skipped) candidate base-files' names
- */
- @Nullable
- private Set<String> candidateFilesInMetadataTable(List<StoragePathInfo>
allFileStatus) {
- if (dataPruner == null) {
- return null;
- }
- try {
- String[] referencedCols = dataPruner.getReferencedCols();
- final List<RowData> colStats =
- ColumnStatsIndices.readColumnStatsIndex(path.toString(),
metadataConfig, referencedCols);
- final Pair<List<RowData>, String[]> colStatsTable =
- ColumnStatsIndices.transposeColumnStatsIndex(colStats,
referencedCols, rowType);
- List<RowData> transposedColStats = colStatsTable.getLeft();
- String[] queryCols = colStatsTable.getRight();
- if (queryCols.length == 0) {
- // the indexed columns have no intersection with the referenced
columns, returns early
- return null;
- }
- RowType.RowField[] queryFields = DataTypeUtils.projectRowFields(rowType,
queryCols);
-
- Set<String> allIndexedFileNames = transposedColStats.stream().parallel()
- .map(row -> row.getString(0).toString())
- .collect(Collectors.toSet());
- Set<String> candidateFileNames = transposedColStats.stream().parallel()
- .filter(row -> dataPruner.test(row, queryFields))
- .map(row -> row.getString(0).toString())
- .collect(Collectors.toSet());
-
- // NOTE: Col-Stats Index isn't guaranteed to have complete set of
statistics for every
- // base-file: since it's bound to clustering, which could occur
asynchronously
- // at arbitrary point in time, and is not likely to be touching
all the base files.
- //
- // To close that gap, we manually compute the difference b/w all
indexed (by col-stats-index)
- // files and all outstanding base-files, and make sure that all
base files not
- // represented w/in the index are included in the output of this
method
- Set<String> nonIndexedFileNames = allFileStatus.stream()
- .map(fileStatus ->
fileStatus.getPath().getName()).collect(Collectors.toSet());
- nonIndexedFileNames.removeAll(allIndexedFileNames);
-
- candidateFileNames.addAll(nonIndexedFileNames);
- return candidateFileNames;
- } catch (Throwable throwable) {
- LOG.warn("Read column stats for data skipping error", throwable);
- return null;
- }
- }
-
/**
* Returns all the relative partition paths.
*
@@ -296,15 +241,6 @@ public class FileIndex implements Serializable {
return this.partitionPaths;
}
- public static HoodieMetadataConfig
metadataConfig(org.apache.flink.configuration.Configuration conf) {
- Properties properties = new Properties();
-
- // set up metadata.enabled=true in table DDL to enable metadata listing
- properties.put(HoodieMetadataConfig.ENABLE.key(),
conf.getBoolean(FlinkOptions.METADATA_ENABLED));
-
- return
HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
- }
-
private boolean isDataSkippingFeasible(boolean dataSkippingEnabled) {
// NOTE: Data Skipping is only effective when it references columns that
are indexed w/in
// the Column Stats Index (CSI). Following cases could not be
effectively handled by Data Skipping:
@@ -348,7 +284,7 @@ public class FileIndex implements Serializable {
private StoragePath path;
private Configuration conf;
private RowType rowType;
- private DataPruner dataPruner;
+ private ColumnStatsProbe columnStatsProbe;
private PartitionPruners.PartitionPruner partitionPruner;
private int dataBucket = PrimaryKeyPruners.BUCKET_ID_NO_PRUNING;
@@ -370,8 +306,8 @@ public class FileIndex implements Serializable {
return this;
}
- public Builder dataPruner(DataPruner dataPruner) {
- this.dataPruner = dataPruner;
+ public Builder columnStatsProbe(ColumnStatsProbe columnStatsProbe) {
+ this.columnStatsProbe = columnStatsProbe;
return this;
}
@@ -387,7 +323,7 @@ public class FileIndex implements Serializable {
public FileIndex build() {
return new FileIndex(Objects.requireNonNull(path),
Objects.requireNonNull(conf), Objects.requireNonNull(rowType),
- dataPruner, partitionPruner, dataBucket);
+ columnStatsProbe, partitionPruner, dataBucket);
}
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/DataPruner.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/ColumnStatsProbe.java
similarity index 93%
rename from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/DataPruner.java
rename to
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/ColumnStatsProbe.java
index 25257c83f64..a4b48194002 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/DataPruner.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/ColumnStatsProbe.java
@@ -39,15 +39,15 @@ import java.util.Map;
import static org.apache.hudi.source.ExpressionEvaluators.fromExpression;
/**
- * Utility to do data skipping.
+ * Utility for filtering the column stats metadata payloads.
*/
-public class DataPruner implements Serializable {
+public class ColumnStatsProbe implements Serializable {
private static final long serialVersionUID = 1L;
private final String[] referencedCols;
private final List<ExpressionEvaluators.Evaluator> evaluators;
- private DataPruner(String[] referencedCols,
List<ExpressionEvaluators.Evaluator> evaluators) {
+ private ColumnStatsProbe(String[] referencedCols,
List<ExpressionEvaluators.Evaluator> evaluators) {
this.referencedCols = referencedCols;
this.evaluators = evaluators;
}
@@ -74,7 +74,7 @@ public class DataPruner implements Serializable {
}
@Nullable
- public static DataPruner newInstance(List<ResolvedExpression> filters) {
+ public static ColumnStatsProbe newInstance(List<ResolvedExpression> filters)
{
if (filters.isEmpty()) {
return null;
}
@@ -83,7 +83,7 @@ public class DataPruner implements Serializable {
return null;
}
List<ExpressionEvaluators.Evaluator> evaluators = fromExpression(filters);
- return new DataPruner(referencedCols, evaluators);
+ return new ColumnStatsProbe(referencedCols, evaluators);
}
public static Map<String, ColumnStats> convertColumnStats(RowData indexRow,
RowType.RowField[] queryFields) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java
index 3f6338896d6..955f471df31 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java
@@ -18,23 +18,31 @@
package org.apache.hudi.source.prune;
-import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.source.ExpressionEvaluators;
+import org.apache.hudi.source.ExpressionEvaluators.Evaluator;
import org.apache.hudi.source.stats.ColumnStats;
+import org.apache.hudi.source.stats.PartitionStatsIndex;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
import java.io.Serializable;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Tools to prune partitions.
@@ -125,21 +133,154 @@ public class PartitionPruners {
}
}
- public static PartitionPruner getInstance(
- List<ExpressionEvaluators.Evaluator> partitionEvaluators,
- List<String> partitionKeys,
- List<DataType> partitionTypes,
- String defaultParName,
- boolean hivePartition) {
- ValidationUtils.checkState(!partitionEvaluators.isEmpty());
- return new DynamicPartitionPruner(partitionEvaluators, partitionKeys,
partitionTypes, defaultParName, hivePartition);
+ /**
+ * ColumnStats partition pruner for hoodie table source which enables
partition stats index.
+ *
+ * <p>Note: The data of new partitions created after the job starts could be
read if they match the
+ * filter conditions.
+ */
+ public static class ColumnStatsPartitionPruner implements PartitionPruner {
+ private static final long serialVersionUID = 1L;
+ private final ColumnStatsProbe probe;
+ private final PartitionStatsIndex partitionStatsIndex;
+
+ public ColumnStatsPartitionPruner(
+ RowType rowType,
+ String basePath,
+ HoodieMetadataConfig metadataConfig,
+ ColumnStatsProbe probe) {
+ this.probe = probe;
+ this.partitionStatsIndex = new PartitionStatsIndex(basePath, rowType,
metadataConfig);
+ }
+
+ @Override
+ public Set<String> filter(Collection<String> partitions) {
+ Set<String> candidatePartitions =
partitionStatsIndex.computeCandidatePartitions(probe, new
ArrayList<>(partitions));
+ if (candidatePartitions == null) {
+ return new HashSet<>(partitions);
+ }
+ return
partitions.stream().filter(candidatePartitions::contains).collect(Collectors.toSet());
+ }
+ }
+
+ /**
+ * Chained partition pruner for hoodie table source combining multiple
partition pruners with predicate '&'.
+ */
+ public static class ChainedPartitionPruner implements PartitionPruner {
+ private static final long serialVersionUID = 1L;
+ private final List<PartitionPruner> pruners;
+
+ public ChainedPartitionPruner(List<PartitionPruner> pruners) {
+ this.pruners = pruners;
+ }
+
+ @Override
+ public Set<String> filter(Collection<String> partitions) {
+ for (PartitionPruner pruner: pruners) {
+ partitions = pruner.filter(partitions);
+ }
+ return new HashSet<>(partitions);
+ }
}
- public static PartitionPruner getInstance(Collection<String>
candidatePartitions) {
- return new StaticPartitionPruner(candidatePartitions);
+ public static Builder builder() {
+ return new Builder();
}
- public static PartitionPruner getInstance(String... candidatePartitions) {
- return new StaticPartitionPruner(Arrays.asList(candidatePartitions));
+ public static class Builder {
+ private RowType rowType;
+ private String basePath;
+ private Configuration conf;
+ private ColumnStatsProbe probe;
+ private List<ExpressionEvaluators.Evaluator> partitionEvaluators;
+ private List<String> partitionKeys;
+ private List<DataType> partitionTypes;
+ private String defaultParName;
+ private boolean hivePartition;
+ private Collection<String> candidatePartitions;
+
+ private Builder() {
+ }
+
+ public Builder rowType(RowType rowType) {
+ this.rowType = rowType;
+ return this;
+ }
+
+ public Builder basePath(String basePath) {
+ this.basePath = basePath;
+ return this;
+ }
+
+ public Builder conf(Configuration conf) {
+ this.conf = conf;
+ return this;
+ }
+
+ public Builder columnStatsProbe(ColumnStatsProbe probe) {
+ this.probe = probe;
+ return this;
+ }
+
+ public Builder partitionEvaluators(List<Evaluator> partitionEvaluators) {
+ this.partitionEvaluators = partitionEvaluators;
+ return this;
+ }
+
+ public Builder partitionKeys(List<String> partitionKeys) {
+ this.partitionKeys = partitionKeys;
+ return this;
+ }
+
+ public Builder partitionTypes(List<DataType> partitionTypes) {
+ this.partitionTypes = partitionTypes;
+ return this;
+ }
+
+ public Builder defaultParName(String defaultParName) {
+ this.defaultParName = defaultParName;
+ return this;
+ }
+
+ public Builder hivePartition(boolean hivePartition) {
+ this.hivePartition = hivePartition;
+ return this;
+ }
+
+ public Builder candidatePartitions(Collection<String> candidatePartitions)
{
+ this.candidatePartitions = candidatePartitions;
+ return this;
+ }
+
+ public PartitionPruner build() {
+ PartitionPruner staticPruner = null;
+ if (candidatePartitions != null && !candidatePartitions.isEmpty()) {
+ staticPruner = new StaticPartitionPruner(candidatePartitions);
+ }
+ PartitionPruner dynamicPruner = null;
+ if (partitionEvaluators != null && !partitionEvaluators.isEmpty()) {
+ dynamicPruner = new DynamicPartitionPruner(partitionEvaluators,
Objects.requireNonNull(partitionKeys),
+ Objects.requireNonNull(partitionTypes),
Objects.requireNonNull(defaultParName),
+ hivePartition);
+ }
+ PartitionPruner columnStatsPruner = null;
+ if (probe != null
+ && conf.get(FlinkOptions.READ_DATA_SKIPPING_ENABLED)
+ && conf.get(FlinkOptions.METADATA_ENABLED)) {
+ columnStatsPruner = new
ColumnStatsPartitionPruner(Objects.requireNonNull(rowType),
Objects.requireNonNull(basePath),
+ StreamerUtil.metadataConfig(Objects.requireNonNull(conf)), probe);
+ }
+ List<PartitionPruner> partitionPruners =
+ Stream.of(staticPruner, dynamicPruner, columnStatsPruner)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+ if (partitionPruners.isEmpty()) {
+ return null;
+ }
+ if (partitionPruners.size() < 2) {
+ return partitionPruners.get(0);
+ }
+ return new ChainedPartitionPruner(partitionPruners);
+ }
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
new file mode 100644
index 00000000000..1d73e167e33
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndex.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.stats;
+
+import org.apache.hudi.source.prune.ColumnStatsProbe;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Base support that leverages Metadata Table's indexes, such as Column Stats
Index
+ * and Partition Stats Index, to prune files and partitions.
+ */
+public interface ColumnStatsIndex extends Serializable {
+
+ /**
+ * Returns the partition name of the index.
+ */
+ String getIndexPartitionName();
+
+ /**
+ * Computes the filtered files with given candidates.
+ *
+ * @param columnStatsProbe The utility to filter the column stats metadata.
+ * @param allFile The file name list of the candidate files.
+ *
+ * @return The set of filtered file names
+ */
+ Set<String> computeCandidateFiles(ColumnStatsProbe columnStatsProbe,
List<String> allFile);
+
+ /**
+ * Computes the filtered partition paths with given candidates.
+ *
+ * @param columnStatsProbe The utility to filter the column stats metadata.
+ * @param allPartitions The <strong>relative</strong> partition path list
of the candidate partitions.
+ *
+ * @return The set of filtered relative partition paths
+ */
+ Set<String> computeCandidatePartitions(ColumnStatsProbe columnStatsProbe,
List<String> allPartitions);
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java
new file mode 100644
index 00000000000..1188c241990
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.stats;
+
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+import org.apache.hudi.util.AvroSchemaConverter;
+
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.stream.Stream;
+
+/**
+ * Utility class for column stats schema.
+ */
+public class ColumnStatsSchemas {
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+ public static final DataType METADATA_DATA_TYPE = getMetadataDataType();
+ public static final DataType COL_STATS_DATA_TYPE = getColStatsDataType();
+ public static final int[] COL_STATS_TARGET_POS = getColStatsTargetPos();
+
+ // the column schema:
+ // |- file_name: string
+ // |- min_val: row
+ // |- max_val: row
+ // |- null_cnt: long
+ // |- val_cnt: long
+ // |- column_name: string
+ public static final int ORD_FILE_NAME = 0;
+ public static final int ORD_MIN_VAL = 1;
+ public static final int ORD_MAX_VAL = 2;
+ public static final int ORD_NULL_CNT = 3;
+ public static final int ORD_VAL_CNT = 4;
+ public static final int ORD_COL_NAME = 5;
+
+ private static DataType getMetadataDataType() {
+ return AvroSchemaConverter.convertToDataType(HoodieMetadataRecord.SCHEMA$);
+ }
+
+ private static DataType getColStatsDataType() {
+ int pos =
HoodieMetadataRecord.SCHEMA$.getField(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).pos();
+ return METADATA_DATA_TYPE.getChildren().get(pos);
+ }
+
+ // the column schema:
+ // |- file_name: string
+ // |- min_val: row
+ // |- max_val: row
+ // |- null_cnt: long
+ // |- val_cnt: long
+ // |- column_name: string
+ private static int[] getColStatsTargetPos() {
+ RowType colStatsRowType = (RowType) COL_STATS_DATA_TYPE.getLogicalType();
+ return Stream.of(
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT,
+ HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
+ .mapToInt(colStatsRowType::getFieldIndex)
+ .toArray();
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
similarity index 67%
rename from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
rename to
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
index 674e58dce6b..0b3ffeff7d5 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/FileStatsIndex.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Tuple3;
import org.apache.hudi.common.util.hash.ColumnIndexID;
@@ -31,9 +32,10 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.source.prune.ColumnStatsProbe;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
-import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.AvroToRowDataConverters;
+import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.FlinkClientUtil;
import org.apache.hudi.util.RowDataProjection;
@@ -41,9 +43,12 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
@@ -55,44 +60,130 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.source.stats.ColumnStatsSchemas.COL_STATS_DATA_TYPE;
+import static
org.apache.hudi.source.stats.ColumnStatsSchemas.COL_STATS_TARGET_POS;
+import static
org.apache.hudi.source.stats.ColumnStatsSchemas.METADATA_DATA_TYPE;
+import static org.apache.hudi.source.stats.ColumnStatsSchemas.ORD_COL_NAME;
+import static org.apache.hudi.source.stats.ColumnStatsSchemas.ORD_FILE_NAME;
+import static org.apache.hudi.source.stats.ColumnStatsSchemas.ORD_MAX_VAL;
+import static org.apache.hudi.source.stats.ColumnStatsSchemas.ORD_MIN_VAL;
+import static org.apache.hudi.source.stats.ColumnStatsSchemas.ORD_NULL_CNT;
+import static org.apache.hudi.source.stats.ColumnStatsSchemas.ORD_VAL_CNT;
/**
- * Utilities for abstracting away heavy-lifting of interactions with Metadata
Table's Column Stats Index,
+ * An index support implementation that leverages Column Stats Index to prune
files,
+ * including utilities for abstracting away heavy-lifting of interactions with
the index,
* providing convenient interfaces to read it, transpose, etc.
*/
-public class ColumnStatsIndices {
- private static final DataType METADATA_DATA_TYPE = getMetadataDataType();
- private static final DataType COL_STATS_DATA_TYPE = getColStatsDataType();
- private static final int[] COL_STATS_TARGET_POS = getColStatsTargetPos();
-
- // the column schema:
- // |- file_name: string
- // |- min_val: row
- // |- max_val: row
- // |- null_cnt: long
- // |- val_cnt: long
- // |- column_name: string
- private static final int ORD_FILE_NAME = 0;
- private static final int ORD_MIN_VAL = 1;
- private static final int ORD_MAX_VAL = 2;
- private static final int ORD_NULL_CNT = 3;
- private static final int ORD_VAL_CNT = 4;
- private static final int ORD_COL_NAME = 5;
-
- private ColumnStatsIndices() {
+public class FileStatsIndex implements ColumnStatsIndex {
+ private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(FileStatsIndex.class);
+ private final RowType rowType;
+ private final String basePath;
+ private final HoodieMetadataConfig metadataConfig;
+ private HoodieTableMetadata metadataTable;
+
+ public FileStatsIndex(
+ String basePath,
+ RowType rowType,
+ HoodieMetadataConfig metadataConfig) {
+ this.basePath = basePath;
+ this.rowType = rowType;
+ this.metadataConfig = metadataConfig;
}
- public static List<RowData> readColumnStatsIndex(String basePath,
HoodieMetadataConfig metadataConfig, String[] targetColumns) {
- // NOTE: If specific columns have been provided, we can considerably trim
down amount of data fetched
- // by only fetching Column Stats Index records pertaining to the
requested columns.
- // Otherwise, we fall back to read whole Column Stats Index
- ValidationUtils.checkArgument(targetColumns.length > 0,
- "Column stats is only valid when push down filters have referenced
columns");
- final List<RowData> metadataRows = readColumnStatsIndexByColumns(basePath,
targetColumns, metadataConfig);
- return projectNestedColStatsColumns(metadataRows);
+ @Override
+ public String getIndexPartitionName() {
+ return HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
+ }
+
+ public HoodieTableMetadata getMetadataTable() {
+ // initialize the metadata table lazily
+ if (this.metadataTable == null) {
+ this.metadataTable = HoodieTableMetadata.create(
+ HoodieFlinkEngineContext.DEFAULT,
+ new HoodieHadoopStorage(basePath, FlinkClientUtil.getHadoopConf()),
+ metadataConfig,
+ basePath);
+ }
+ return this.metadataTable;
+ }
+
+ @Override
+ public Set<String> computeCandidateFiles(ColumnStatsProbe probe,
List<String> allFiles) {
+ if (probe == null) {
+ return null;
+ }
+ try {
+ String[] targetColumns = probe.getReferencedCols();
+ final List<RowData> statsRows =
readColumnStatsIndexByColumns(targetColumns);
+ return candidatesInMetadataTable(probe, statsRows, allFiles);
+ } catch (Throwable t) {
+ LOG.warn("Read {} for data skipping error", getIndexPartitionName(), t);
+ return null;
+ }
+ }
+
+ @Override
+ public Set<String> computeCandidatePartitions(ColumnStatsProbe probe,
List<String> allPartitions) {
+ throw new UnsupportedOperationException("This method is not supported by "
+ this.getClass().getSimpleName());
+ }
+
+ /**
+   * Computes pruned list of candidates' names based on provided list of data filter
+   * conditions, by leveraging Metadata Table's Column Statistics index (hereafter referred to as ColStats for brevity)
+ * bearing "min", "max", "num_nulls" statistics for all columns.
+ *
+ * <p>NOTE: This method has to return complete set of the candidates, since
only provided candidates will
+ * ultimately be scanned as part of query execution. Hence, this method has
to maintain the
+   * invariant of conservatively including every candidate name that is NOT referenced in the index.
+ *
+ * <p>The {@code filters} must all be simple.
+ *
+ * @param probe The column stats probe built from push-down filters.
+ * @param indexRows The raw column stats records.
+ * @param oriCandidates The original candidates to be pruned.
+ *
+ * @return set of pruned (data-skipped) candidate names
+ */
+ protected Set<String> candidatesInMetadataTable(
+ @Nullable ColumnStatsProbe probe,
+ List<RowData> indexRows,
+ List<String> oriCandidates) {
+ if (probe == null) {
+ return null;
+ }
+ String[] referencedCols = probe.getReferencedCols();
+ final Pair<List<RowData>, String[]> colStatsTable =
+ transposeColumnStatsIndex(indexRows, referencedCols);
+ List<RowData> transposedColStats = colStatsTable.getLeft();
+ String[] queryCols = colStatsTable.getRight();
+ if (queryCols.length == 0) {
+ // the indexed columns have no intersection with the referenced columns,
returns early
+ return null;
+ }
+ RowType.RowField[] queryFields = DataTypeUtils.projectRowFields(rowType,
queryCols);
+
+ Set<String> allIndexedFiles = transposedColStats.stream().parallel()
+ .map(row -> row.getString(0).toString())
+ .collect(Collectors.toSet());
+ Set<String> candidateFiles = transposedColStats.stream().parallel()
+ .filter(row -> probe.test(row, queryFields))
+ .map(row -> row.getString(0).toString())
+ .collect(Collectors.toSet());
+
+ // NOTE: Col-Stats Index isn't guaranteed to have complete set of
statistics for every
+ // base-file: since it's bound to clustering, which could occur
asynchronously
+ // at arbitrary point in time, and is not likely to be touching all
the base files.
+ //
+ // To close that gap, we manually compute the difference b/w all
indexed (by col-stats-index)
+ // files and all outstanding base-files, and make sure that all base
files not
+ // represented w/in the index are included in the output of this
method
+ oriCandidates.removeAll(allIndexedFiles);
+ candidateFiles.addAll(oriCandidates);
+ return candidateFiles;
}
private static List<RowData> projectNestedColStatsColumns(List<RowData>
rows) {
@@ -140,12 +231,12 @@ public class ColumnStatsIndices {
*
* @param colStats RowData list bearing raw Column Stats Index table
* @param queryColumns target columns to be included into the final table
- * @param tableSchema schema of the source data table
* @return reshaped table according to the format outlined above
*/
- public static Pair<List<RowData>, String[]>
transposeColumnStatsIndex(List<RowData> colStats, String[] queryColumns,
RowType tableSchema) {
+ @VisibleForTesting
+ public Pair<List<RowData>, String[]> transposeColumnStatsIndex(List<RowData>
colStats, String[] queryColumns) {
- Map<String, LogicalType> tableFieldTypeMap =
tableSchema.getFields().stream()
+ Map<String, LogicalType> tableFieldTypeMap = rowType.getFields().stream()
.collect(Collectors.toMap(RowType.RowField::getName,
RowType.RowField::getType));
// NOTE: We have to collect list of indexed columns to make sure we
properly align the rows
@@ -243,7 +334,6 @@ public class ColumnStatsIndices {
// |- null_cnt: long
// |- val_cnt: long
// |- column_name: string
-
GenericRowData unpackedRow = new GenericRowData(row.getArity());
unpackedRow.setField(0, row.getString(0));
unpackedRow.setField(1, minValue);
@@ -278,28 +368,27 @@ public class ColumnStatsIndices {
return converter.convert(rawVal);
}
- private static List<RowData> readColumnStatsIndexByColumns(
- String basePath,
- String[] targetColumns,
- HoodieMetadataConfig metadataConfig) {
+ @VisibleForTesting
+ public List<RowData> readColumnStatsIndexByColumns(String[] targetColumns) {
+ // NOTE: If specific columns have been provided, we can considerably trim
down amount of data fetched
+ // by only fetching Column Stats Index records pertaining to the
requested columns.
+ // Otherwise, we fall back to read whole Column Stats Index
+ ValidationUtils.checkArgument(targetColumns.length > 0,
+ "Column stats is only valid when push down filters have referenced
columns");
- // Read Metadata Table's Column Stats Index into Flink's RowData list by
- // - Fetching the records from CSI by key-prefixes (encoded column
names)
+    // Read Metadata Table's column stats into Flink's RowData list by
+ // - Fetching the records by key-prefixes (encoded column names)
// - Deserializing fetched records into [[RowData]]s
- HoodieTableMetadata metadataTable = HoodieTableMetadata.create(
- HoodieFlinkEngineContext.DEFAULT, new HoodieHadoopStorage(basePath,
FlinkClientUtil.getHadoopConf()),
- metadataConfig, basePath);
-
// TODO encoding should be done internally w/in HoodieBackedTableMetadata
List<String> encodedTargetColumnNames = Arrays.stream(targetColumns)
.map(colName -> new
ColumnIndexID(colName).asBase64EncodedString()).collect(Collectors.toList());
HoodieData<HoodieRecord<HoodieMetadataPayload>> records =
- metadataTable.getRecordsByKeyPrefixes(encodedTargetColumnNames,
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, false);
+ getMetadataTable().getRecordsByKeyPrefixes(encodedTargetColumnNames,
getIndexPartitionName(), false);
org.apache.hudi.util.AvroToRowDataConverters.AvroToRowDataConverter
converter =
AvroToRowDataConverters.createRowConverter((RowType)
METADATA_DATA_TYPE.getLogicalType());
- return records.collectAsList().stream().parallel().map(record -> {
+ List<RowData> rows =
records.collectAsList().stream().parallel().map(record -> {
// schema and props are ignored for generating metadata record from
the payload
// instead, the underlying file system, or bloom filter, or columns
stats metadata (part of payload) are directly used
GenericRecord genericRecord;
@@ -311,37 +400,6 @@ public class ColumnStatsIndices {
return (RowData) converter.convert(genericRecord);
}
).collect(Collectors.toList());
- }
-
- // -------------------------------------------------------------------------
- // Utilities
- // -------------------------------------------------------------------------
- private static DataType getMetadataDataType() {
- return AvroSchemaConverter.convertToDataType(HoodieMetadataRecord.SCHEMA$);
- }
-
- private static DataType getColStatsDataType() {
- int pos =
HoodieMetadataRecord.SCHEMA$.getField(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).pos();
- return METADATA_DATA_TYPE.getChildren().get(pos);
- }
-
- // the column schema:
- // |- file_name: string
- // |- min_val: row
- // |- max_val: row
- // |- null_cnt: long
- // |- val_cnt: long
- // |- column_name: string
- private static int[] getColStatsTargetPos() {
- RowType colStatsRowType = (RowType) COL_STATS_DATA_TYPE.getLogicalType();
- return Stream.of(
- HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME,
- HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE,
- HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE,
- HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT,
- HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT,
- HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME)
- .mapToInt(colStatsRowType::getFieldIndex)
- .toArray();
+ return projectNestedColStatsColumns(rows);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/PartitionStatsIndex.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/PartitionStatsIndex.java
new file mode 100644
index 00000000000..e2f6431e77d
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/PartitionStatsIndex.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source.stats;
+
+import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.source.prune.ColumnStatsProbe;
+
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * An index support implementation that leverages Partition Stats Index to
prune partitions.
+ */
+public class PartitionStatsIndex extends FileStatsIndex {
+ private static final long serialVersionUID = 1L;
+
+ public PartitionStatsIndex(
+ String basePath,
+ RowType tableRowType,
+ HoodieMetadataConfig metadataConfig) {
+ super(basePath, tableRowType, metadataConfig);
+ }
+
+ @Override
+ public String getIndexPartitionName() {
+ return HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS;
+ }
+
+ @Override
+ public Set<String> computeCandidateFiles(ColumnStatsProbe probe,
List<String> allFiles) {
+ throw new UnsupportedOperationException("This method is not supported by "
+ this.getClass().getSimpleName());
+ }
+
+ /**
+ * NOTE: The stats payload stored in Metadata table for Partition Stats Index
+ * is {@link HoodieMetadataColumnStats}, with schema:
+ *
+ * <pre>
+ * |- partition_name: string
+ * |- min_val: row
+ * |- max_val: row
+ * |- null_cnt: long
+ * |- val_cnt: long
+ * |- column_name: string
+ * </pre>
+ * Thus, the loading/transposing and candidates computing logic can be
reused.
+ *
+ * @param probe Column stats probe constructed from pushed down
column filters.
+ * @param allPartitions All partitions to be pruned by partition stats.
+ *
+ * @return the candidate partitions pruned by partition stats.
+ */
+ @Override
+ public Set<String> computeCandidatePartitions(ColumnStatsProbe probe,
List<String> allPartitions) {
+ return super.computeCandidateFiles(probe, allPartitions);
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index d1c7d7292d8..18710678bf5 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -46,7 +46,7 @@ import org.apache.hudi.source.FileIndex;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.source.StreamReadOperator;
-import org.apache.hudi.source.prune.DataPruner;
+import org.apache.hudi.source.prune.ColumnStatsProbe;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.source.prune.PrimaryKeyPruners;
import
org.apache.hudi.source.rebalance.partitioner.StreamReadAppendPartitioner;
@@ -154,7 +154,7 @@ public class HoodieTableSource implements
private int[] requiredPos;
private long limit;
private List<Predicate> predicates;
- private DataPruner dataPruner;
+ private ColumnStatsProbe columnStatsProbe;
private PartitionPruners.PartitionPruner partitionPruner;
private int dataBucket;
private transient FileIndex fileIndex;
@@ -175,7 +175,7 @@ public class HoodieTableSource implements
String defaultPartName,
Configuration conf,
@Nullable List<Predicate> predicates,
- @Nullable DataPruner dataPruner,
+ @Nullable ColumnStatsProbe columnStatsProbe,
@Nullable PartitionPruners.PartitionPruner partitionPruner,
int dataBucket,
@Nullable int[] requiredPos,
@@ -189,7 +189,7 @@ public class HoodieTableSource implements
this.defaultPartName = defaultPartName;
this.conf = conf;
this.predicates =
Optional.ofNullable(predicates).orElse(Collections.emptyList());
- this.dataPruner = dataPruner;
+ this.columnStatsProbe = columnStatsProbe;
this.partitionPruner = partitionPruner;
this.dataBucket = dataBucket;
this.requiredPos = Optional.ofNullable(requiredPos).orElseGet(() ->
IntStream.range(0, this.tableRowType.getFieldCount()).toArray());
@@ -266,7 +266,7 @@ public class HoodieTableSource implements
@Override
public DynamicTableSource copy() {
return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
- conf, predicates, dataPruner, partitionPruner, dataBucket,
requiredPos, limit, metaClient, internalSchemaManager);
+ conf, predicates, columnStatsProbe, partitionPruner, dataBucket,
requiredPos, limit, metaClient, internalSchemaManager);
}
@Override
@@ -279,8 +279,8 @@ public class HoodieTableSource implements
List<ResolvedExpression> simpleFilters =
filterSimpleCallExpression(filters);
Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> splitFilters =
splitExprByPartitionCall(simpleFilters, this.partitionKeys, this.tableRowType);
this.predicates = ExpressionPredicates.fromExpression(splitFilters.f0);
- this.dataPruner = DataPruner.newInstance(splitFilters.f0);
- this.partitionPruner = cratePartitionPruner(splitFilters.f1);
+ this.columnStatsProbe = ColumnStatsProbe.newInstance(splitFilters.f0);
+ this.partitionPruner = createPartitionPruner(splitFilters.f1,
columnStatsProbe);
this.dataBucket = getDataBucket(splitFilters.f0);
// refuse all the filters now
return SupportsFilterPushDown.Result.of(new ArrayList<>(splitFilters.f1),
new ArrayList<>(filters));
@@ -341,8 +341,8 @@ public class HoodieTableSource implements
}
@Nullable
- private PartitionPruners.PartitionPruner
cratePartitionPruner(List<ResolvedExpression> partitionFilters) {
- if (partitionFilters.isEmpty()) {
+ private PartitionPruners.PartitionPruner
createPartitionPruner(List<ResolvedExpression> partitionFilters,
ColumnStatsProbe columnStatsProbe) {
+ if (!isPartitioned() || partitionFilters.isEmpty() && columnStatsProbe ==
null) {
return null;
}
StringJoiner joiner = new StringJoiner(" and ");
@@ -353,9 +353,20 @@ public class HoodieTableSource implements
this.schema.getColumn(name).orElseThrow(() -> new
HoodieValidationException("Field " + name + " does not exist")))
.map(SerializableSchema.Column::getDataType)
.collect(Collectors.toList());
- String defaultParName =
conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
- boolean hivePartition =
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING);
- return PartitionPruners.getInstance(evaluators, this.partitionKeys,
partitionTypes, defaultParName, hivePartition);
+ String defaultParName = conf.get(FlinkOptions.PARTITION_DEFAULT_NAME);
+ boolean hivePartition = conf.get(FlinkOptions.HIVE_STYLE_PARTITIONING);
+
+ return PartitionPruners.builder()
+ .basePath(path.toString())
+ .rowType(tableRowType)
+ .conf(conf)
+ .columnStatsProbe(columnStatsProbe)
+ .partitionEvaluators(evaluators)
+ .partitionKeys(partitionKeys)
+ .partitionTypes(partitionTypes)
+ .defaultParName(defaultParName)
+ .hivePartition(hivePartition)
+ .build();
}
private int getDataBucket(List<ResolvedExpression> dataFilters) {
@@ -602,7 +613,7 @@ public class HoodieTableSource implements
.path(this.path)
.conf(this.conf)
.rowType(this.tableRowType)
- .dataPruner(this.dataPruner)
+ .columnStatsProbe(this.columnStatsProbe)
.partitionPruner(this.partitionPruner)
.dataBucket(this.dataBucket)
.build();
@@ -624,6 +635,10 @@ public class HoodieTableSource implements
return keyIndices;
}
+ private boolean isPartitioned() {
+ return !this.partitionKeys.isEmpty() &&
this.partitionKeys.stream().noneMatch(String::isEmpty);
+ }
+
@VisibleForTesting
public Schema getTableAvroSchema() {
try {
@@ -660,22 +675,27 @@ public class HoodieTableSource implements
*/
@VisibleForTesting
public List<StoragePathInfo> getReadFiles() {
- FileIndex fileIndex = getOrBuildFileIndex();
- List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
+ List<String> relPartitionPaths = getReadPartitions();
if (relPartitionPaths.isEmpty()) {
return Collections.emptyList();
}
return fileIndex.getFilesInPartitions();
}
+ @VisibleForTesting
+ public List<String> getReadPartitions() {
+ FileIndex fileIndex = getOrBuildFileIndex();
+ return fileIndex.getOrBuildPartitionPaths();
+ }
+
@VisibleForTesting
public List<Predicate> getPredicates() {
return predicates;
}
@VisibleForTesting
- public DataPruner getDataPruner() {
- return dataPruner;
+ public ColumnStatsProbe getColumnStatsProbe() {
+ return columnStatsProbe;
}
@VisibleForTesting
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 46ecf7ef8d2..eadf42ca051 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -20,6 +20,7 @@ package org.apache.hudi.util;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
@@ -551,4 +552,16 @@ public class StreamerUtil {
FlinkOptions.KEYGEN_CLASS_NAME.key(),
ComplexAvroKeyGenerator.class.getName());
}
}
+
+ /**
+ * @return HoodieMetadataConfig constructed from flink configuration.
+ */
+ public static HoodieMetadataConfig
metadataConfig(org.apache.flink.configuration.Configuration conf) {
+ Properties properties = new Properties();
+
+ // set up metadata.enabled=true in table DDL to enable metadata listing
+ properties.put(HoodieMetadataConfig.ENABLE.key(),
conf.getBoolean(FlinkOptions.METADATA_ENABLED));
+
+ return
HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionEvaluators.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionEvaluators.java
index e3dc0a836c9..5a8d4ea00d9 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionEvaluators.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestExpressionEvaluators.java
@@ -44,7 +44,7 @@ import java.util.Map;
import java.util.stream.Stream;
import static org.apache.hudi.source.ExpressionEvaluators.fromExpression;
-import static org.apache.hudi.source.prune.DataPruner.convertColumnStats;
+import static org.apache.hudi.source.prune.ColumnStatsProbe.convertColumnStats;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
index 20a5cb34fdf..a2d5e10ec4a 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
@@ -20,9 +20,11 @@ package org.apache.hudi.source;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
-import org.apache.hudi.source.prune.DataPruner;
+import org.apache.hudi.source.prune.ColumnStatsProbe;
+import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.utils.TestConfigurations;
@@ -41,6 +43,7 @@ import org.apache.flink.table.functions.FunctionIdentifier;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
@@ -56,9 +59,12 @@ import static
org.apache.hudi.configuration.FlinkOptions.KEYGEN_CLASS_NAME;
import static org.apache.hudi.configuration.FlinkOptions.METADATA_ENABLED;
import static
org.apache.hudi.configuration.FlinkOptions.PARTITION_DEFAULT_NAME;
import static org.apache.hudi.configuration.FlinkOptions.PARTITION_PATH_FIELD;
+import static
org.apache.hudi.configuration.FlinkOptions.READ_DATA_SKIPPING_ENABLED;
+import static org.apache.hudi.configuration.FlinkOptions.TABLE_TYPE;
import static org.apache.hudi.utils.TestData.insertRow;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -133,7 +139,7 @@ public class TestFileIndex {
Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath(),
TestConfigurations.ROW_DATA_TYPE_BIGINT);
conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
- conf.setBoolean(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
+ conf.setBoolean(READ_DATA_SKIPPING_ENABLED, true);
conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(),
true);
writeBigintDataset(conf);
@@ -142,7 +148,7 @@ public class TestFileIndex {
FileIndex.builder()
.path(new StoragePath(tempFile.getAbsolutePath()))
.conf(conf).rowType(TestConfigurations.ROW_TYPE_BIGINT)
- .dataPruner(DataPruner.newInstance(Collections.singletonList(new
CallExpression(
+
.columnStatsProbe(ColumnStatsProbe.newInstance(Collections.singletonList(new
CallExpression(
FunctionIdentifier.of("greaterThan"),
BuiltInFunctionDefinitions.GREATER_THAN,
Arrays.asList(
@@ -157,6 +163,54 @@ public class TestFileIndex {
assertThat(files.size(), is(2));
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testFileListingWithPartitionStatsPruning(HoodieTableType tableType)
throws Exception {
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(READ_DATA_SKIPPING_ENABLED, true);
+ conf.set(METADATA_ENABLED, true);
+ conf.set(TABLE_TYPE, tableType.name());
+
conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(),
true);
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ // enable CSI for MOR table to collect col stats for delta write stats,
+ // which will be used to construct partition stats then.
+
conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(),
true);
+ }
+
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ // uuid > 'id5' and age < 30, only column stats of 'par3' matches the
filter.
+ ColumnStatsProbe columnStatsProbe =
+ ColumnStatsProbe.newInstance(Arrays.asList(
+ new CallExpression(
+ FunctionIdentifier.of("greaterThan"),
+ BuiltInFunctionDefinitions.GREATER_THAN,
+ Arrays.asList(
+ new FieldReferenceExpression("uuid", DataTypes.STRING(),
0, 0),
+ new ValueLiteralExpression("id5",
DataTypes.STRING().notNull())
+ ),
+ DataTypes.BOOLEAN()),
+ new CallExpression(
+ FunctionIdentifier.of("lessThan"),
+ BuiltInFunctionDefinitions.LESS_THAN,
+ Arrays.asList(
+ new FieldReferenceExpression("age", DataTypes.INT(), 2, 2),
+ new ValueLiteralExpression(30, DataTypes.INT().notNull())
+ ),
+ DataTypes.BOOLEAN())));
+
+ FileIndex fileIndex =
+ FileIndex.builder()
+ .path(new StoragePath(tempFile.getAbsolutePath()))
+ .conf(conf)
+ .rowType(TestConfigurations.ROW_TYPE)
+
.partitionPruner(PartitionPruners.builder().rowType(TestConfigurations.ROW_TYPE).basePath(tempFile.getAbsolutePath()).conf(conf).columnStatsProbe(columnStatsProbe).build())
+ .build();
+
+ List<String> p = fileIndex.getOrBuildPartitionPaths();
+ assertEquals(Arrays.asList("par3"), p);
+ }
+
private void writeBigintDataset(Configuration conf) throws Exception {
List<RowData> dataset = Arrays.asList(
insertRow(TestConfigurations.ROW_TYPE_BIGINT, 1L,
StringData.fromString("Danny"), 23,
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
index db645f3936e..20fe5b1fe95 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java
@@ -18,6 +18,7 @@
package org.apache.hudi.source;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
@@ -35,6 +36,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
+import org.apache.hudi.source.prune.ColumnStatsProbe;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.utils.TestConfigurations;
@@ -44,12 +46,16 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionIdentifier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
@@ -331,12 +337,14 @@ public class TestIncrementalInputSplits extends
HoodieCommonTestHarness {
testData.addAll(TestData.DATA_SET_INSERT.stream().collect(Collectors.toList()));
testData.addAll(TestData.DATA_SET_INSERT_PARTITION_IS_NULL.stream().collect(Collectors.toList()));
TestData.writeData(testData, conf);
- PartitionPruners.PartitionPruner partitionPruner =
PartitionPruners.getInstance(
- Collections.singletonList(partitionEvaluator),
- Collections.singletonList("partition"),
- Collections.singletonList(DataTypes.STRING()),
- PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH,
- false);
+ PartitionPruners.PartitionPruner partitionPruner =
+ PartitionPruners.builder()
+ .partitionEvaluators(Collections.singletonList(partitionEvaluator))
+ .partitionKeys(Collections.singletonList("partition"))
+ .partitionTypes(Collections.singletonList(DataTypes.STRING()))
+ .defaultParName(PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH)
+ .hivePartition(false)
+ .build();
IncrementalInputSplits iis = IncrementalInputSplits.builder()
.conf(conf)
.path(new Path(basePath))
@@ -348,6 +356,55 @@ public class TestIncrementalInputSplits extends
HoodieCommonTestHarness {
assertEquals(expectedPartitions, partitions);
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testInputSplitsWithPartitionStatsPruner(HoodieTableType tableType)
throws Exception {
+ Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+ conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
+ conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
+
conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(),
true);
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ // enable CSI for MOR table to collect col stats for delta write stats,
+ // which will be used to construct partition stats then.
+
conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(),
true);
+ }
+ metaClient = HoodieTestUtils.init(basePath, tableType);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ // uuid > 'id5' and age < 30, only column stats of 'par3' matches the
filter.
+ ColumnStatsProbe columnStatsProbe =
+ ColumnStatsProbe.newInstance(Arrays.asList(
+ new CallExpression(
+ FunctionIdentifier.of("greaterThan"),
+ BuiltInFunctionDefinitions.GREATER_THAN,
+ Arrays.asList(
+ new FieldReferenceExpression("uuid", DataTypes.STRING(),
0, 0),
+ new ValueLiteralExpression("id5",
DataTypes.STRING().notNull())
+ ),
+ DataTypes.BOOLEAN()),
+ new CallExpression(
+ FunctionIdentifier.of("lessThan"),
+ BuiltInFunctionDefinitions.LESS_THAN,
+ Arrays.asList(
+ new FieldReferenceExpression("age", DataTypes.INT(), 2, 2),
+ new ValueLiteralExpression(30, DataTypes.INT().notNull())
+ ),
+ DataTypes.BOOLEAN())));
+
+ PartitionPruners.PartitionPruner partitionPruner =
+
PartitionPruners.builder().rowType(TestConfigurations.ROW_TYPE).basePath(basePath).conf(conf).columnStatsProbe(columnStatsProbe).build();
+ IncrementalInputSplits iis = IncrementalInputSplits.builder()
+ .conf(conf)
+ .path(new Path(basePath))
+ .rowType(TestConfigurations.ROW_TYPE)
+ .partitionPruner(partitionPruner)
+ .build();
+ IncrementalInputSplits.Result result = iis.inputSplits(metaClient, null,
false);
+ List<String> partitions = getFilteredPartitions(result);
+ assertEquals(Arrays.asList("par3"), partitions);
+ }
+
@Test
void testInputSplitsWithSpeedLimit() throws Exception {
metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE);
@@ -474,11 +531,22 @@ public class TestIncrementalInputSplits extends
HoodieCommonTestHarness {
}
private List<String> getFilteredPartitions(IncrementalInputSplits.Result
result) {
- return result.getInputSplits().stream().map(split -> {
- Option<String> basePath = split.getBasePath();
- String[] pathParts = basePath.get().split("/");
- return pathParts[pathParts.length - 2];
- }).collect(Collectors.toList());
+ List<String> partitions = new ArrayList<>();
+ result.getInputSplits().forEach(split -> {
+ split.getBasePath().map(path -> {
+ String[] pathParts = path.split("/");
+ partitions.add(pathParts[pathParts.length - 2]);
+ return null;
+ });
+ split.getLogPaths().map(paths -> {
+ paths.forEach(path -> {
+ String[] pathParts = path.split("/");
+ partitions.add(pathParts[pathParts.length - 2]);
+ });
+ return null;
+ });
+ });
+ return partitions;
}
private Integer intervalBetween2Instants(HoodieTimeline timeline, String
instant1, String instant2) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/prune/TestDataPruner.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/prune/TestColumnStatsProbe.java
similarity index 96%
rename from
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/prune/TestDataPruner.java
rename to
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/prune/TestColumnStatsProbe.java
index 9ef91fe3c5a..01e302acc5f 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/prune/TestDataPruner.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/prune/TestColumnStatsProbe.java
@@ -37,7 +37,7 @@ import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.params.provider.Arguments.arguments;
-class TestDataPruner {
+class TestColumnStatsProbe {
@ParameterizedTest
@MethodSource("testTypes")
@@ -45,7 +45,7 @@ class TestDataPruner {
DataType rowDataType = getDataRowDataType(dataType);
DataType indexRowDataType = getIndexRowDataType(dataType);
- Map<String, ColumnStats> stats1 = DataPruner.convertColumnStats(
+ Map<String, ColumnStats> stats1 = ColumnStatsProbe.convertColumnStats(
getIndexRow(indexRowDataType, minValue, maxValue),
getDataFields(rowDataType)
);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestColumnStatsIndices.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestColumnStatsIndex.java
similarity index 57%
rename from
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestColumnStatsIndices.java
rename to
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestColumnStatsIndex.java
index 837f4192486..15656d0fa0a 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestColumnStatsIndices.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/stats/TestColumnStatsIndex.java
@@ -38,15 +38,56 @@ import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
/**
- * Test cases for {@link ColumnStatsIndices}.
+ * Test cases for {@link ColumnStatsIndex}.
*/
-public class TestColumnStatsIndices {
+public class TestColumnStatsIndex {
@TempDir
File tempFile;
+ @Test
+ void testReadPartitionStatsIndex() throws Exception {
+ final String path = tempFile.getAbsolutePath();
+ Configuration conf = TestConfigurations.getDefaultConf(path);
+ conf.set(FlinkOptions.METADATA_ENABLED, true);
+ conf.setString("hoodie.metadata.index.partition.stats.enable", "true");
+ HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+ .enable(true)
+ .withMetadataIndexColumnStats(true)
+ .build();
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ String[] queryColumns = {"uuid", "age"};
+ PartitionStatsIndex indexSupport = new PartitionStatsIndex(path,
TestConfigurations.ROW_TYPE, metadataConfig);
+ List<RowData> indexRows =
indexSupport.readColumnStatsIndexByColumns(queryColumns);
+ List<String> results =
indexRows.stream().map(Object::toString).sorted(String::compareTo).collect(Collectors.toList());
+ List<String> expected = Arrays.asList(
+ "+I(par1,+I(23),+I(33),0,2,age)",
+ "+I(par1,+I(id1),+I(id2),0,2,uuid)",
+ "+I(par2,+I(31),+I(53),0,2,age)",
+ "+I(par2,+I(id3),+I(id4),0,2,uuid)",
+ "+I(par3,+I(18),+I(20),0,2,age)",
+ "+I(par3,+I(id5),+I(id6),0,2,uuid)",
+ "+I(par4,+I(44),+I(56),0,2,age)",
+ "+I(par4,+I(id7),+I(id8),0,2,uuid)");
+ assertEquals(expected, results);
+
+ Pair<List<RowData>, String[]> transposedIndexTable =
indexSupport.transposeColumnStatsIndex(indexRows, queryColumns);
+ List<String> transposed =
transposedIndexTable.getLeft().stream().map(Object::toString).sorted(String::compareTo).collect(Collectors.toList());
+ assertThat(transposed.size(), is(4));
+ assertArrayEquals(new String[] {"age", "uuid"},
transposedIndexTable.getRight());
+ List<String> expected1 = Arrays.asList(
+ "+I(par1,2,23,33,0,id1,id2,0)",
+ "+I(par2,2,31,53,0,id3,id4,0)",
+ "+I(par3,2,18,20,0,id5,id6,0)",
+ "+I(par4,2,44,56,0,id7,id8,0)");
+ assertEquals(expected1, transposed);
+ }
+
@Test
void testTransposeColumnStatsIndex() throws Exception {
final String path = tempFile.getAbsolutePath();
@@ -63,9 +104,9 @@ public class TestColumnStatsIndices {
// explicit query columns
String[] queryColumns1 = {"uuid", "age"};
- List<RowData> indexRows1 = ColumnStatsIndices.readColumnStatsIndex(path,
metadataConfig, queryColumns1);
- Pair<List<RowData>, String[]> transposedIndexTable1 = ColumnStatsIndices
- .transposeColumnStatsIndex(indexRows1, queryColumns1,
TestConfigurations.ROW_TYPE);
+ FileStatsIndex indexSupport = new FileStatsIndex(path,
TestConfigurations.ROW_TYPE, metadataConfig);
+ List<RowData> indexRows1 =
indexSupport.readColumnStatsIndexByColumns(queryColumns1);
+ Pair<List<RowData>, String[]> transposedIndexTable1 =
indexSupport.transposeColumnStatsIndex(indexRows1, queryColumns1);
assertThat("The schema columns should sort by natural order",
Arrays.toString(transposedIndexTable1.getRight()), is("[age, uuid]"));
List<RowData> transposed1 =
filterOutFileNames(transposedIndexTable1.getLeft());
@@ -79,7 +120,7 @@ public class TestColumnStatsIndices {
// no query columns, only for tests
assertThrows(IllegalArgumentException.class,
- () -> ColumnStatsIndices.readColumnStatsIndex(path, metadataConfig,
new String[0]));
+ () -> indexSupport.readColumnStatsIndexByColumns(new String[0]));
}
private static List<RowData> filterOutFileNames(List<RowData> indexRows) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 77b66ff974e..eacb0b5e8a5 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
@@ -507,6 +508,57 @@ public class ITTestHoodieDataSource {
assertRowsEquals(result, expected, true);
}
+ @ParameterizedTest
+ @MethodSource("tableTypeAndBooleanTrueFalseParams")
+ void testReadWithPartitionStatsPruning(HoodieTableType tableType, boolean
hiveStylePartitioning) throws Exception {
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.METADATA_ENABLED, true)
+ .option(FlinkOptions.READ_AS_STREAMING, true)
+
.option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), true)
+ .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true)
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
+ .end();
+ streamTableEnv.executeSql(hoodieTableDDL);
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+
conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(),
true);
+
conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(),
true);
+ conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
+ conf.set(FlinkOptions.TABLE_TYPE, tableType.name());
+ conf.set(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
+ // write one commit
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ List<String> sqls =
+ Arrays.asList(
+ // no filter
+ "select * from t1",
+ // filter by partition stats pruner only
+ "select * from t1 where uuid > 'id5' and age > 15",
+ // filter by partition stats pruner and dynamic partition pruner
+ "select * from t1 where uuid > 'id5' and age > 15 and `partition`
> 'par3'");
+ List<String> expectResults =
+ Arrays.asList(
+ "[+I[id1, Danny, 23, 1970-01-01T00:00:00.001, par1], "
+ + "+I[id2, Stephen, 33, 1970-01-01T00:00:00.002, par1], "
+ + "+I[id3, Julian, 53, 1970-01-01T00:00:00.003, par2], "
+ + "+I[id4, Fabian, 31, 1970-01-01T00:00:00.004, par2], "
+ + "+I[id5, Sophia, 18, 1970-01-01T00:00:00.005, par3], "
+ + "+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
+ + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
+ + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]",
+ "[+I[id6, Emma, 20, 1970-01-01T00:00:00.006, par3], "
+ + "+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
+ + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]",
+ "[+I[id7, Bob, 44, 1970-01-01T00:00:00.007, par4], "
+ + "+I[id8, Han, 56, 1970-01-01T00:00:00.008, par4]]");
+ for (int i = 0; i < sqls.size(); i++) {
+ List<Row> result = execSelectSql(streamTableEnv, sqls.get(i), 10);
+ assertRowsEquals(result, expectResults.get(i));
+ }
+ }
+
@ParameterizedTest
@MethodSource("tableTypeAndBooleanTrueFalseParams")
void testStreamReadFilterByPartition(HoodieTableType tableType, boolean
hiveStylePartitioning) throws Exception {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index 1e3550aee27..899b3b15667 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -18,10 +18,11 @@
package org.apache.hudi.table;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.source.ExpressionPredicates;
-import org.apache.hudi.source.prune.DataPruner;
+import org.apache.hudi.source.prune.ColumnStatsProbe;
import org.apache.hudi.source.prune.PrimaryKeyPruners;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
@@ -48,6 +49,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.ThrowingSupplier;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +66,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static
org.apache.hudi.keygen.constant.KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED;
import static org.hamcrest.CoreMatchers.instanceOf;
@@ -163,8 +167,21 @@ public class TestHoodieTableSource {
List<ResolvedExpression> expectedFilters =
Collections.singletonList(filterExpr);
tableSource.applyFilters(expectedFilters);
HoodieTableSource copiedSource = (HoodieTableSource) tableSource.copy();
- DataPruner dataPruner = copiedSource.getDataPruner();
- assertNotNull(dataPruner);
+ ColumnStatsProbe columnStatsProbe = copiedSource.getColumnStatsProbe();
+ assertNotNull(columnStatsProbe);
+ }
+
+ @ParameterizedTest
+ @MethodSource("filtersAndResults")
+ void testDataSkippingWithPartitionStatsPruning(List<ResolvedExpression>
filters, List<String> expectedPartitions) throws Exception {
+ final String path = tempFile.getAbsolutePath();
+ conf = TestConfigurations.getDefaultConf(path);
+
conf.setBoolean(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(),
true);
+ conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true);
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+ HoodieTableSource hoodieTableSource = createHoodieTableSource(conf);
+ hoodieTableSource.applyFilters(filters);
+ assertEquals(expectedPartitions, hoodieTableSource.getReadPartitions());
}
@ParameterizedTest
@@ -325,6 +342,38 @@ public class TestHoodieTableSource {
assertEquals(ExpressionPredicates.fromExpression(expectedFilters).toString(),
actualPredicates);
}
+ // -------------------------------------------------------------------------
+ // Utilities
+ // -------------------------------------------------------------------------
+
+ private static Stream<Arguments> filtersAndResults() {
+ CallExpression filter1 =
+ new CallExpression(
+ BuiltInFunctionDefinitions.GREATER_THAN,
+ Arrays.asList(
+ new FieldReferenceExpression("uuid", DataTypes.STRING(), 0, 0),
+ new ValueLiteralExpression("id5",
DataTypes.STRING().notNull())),
+ DataTypes.BOOLEAN());
+
+ CallExpression filter2 =
+ new CallExpression(
+ BuiltInFunctionDefinitions.LESS_THAN,
+ Arrays.asList(
+ new FieldReferenceExpression("partition", DataTypes.STRING(),
4, 4),
+ new ValueLiteralExpression("par4",
DataTypes.STRING().notNull())),
+ DataTypes.BOOLEAN());
+
+ Object[][] data = new Object[][] {
+ // pruned by partition stats pruner only.
+ {Arrays.asList(filter1), Arrays.asList("par3", "par4")},
+ // pruned by dynamic partition pruner only.
+ {Arrays.asList(filter2), Arrays.asList("par1", "par2", "par3")},
+ // pruned by dynamic pruner and stats pruner.
+ {Arrays.asList(filter1, filter2), Arrays.asList("par3")},
+ };
+ return Stream.of(data).map(Arguments::of);
+ }
+
private HoodieTableSource getEmptyStreamingSource() {
final String path = tempFile.getAbsolutePath();
conf = TestConfigurations.getDefaultConf(path);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index 37c2c704a1c..ee43902ddd7 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -398,7 +398,7 @@ public class TestInputFormat {
.rowType(TestConfigurations.ROW_TYPE)
.conf(conf)
.path(FilePathUtils.toFlinkPath(metaClient.getBasePath()))
- .partitionPruner(PartitionPruners.getInstance("par1", "par2", "par3",
"par4"))
+
.partitionPruner(PartitionPruners.builder().candidatePartitions(Arrays.asList("par1",
"par2", "par3", "par4")).build())
.skipCompaction(false)
.build();
@@ -437,7 +437,7 @@ public class TestInputFormat {
.rowType(TestConfigurations.ROW_TYPE)
.conf(conf)
.path(FilePathUtils.toFlinkPath(metaClient.getBasePath()))
- .partitionPruner(PartitionPruners.getInstance("par1", "par2", "par3",
"par4"))
+
.partitionPruner(PartitionPruners.builder().candidatePartitions(Arrays.asList("par1",
"par2", "par3", "par4")).build())
.skipCompaction(true)
.build();
@@ -502,7 +502,7 @@ public class TestInputFormat {
.rowType(TestConfigurations.ROW_TYPE)
.conf(conf)
.path(FilePathUtils.toFlinkPath(metaClient.getBasePath()))
- .partitionPruner(PartitionPruners.getInstance("par1", "par2", "par3",
"par4"))
+
.partitionPruner(PartitionPruners.builder().candidatePartitions(Arrays.asList("par1",
"par2", "par3", "par4")).build())
.skipClustering(true)
.build();
@@ -659,7 +659,7 @@ public class TestInputFormat {
.rowType(TestConfigurations.ROW_TYPE)
.conf(conf)
.path(FilePathUtils.toFlinkPath(metaClient.getBasePath()))
- .partitionPruner(PartitionPruners.getInstance("par1", "par2", "par3",
"par4"))
+
.partitionPruner(PartitionPruners.builder().candidatePartitions(Arrays.asList("par1",
"par2", "par3", "par4")).build())
.build();
// default read the latest commit