danny0405 commented on code in PR #12132:
URL: https://github.com/apache/hudi/pull/12132#discussion_r1808051472
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PartitionPruners.java:
##########
@@ -125,6 +138,154 @@ public Set<String> filter(Collection<String> partitions) {
}
}
+ /**
+ * ColumnStats partition pruner for hoodie table source which enables
partition stats index.
+ *
+ * <p>Note: The data of new partitions created after the job starts could be
read if they match the
+ * filter conditions.
+ */
+ public static class ColumnStatsPartitionPruner implements PartitionPruner {
+ private final String basePath;
+ private final HoodieMetadataConfig metadataConfig;
+ private final DataPruner dataPruner;
+ private final RowType rowType;
+
+ public ColumnStatsPartitionPruner(
+ RowType rowType,
+ String basePath,
+ HoodieMetadataConfig metadataConfig,
+ DataPruner dataPruner) {
+ this.rowType = rowType;
+ this.basePath = basePath;
+ this.metadataConfig = metadataConfig;
+ this.dataPruner = dataPruner;
+ }
+
+ @Override
+ public Set<String> filter(Collection<String> partitions) {
+ Set<String> candidatePartitions =
ColumnStatsIndices.candidatePartitionsInMetadataTable(
+ basePath, metadataConfig, rowType, dataPruner, new
ArrayList<>(partitions));
+ if (candidatePartitions == null) {
+ return new HashSet<>(partitions);
+ }
+ return
partitions.stream().filter(candidatePartitions::contains).collect(Collectors.toSet());
+ }
+ }
+
+ /**
+ * Chained partition pruner for hoodie table source which combines multiple
partition pruners.
+ */
+ public static class ChainedPartitionPruner implements PartitionPruner {
+ private final List<PartitionPruner> pruners;
+
+ public ChainedPartitionPruner(List<PartitionPruner> pruners) {
+ this.pruners = pruners;
+ }
+
+ @Override
+ public Set<String> filter(Collection<String> partitions) {
+ for (PartitionPruner pruner: pruners) {
+ partitions = pruner.filter(partitions);
+ }
+ return new HashSet<>(partitions);
+ }
+ }
+
+  /** Creates a new {@link Builder} for composing partition pruners. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+ public static class Builder {
+ private RowType rowType;
+ private String basePath;
+ private Configuration conf;
+ private DataPruner dataPruner;
+ private List<ExpressionEvaluators.Evaluator> partitionEvaluators;
+ private List<String> partitionKeys;
+ private List<DataType> partitionTypes;
+ private String defaultParName;
+ private boolean hivePartition;
+ private Collection<String> candidatePartitions;
+
+    /** Instantiated via {@code PartitionPruners.builder()} only. */
+    private Builder() {
+    }
+
+    /** Sets the row type used for column stats evaluation; returns this builder. */
+    public Builder rowType(RowType rowType) {
+      this.rowType = rowType;
+      return this;
+    }
+
+    /** Sets the table base path; returns this builder. */
+    public Builder basePath(String basePath) {
+      this.basePath = basePath;
+      return this;
+    }
+
+    /** Sets the Flink configuration; returns this builder. */
+    public Builder conf(Configuration conf) {
+      this.conf = conf;
+      return this;
+    }
+
+    /** Sets the data pruner used for column-stats based pruning; returns this builder. */
+    public Builder dataPruner(DataPruner dataPruner) {
+      this.dataPruner = dataPruner;
+      return this;
+    }
+
+    /** Sets the filter-expression evaluators applied to partition values; returns this builder. */
+    public Builder partitionEvaluators(List<Evaluator> partitionEvaluators) {
+      this.partitionEvaluators = partitionEvaluators;
+      return this;
+    }
+
+    /** Sets the partition key field names; returns this builder. */
+    public Builder partitionKeys(List<String> partitionKeys) {
+      this.partitionKeys = partitionKeys;
+      return this;
+    }
+
+    /** Sets the data types of the partition key fields; returns this builder. */
+    public Builder partitionTypes(List<DataType> partitionTypes) {
+      this.partitionTypes = partitionTypes;
+      return this;
+    }
+
+    /** Sets the default partition name used for null partition values; returns this builder. */
+    public Builder defaultParName(String defaultParName) {
+      this.defaultParName = defaultParName;
+      return this;
+    }
+
+    /** Sets whether partition paths use hive-style {@code key=value} naming; returns this builder. */
+    public Builder hivePartition(boolean hivePartition) {
+      this.hivePartition = hivePartition;
+      return this;
+    }
+
+    /** Sets the statically known candidate partitions for static pruning; returns this builder. */
+    public Builder candidatePartitions(Collection<String> candidatePartitions)
{
+      this.candidatePartitions = candidatePartitions;
+      return this;
+    }
+
+ public PartitionPruner build() {
+ PartitionPruner staticPruner = null;
+ if (candidatePartitions != null && !candidatePartitions.isEmpty()) {
+ staticPruner = new StaticPartitionPruner(candidatePartitions);
+ }
+ PartitionPruner dynamicPruner = null;
+ if (partitionEvaluators != null && !partitionEvaluators.isEmpty()) {
+ Preconditions.checkArgument(partitionKeys != null && partitionTypes !=
null && defaultParName != null);
+ dynamicPruner = new DynamicPartitionPruner(partitionEvaluators,
partitionKeys, partitionTypes, defaultParName, hivePartition);
+ }
+ PartitionPruner columnStatsPruner = null;
+ if (dataPruner != null
+ && conf.get(FlinkOptions.READ_PARTITION_DATA_SKIPPING_ENABLED)
+ && conf.get(FlinkOptions.METADATA_ENABLED)) {
+ Preconditions.checkArgument(rowType != null && basePath != null &&
conf != null);
+ columnStatsPruner = new ColumnStatsPartitionPruner(rowType, basePath,
StreamerUtil.metadataConfig(conf), dataPruner);
+ }
+ List<PartitionPruner> partitionPruners =
Review Comment:
We should return the specific pruner directly when only a single pruner is present, instead of wrapping it in a `ChainedPartitionPruner`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]