This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new dbc0b433425 [HUDI-6328] Flink support generate resize plan for
consistent bucket index (#9030)
dbc0b433425 is described below
commit dbc0b433425fd57622b7028257ec6083b57394ce
Author: Jing Zhang <[email protected]>
AuthorDate: Mon Jun 26 17:23:53 2023 +0800
[HUDI-6328] Flink support generate resize plan for consistent bucket index
(#9030)
---
.../apache/hudi/config/HoodieClusteringConfig.java | 51 ++--
...istentHashingBucketClusteringPlanStrategy.java} | 125 ++++++----
.../apache/hudi/config/TestHoodieWriteConfig.java | 2 +-
...linkConsistentBucketClusteringPlanStrategy.java | 56 +++++
...parkConsistentBucketClusteringPlanStrategy.java | 276 +--------------------
...onsistentBucketClusteringExecutionStrategy.java | 6 +-
...arkConsistentBucketDuplicateUpdateStrategy.java | 7 +-
.../bucket/HoodieSparkConsistentBucketIndex.java | 8 +-
...parkConsistentBucketClusteringPlanStrategy.java | 6 +-
.../apache/hudi/configuration/OptionsResolver.java | 4 +
.../java/org/apache/hudi/sink/utils/Pipelines.java | 2 +-
.../java/org/apache/hudi/util/ClusteringUtil.java | 18 +-
.../org/apache/hudi/util/FlinkWriteClients.java | 10 +-
.../java/org/apache/hudi/util/StreamerUtil.java | 1 +
.../ITTestFlinkConsistentHashingClustering.java | 175 +++++++++++++
15 files changed, 378 insertions(+), 369 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index 9732c52ac42..cafed2febc6 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -56,6 +56,8 @@ public class HoodieClusteringConfig extends HoodieConfig {
"org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy";
public static final String FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.FlinkSizeBasedClusteringPlanStrategy";
+ public static final String FLINK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY =
+
"org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy";
public static final String SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY =
"org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy";
public static final String JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY =
@@ -619,19 +621,8 @@ public class HoodieClusteringConfig extends HoodieConfig {
}
private void setDefaults() {
- // Consistent hashing bucket index
- if (clusteringConfig.contains(HoodieIndexConfig.INDEX_TYPE.key())
- &&
clusteringConfig.contains(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key())
- &&
clusteringConfig.getString(HoodieIndexConfig.INDEX_TYPE.key()).equalsIgnoreCase(HoodieIndex.IndexType.BUCKET.name())
- &&
clusteringConfig.getString(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key()).equalsIgnoreCase(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name()))
{
- clusteringConfig.setDefaultValue(PLAN_STRATEGY_CLASS_NAME,
SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);
- clusteringConfig.setDefaultValue(EXECUTION_STRATEGY_CLASS_NAME,
SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY);
- } else {
- clusteringConfig.setDefaultValue(
- PLAN_STRATEGY_CLASS_NAME,
getDefaultPlanStrategyClassName(engineType));
- clusteringConfig.setDefaultValue(
- EXECUTION_STRATEGY_CLASS_NAME,
getDefaultExecutionStrategyClassName(engineType));
- }
+ clusteringConfig.setDefaultValue(PLAN_STRATEGY_CLASS_NAME,
getDefaultPlanStrategyClassName(engineType));
+ clusteringConfig.setDefaultValue(EXECUTION_STRATEGY_CLASS_NAME,
getDefaultExecutionStrategyClassName(engineType));
clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName());
}
@@ -642,27 +633,35 @@ public class HoodieClusteringConfig extends HoodieConfig {
+ "schedule inline clustering (%s) can be enabled. Both can't be
set to true at the same time. %s,%s",
HoodieClusteringConfig.INLINE_CLUSTERING.key(),
HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(),
inlineCluster, inlineClusterSchedule));
- if (engineType.equals(EngineType.FLINK)) {
- // support resize for Flink to unlock the validation.
- return;
+ if (isConsistentHashingBucketIndex()) {
+ String planStrategy =
clusteringConfig.getString(PLAN_STRATEGY_CLASS_NAME);
+ if (engineType == EngineType.FLINK) {
+
ValidationUtils.checkArgument(planStrategy.equalsIgnoreCase(FLINK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY),
+ "Consistent hashing bucket index only supports clustering plan
strategy : " + FLINK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);
+ } else {
+ ValidationUtils.checkArgument(
+
planStrategy.equalsIgnoreCase(SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY),
+ "Consistent hashing bucket index only supports clustering plan
strategy : " + SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);
+ ValidationUtils.checkArgument(
+
clusteringConfig.getString(EXECUTION_STRATEGY_CLASS_NAME).equals(SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY),
+ "Consistent hashing bucket index only supports clustering
execution strategy : " + SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY);
+ }
}
- // Consistent hashing bucket index
- if (clusteringConfig.contains(HoodieIndexConfig.INDEX_TYPE.key())
+ }
+
+ private boolean isConsistentHashingBucketIndex() {
+ return clusteringConfig.contains(HoodieIndexConfig.INDEX_TYPE.key())
&&
clusteringConfig.contains(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key())
&&
clusteringConfig.getString(HoodieIndexConfig.INDEX_TYPE.key()).equalsIgnoreCase(HoodieIndex.IndexType.BUCKET.name())
- &&
clusteringConfig.getString(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key()).equalsIgnoreCase(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name()))
{
-
ValidationUtils.checkArgument(clusteringConfig.getString(PLAN_STRATEGY_CLASS_NAME).equals(SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY),
- "Consistent hashing bucket index only supports clustering plan
strategy : " + SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY);
-
ValidationUtils.checkArgument(clusteringConfig.getString(EXECUTION_STRATEGY_CLASS_NAME).equals(SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY),
- "Consistent hashing bucket index only supports clustering
execution strategy : " + SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY);
- }
+ &&
clusteringConfig.getString(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key()).equalsIgnoreCase(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name());
}
private String getDefaultPlanStrategyClassName(EngineType engineType) {
switch (engineType) {
case SPARK:
- return SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
+ return isConsistentHashingBucketIndex() ?
SPARK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY :
SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
case FLINK:
+ return isConsistentHashingBucketIndex() ?
FLINK_CONSISTENT_BUCKET_CLUSTERING_PLAN_STRATEGY :
FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
case JAVA:
return JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY;
default:
@@ -673,7 +672,7 @@ public class HoodieClusteringConfig extends HoodieConfig {
private String getDefaultExecutionStrategyClassName(EngineType engineType)
{
switch (engineType) {
case SPARK:
- return SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY;
+ return isConsistentHashingBucketIndex() ?
SPARK_CONSISTENT_BUCKET_EXECUTION_STRATEGY :
SPARK_SORT_AND_SIZE_EXECUTION_STRATEGY;
case FLINK:
case JAVA:
return JAVA_SORT_AND_SIZE_EXECUTION_STRATEGY;
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
similarity index 82%
copy from
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
copy to
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
index a49ab1ddf4b..59f9fcb81d1 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
@@ -16,18 +16,15 @@
* limitations under the License.
*/
-package org.apache.hudi.client.clustering.plan.strategy;
+package org.apache.hudi.table.action.cluster.strategy;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
-import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.TableFileSystemView;
@@ -40,11 +37,9 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
-import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
+import org.apache.hudi.index.bucket.HoodieConsistentBucketIndex;
import org.apache.hudi.table.HoodieTable;
-import
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
-import org.apache.spark.api.java.JavaRDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,20 +55,23 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
/**
- * Clustering plan strategy specifically for consistent bucket index
+ * Clustering plan strategy specifically for consistent bucket index.
*/
-public class SparkConsistentBucketClusteringPlanStrategy<T extends
HoodieRecordPayload<T>>
- extends PartitionAwareClusteringPlanStrategy<T, JavaRDD<HoodieRecord<T>>,
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+public abstract class BaseConsistentHashingBucketClusteringPlanStrategy<T
extends HoodieRecordPayload, I, K, O>
+ extends PartitionAwareClusteringPlanStrategy<T, I, K, O> {
- private static final Logger LOG =
LoggerFactory.getLogger(SparkConsistentBucketClusteringPlanStrategy.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(BaseConsistentHashingBucketClusteringPlanStrategy.class);
public static final String METADATA_PARTITION_KEY =
"clustering.group.partition";
public static final String METADATA_CHILD_NODE_KEY =
"clustering.group.child.node";
public static final String METADATA_SEQUENCE_NUMBER_KEY =
"clustering.group.sequence.no";
- public SparkConsistentBucketClusteringPlanStrategy(HoodieTable table,
HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
+ public BaseConsistentHashingBucketClusteringPlanStrategy(
+ HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig
writeConfig) {
super(table, engineContext, writeConfig);
- validate();
+ ValidationUtils.checkArgument(
+ getHoodieTable().getIndex() instanceof HoodieConsistentBucketIndex,
+ this.getClass().getName() + " is only applicable to table with
consistent hash index.");
}
/**
@@ -93,6 +91,64 @@ public class SparkConsistentBucketClusteringPlanStrategy<T
extends HoodieRecordP
return true;
}
+ /**
+ * Generate cluster group based on split, merge and sort rules
+ */
+ @Override
+ protected Stream<HoodieClusteringGroup>
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice>
fileSlices) {
+ Option<HoodieConsistentHashingMetadata> metadata =
ConsistentBucketIndexUtils.loadMetadata(getHoodieTable(), partitionPath);
+ ValidationUtils.checkArgument(metadata.isPresent(), "Metadata is empty for
partition: " + partitionPath);
+ ConsistentBucketIdentifier identifier = new
ConsistentBucketIdentifier(metadata.get());
+
+ // Apply split rule
+ int splitSlot = getWriteConfig().getBucketIndexMaxNumBuckets() -
identifier.getNumBuckets();
+ Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> splitResult =
+ buildSplitClusteringGroups(identifier, fileSlices, splitSlot);
+ List<HoodieClusteringGroup> ret = new ArrayList<>(splitResult.getLeft());
+
+ List<FileSlice> remainedSlices = splitResult.getRight();
+ if (isBucketClusteringMergeEnabled()) {
+ // Apply merge rule
+ int mergeSlot = identifier.getNumBuckets() -
getWriteConfig().getBucketIndexMinNumBuckets() + splitResult.getMiddle();
+ Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>>
mergeResult =
+ buildMergeClusteringGroup(identifier, remainedSlices, mergeSlot);
+ ret.addAll(mergeResult.getLeft());
+ remainedSlices = mergeResult.getRight();
+ }
+ if (isBucketClusteringSortEnabled()) {
+ // Apply sort only to the remaining file groups
+ ret.addAll(remainedSlices.stream().map(fs -> {
+ ConsistentHashingNode oldNode =
identifier.getBucketByFileId(fs.getFileId());
+ ConsistentHashingNode newNode = new
ConsistentHashingNode(oldNode.getValue(), FSUtils.createNewFileIdPfx(),
ConsistentHashingNode.NodeTag.REPLACE);
+ return HoodieClusteringGroup.newBuilder()
+ .setSlices(getFileSliceInfo(Collections.singletonList(fs)))
+ .setNumOutputFileGroups(1)
+ .setMetrics(buildMetrics(Collections.singletonList(fs)))
+ .setExtraMetadata(constructExtraMetadata(fs.getPartitionPath(),
Collections.singletonList(newNode), identifier.getMetadata().getSeqNo()))
+ .build();
+ }).collect(Collectors.toList()));
+ }
+ return ret.stream();
+ }
+
+ /**
+ * Whether enable buckets merged when using consistent hashing bucket index.
+ *
+ * @return true if bucket merge is enabled, false otherwise.
+ */
+ protected boolean isBucketClusteringMergeEnabled() {
+ return true;
+ }
+
+ /**
+ * Whether generate regular sort clustering plans for buckets that are not
involved in merge or split.
+ *
+ * @return true if generate regular sort clustering plans for buckets that
are not involved in merge or split, false otherwise.
+ */
+ protected boolean isBucketClusteringSortEnabled() {
+ return true;
+ }
+
/**
* Generate candidate clustering file slices of the given partition.
* If there is inflight / requested clustering working on the partition,
then return empty list
@@ -121,42 +177,6 @@ public class SparkConsistentBucketClusteringPlanStrategy<T
extends HoodieRecordP
return params;
}
- /**
- * Generate cluster group based on split, merge and sort rules
- */
- @Override
- protected Stream<HoodieClusteringGroup>
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice>
fileSlices) {
- Option<HoodieConsistentHashingMetadata> metadata =
ConsistentBucketIndexUtils.loadMetadata(getHoodieTable(), partitionPath);
- ValidationUtils.checkArgument(metadata.isPresent(), "Metadata is empty for
partition: " + partitionPath);
- ConsistentBucketIdentifier identifier = new
ConsistentBucketIdentifier(metadata.get());
-
- // Apply split rule
- int splitSlot = getWriteConfig().getBucketIndexMaxNumBuckets() -
identifier.getNumBuckets();
- Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> splitResult =
- buildSplitClusteringGroups(identifier, fileSlices, splitSlot);
- List<HoodieClusteringGroup> ret = new ArrayList<>(splitResult.getLeft());
-
- // Apply merge rule
- int mergeSlot = identifier.getNumBuckets() -
getWriteConfig().getBucketIndexMinNumBuckets() + splitResult.getMiddle();
- Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> mergeResult =
- buildMergeClusteringGroup(identifier, splitResult.getRight(),
mergeSlot);
- ret.addAll(mergeResult.getLeft());
-
- // Apply sort only to the remaining file groups
- ret.addAll(mergeResult.getRight().stream().map(fs -> {
- ConsistentHashingNode oldNode =
identifier.getBucketByFileId(fs.getFileId());
- ConsistentHashingNode newNode = new
ConsistentHashingNode(oldNode.getValue(), FSUtils.createNewFileIdPfx(),
ConsistentHashingNode.NodeTag.REPLACE);
- return HoodieClusteringGroup.newBuilder()
- .setSlices(getFileSliceInfo(Collections.singletonList(fs)))
- .setNumOutputFileGroups(1)
- .setMetrics(buildMetrics(Collections.singletonList(fs)))
- .setExtraMetadata(constructExtraMetadata(fs.getPartitionPath(),
Collections.singletonList(newNode), identifier.getMetadata().getSeqNo()))
- .build();
- }).collect(Collectors.toList()));
-
- return ret.stream();
- }
-
/**
* Generate clustering groups according to split rules.
* Currently, we always split bucket into two sub-buckets.
@@ -298,7 +318,7 @@ public class SparkConsistentBucketClusteringPlanStrategy<T
extends HoodieRecordP
extraMetadata.put(METADATA_CHILD_NODE_KEY,
ConsistentHashingNode.toJsonString(nodes));
extraMetadata.put(METADATA_SEQUENCE_NUMBER_KEY, Integer.toString(seqNo));
} catch (IOException e) {
- LOG.error("Failed to construct extra metadata, partition: " + partition
+ ", nodes:" + nodes);
+ LOG.error("Failed to construct extra metadata, partition: {}, nodes:{}",
partition, nodes);
throw new HoodieClusteringException("Failed to construct extra metadata,
partition: " + partition + ", nodes:" + nodes);
}
return extraMetadata;
@@ -313,9 +333,4 @@ public class SparkConsistentBucketClusteringPlanStrategy<T
extends HoodieRecordP
HoodieFileFormat format =
getHoodieTable().getMetaClient().getTableConfig().getBaseFileFormat();
return (long) (getWriteConfig().getMaxFileSize(format) *
getWriteConfig().getBucketMergeThreshold());
}
-
- private void validate() {
- ValidationUtils.checkArgument(getHoodieTable().getIndex() instanceof
HoodieSparkConsistentBucketIndex,
- "SparConsistentBucketClusteringPlanStrategy is only applicable to
table with consistent hash index");
- }
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 5f4e52be2fe..f9d5d69aec0 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -105,7 +105,7 @@ public class TestHoodieWriteConfig {
testEngineSpecificConfig(HoodieWriteConfig::getClusteringPlanStrategyClass,
constructConfigMap(
EngineType.SPARK,
HoodieClusteringConfig.SPARK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
- EngineType.FLINK,
HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
+ EngineType.FLINK,
HoodieClusteringConfig.FLINK_SIZED_BASED_CLUSTERING_PLAN_STRATEGY,
EngineType.JAVA,
HoodieClusteringConfig.JAVA_SIZED_BASED_CLUSTERING_PLAN_STRATEGY));
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkConsistentBucketClusteringPlanStrategy.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkConsistentBucketClusteringPlanStrategy.java
new file mode 100644
index 00000000000..8a3ecf1916b
--- /dev/null
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/FlinkConsistentBucketClusteringPlanStrategy.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.clustering.plan.strategy;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
+
+import java.util.List;
+
+/**
+ * Consistent hashing bucket index clustering plan for Flink engine.
+ */
+public class FlinkConsistentBucketClusteringPlanStrategy<T extends
HoodieRecordPayload<T>>
+ extends BaseConsistentHashingBucketClusteringPlanStrategy<T,
List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+
+ public FlinkConsistentBucketClusteringPlanStrategy(
+ HoodieTable table, HoodieEngineContext engineContext,
+ HoodieWriteConfig writeConfig) {
+ super(table, engineContext, writeConfig);
+ }
+
+ @Override
+ protected boolean isBucketClusteringMergeEnabled() {
+ // Flink would not generate merge plan because it would cause multiple
write sub tasks write to the same file group.
+ return false;
+ }
+
+ @Override
+ protected boolean isBucketClusteringSortEnabled() {
+ // Flink would not generate sort clustering plans for buckets that are not
involved in merge or split to avoid unnecessary clustering costs.
+ return false;
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
index a49ab1ddf4b..bb3cc9f7aa7 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
@@ -21,301 +21,39 @@ package org.apache.hudi.client.clustering.plan.strategy;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
-import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.TableFileSystemView;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Triple;
-import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
-import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
-import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
import org.apache.hudi.table.HoodieTable;
-import
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
+import
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
import org.apache.spark.api.java.JavaRDD;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
/**
- * Clustering plan strategy specifically for consistent bucket index
+ * Consistent hashing bucket index clustering plan for spark engine.
*/
public class SparkConsistentBucketClusteringPlanStrategy<T extends
HoodieRecordPayload<T>>
- extends PartitionAwareClusteringPlanStrategy<T, JavaRDD<HoodieRecord<T>>,
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(SparkConsistentBucketClusteringPlanStrategy.class);
-
- public static final String METADATA_PARTITION_KEY =
"clustering.group.partition";
- public static final String METADATA_CHILD_NODE_KEY =
"clustering.group.child.node";
- public static final String METADATA_SEQUENCE_NUMBER_KEY =
"clustering.group.sequence.no";
+ extends BaseConsistentHashingBucketClusteringPlanStrategy<T,
JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
public SparkConsistentBucketClusteringPlanStrategy(HoodieTable table,
HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
super(table, engineContext, writeConfig);
- validate();
- }
-
- /**
- * TODO maybe add force config to schedule the clustering. It could allow
clustering on partitions that are not doing write operation.
- * Block clustering if there is any ongoing concurrent writers
- *
- * @return true if the schedule can proceed
- */
- @Override
- public boolean checkPrecondition() {
- HoodieTimeline timeline =
getHoodieTable().getActiveTimeline().getDeltaCommitTimeline().filterInflightsAndRequested();
- if (!timeline.empty()) {
- LOG.warn("When using consistent bucket, clustering cannot be scheduled
async if there are concurrent writers. "
- + "Writer instant: " + timeline.getInstants());
- return false;
- }
- return true;
}
- /**
- * Generate candidate clustering file slices of the given partition.
- * If there is inflight / requested clustering working on the partition,
then return empty list
- * to ensure serialized update to the hashing metadata.
- *
- * @return candidate file slices to be clustered (i.e., sort, bucket split
or merge)
- */
@Override
- protected Stream<FileSlice> getFileSlicesEligibleForClustering(String
partition) {
- TableFileSystemView fileSystemView = getHoodieTable().getFileSystemView();
- boolean isPartitionInClustering =
fileSystemView.getFileGroupsInPendingClustering().anyMatch(p ->
p.getLeft().getPartitionPath().equals(partition));
- if (isPartitionInClustering) {
- LOG.info("Partition: " + partition + " is already in clustering, skip");
- return Stream.empty();
- }
-
- return super.getFileSlicesEligibleForClustering(partition);
- }
-
- @Override
- protected Map<String, String> getStrategyParams() {
- Map<String, String> params = new HashMap<>();
- if
(!StringUtils.isNullOrEmpty(getWriteConfig().getClusteringSortColumns())) {
- params.put(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(),
getWriteConfig().getClusteringSortColumns());
- }
- return params;
+ protected Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>>
buildMergeClusteringGroup(
+ ConsistentBucketIdentifier identifier, List<FileSlice> fileSlices, int
mergeSlot) {
+ return super.buildMergeClusteringGroup(identifier, fileSlices, mergeSlot);
}
- /**
- * Generate cluster group based on split, merge and sort rules
- */
@Override
- protected Stream<HoodieClusteringGroup>
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice>
fileSlices) {
- Option<HoodieConsistentHashingMetadata> metadata =
ConsistentBucketIndexUtils.loadMetadata(getHoodieTable(), partitionPath);
- ValidationUtils.checkArgument(metadata.isPresent(), "Metadata is empty for
partition: " + partitionPath);
- ConsistentBucketIdentifier identifier = new
ConsistentBucketIdentifier(metadata.get());
-
- // Apply split rule
- int splitSlot = getWriteConfig().getBucketIndexMaxNumBuckets() -
identifier.getNumBuckets();
- Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> splitResult =
- buildSplitClusteringGroups(identifier, fileSlices, splitSlot);
- List<HoodieClusteringGroup> ret = new ArrayList<>(splitResult.getLeft());
-
- // Apply merge rule
- int mergeSlot = identifier.getNumBuckets() -
getWriteConfig().getBucketIndexMinNumBuckets() + splitResult.getMiddle();
- Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>> mergeResult =
- buildMergeClusteringGroup(identifier, splitResult.getRight(),
mergeSlot);
- ret.addAll(mergeResult.getLeft());
-
- // Apply sort only to the remaining file groups
- ret.addAll(mergeResult.getRight().stream().map(fs -> {
- ConsistentHashingNode oldNode =
identifier.getBucketByFileId(fs.getFileId());
- ConsistentHashingNode newNode = new
ConsistentHashingNode(oldNode.getValue(), FSUtils.createNewFileIdPfx(),
ConsistentHashingNode.NodeTag.REPLACE);
- return HoodieClusteringGroup.newBuilder()
- .setSlices(getFileSliceInfo(Collections.singletonList(fs)))
- .setNumOutputFileGroups(1)
- .setMetrics(buildMetrics(Collections.singletonList(fs)))
- .setExtraMetadata(constructExtraMetadata(fs.getPartitionPath(),
Collections.singletonList(newNode), identifier.getMetadata().getSeqNo()))
- .build();
- }).collect(Collectors.toList()));
-
- return ret.stream();
- }
-
- /**
- * Generate clustering groups according to split rules.
- * Currently, we always split bucket into two sub-buckets.
- *
- * @param identifier bucket identifier
- * @param fileSlices file slice candidate to be built as split clustering
groups
- * @param splitSlot number of new bucket allowed to produce, in order to
constrain the upper bound of the total number of bucket
- * @return list of clustering group, number of new buckets generated,
remaining file slice (that does not split)
- */
protected Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>>
buildSplitClusteringGroups(
ConsistentBucketIdentifier identifier, List<FileSlice> fileSlices, int
splitSlot) {
- List<HoodieClusteringGroup> retGroup = new ArrayList<>();
- List<FileSlice> fsUntouched = new ArrayList<>();
- long splitSize = getSplitSize();
- int remainingSplitSlot = splitSlot;
- for (FileSlice fs : fileSlices) {
- boolean needSplit = fs.getTotalFileSize() > splitSize;
- if (!needSplit || remainingSplitSlot == 0) {
- fsUntouched.add(fs);
- continue;
- }
-
- Option<List<ConsistentHashingNode>> nodes =
identifier.splitBucket(fs.getFileId());
-
- // Bucket cannot be split
- if (!nodes.isPresent()) {
- fsUntouched.add(fs);
- continue;
- }
-
- remainingSplitSlot--;
- List<FileSlice> fsList = Collections.singletonList(fs);
- retGroup.add(HoodieClusteringGroup.newBuilder()
- .setSlices(getFileSliceInfo(fsList))
- .setNumOutputFileGroups(2)
- .setMetrics(buildMetrics(fsList))
- .setExtraMetadata(constructExtraMetadata(fs.getPartitionPath(),
nodes.get(), identifier.getMetadata().getSeqNo()))
- .build());
- }
- return Triple.of(retGroup, splitSlot - remainingSplitSlot, fsUntouched);
- }
-
- /**
- * Generate clustering group according to merge rules
- *
- * @param identifier bucket identifier
- * @param fileSlices file slice candidates to be built as merge clustering
groups
- * @param mergeSlot number of bucket allowed to be merged, in order to
guarantee the lower bound of the total number of bucket
- * @return list of clustering group, number of buckets merged (removed),
remaining file slice (that does not be merged)
- */
- protected Triple<List<HoodieClusteringGroup>, Integer, List<FileSlice>>
buildMergeClusteringGroup(
- ConsistentBucketIdentifier identifier, List<FileSlice> fileSlices, int
mergeSlot) {
- if (fileSlices.size() <= 1) {
- return Triple.of(Collections.emptyList(), 0, fileSlices);
- }
-
- long mergeSize = getMergeSize();
- int remainingMergeSlot = mergeSlot;
- List<HoodieClusteringGroup> groups = new ArrayList<>();
- boolean[] added = new boolean[fileSlices.size()];
-
- fileSlices.sort(Comparator.comparingInt(a ->
identifier.getBucketByFileId(a.getFileId()).getValue()));
- // In each round, we check if the ith file slice can be merged with its
predecessors and successors
- for (int i = 0; i < fileSlices.size(); ++i) {
- if (added[i] || fileSlices.get(i).getTotalFileSize() > mergeSize) {
- continue;
- }
-
- // 0: startIdx, 1: endIdx
- int[] rangeIdx = {i, i};
- long totalSize = fileSlices.get(i).getTotalFileSize();
- // Do backward check first (k == 0), and then forward check (k == 1)
- for (int k = 0; k < 2; ++k) {
- boolean forward = k == 1;
- do {
- int nextIdx = forward ? (rangeIdx[k] + 1 < fileSlices.size() ?
rangeIdx[k] + 1 : 0) : (rangeIdx[k] >= 1 ? rangeIdx[k] - 1 : fileSlices.size()
- 1);
- boolean isNeighbour =
identifier.getBucketByFileId(fileSlices.get(nextIdx).getFileId()) ==
identifier.getFormerBucket(fileSlices.get(rangeIdx[k]).getFileId());
- /**
- * Merge condition:
- * 1. there is still slot to merge bucket
- * 2. the previous file slices is not merged
- * 3. the previous file slice and current file slice are neighbour
in the hash ring
- * 4. Both the total file size up to now and the previous file slice
size are smaller than merge size threshold
- */
- if (remainingMergeSlot == 0 || added[nextIdx] || !isNeighbour ||
totalSize > mergeSize || fileSlices.get(nextIdx).getTotalFileSize() >
mergeSize) {
- break;
- }
-
- // Mark preIdx as merge candidate
- totalSize += fileSlices.get(nextIdx).getTotalFileSize();
- rangeIdx[k] = nextIdx;
- remainingMergeSlot--;
- } while (rangeIdx[k] != i);
- }
-
- int startIdx = rangeIdx[0];
- int endIdx = rangeIdx[1];
- if (endIdx == i && startIdx == i) {
- continue;
- }
-
- // Construct merge group if there is at least two file slices
- List<FileSlice> fs = new ArrayList<>();
- while (true) {
- added[startIdx] = true;
- fs.add(fileSlices.get(startIdx));
- if (startIdx == endIdx) {
- break;
- }
- startIdx = startIdx + 1 < fileSlices.size() ? startIdx + 1 : 0;
- }
-
- groups.add(HoodieClusteringGroup.newBuilder()
- .setSlices(getFileSliceInfo(fs))
- .setNumOutputFileGroups(1)
- .setMetrics(buildMetrics(fs))
- .setExtraMetadata(
- constructExtraMetadata(
- fs.get(0).getPartitionPath(),
-
identifier.mergeBucket(fs.stream().map(FileSlice::getFileId).collect(Collectors.toList())),
- identifier.getMetadata().getSeqNo()))
- .build());
- }
-
- // Collect file slices that are not involved in merge
- List<FileSlice> fsUntouched = IntStream.range(0,
fileSlices.size()).filter(i -> !added[i])
- .mapToObj(fileSlices::get).collect(Collectors.toList());
-
- return Triple.of(groups, mergeSlot - remainingMergeSlot, fsUntouched);
- }
-
- /**
- * Construct extra metadata for clustering group
- */
- private Map<String, String> constructExtraMetadata(String partition,
List<ConsistentHashingNode> nodes, int seqNo) {
- Map<String, String> extraMetadata = new HashMap<>();
- try {
- extraMetadata.put(METADATA_PARTITION_KEY, partition);
- extraMetadata.put(METADATA_CHILD_NODE_KEY,
ConsistentHashingNode.toJsonString(nodes));
- extraMetadata.put(METADATA_SEQUENCE_NUMBER_KEY, Integer.toString(seqNo));
- } catch (IOException e) {
- LOG.error("Failed to construct extra metadata, partition: " + partition
+ ", nodes:" + nodes);
- throw new HoodieClusteringException("Failed to construct extra metadata,
partition: " + partition + ", nodes:" + nodes);
- }
- return extraMetadata;
- }
-
- private long getSplitSize() {
- HoodieFileFormat format =
getHoodieTable().getMetaClient().getTableConfig().getBaseFileFormat();
- return (long) (getWriteConfig().getMaxFileSize(format) *
getWriteConfig().getBucketSplitThreshold());
- }
-
- private long getMergeSize() {
- HoodieFileFormat format =
getHoodieTable().getMetaClient().getTableConfig().getBaseFileFormat();
- return (long) (getWriteConfig().getMaxFileSize(format) *
getWriteConfig().getBucketMergeThreshold());
- }
-
- private void validate() {
- ValidationUtils.checkArgument(getHoodieTable().getIndex() instanceof
HoodieSparkConsistentBucketIndex,
- "SparConsistentBucketClusteringPlanStrategy is only applicable to
table with consistent hash index");
+ return super.buildSplitClusteringGroups(identifier, fileSlices, splitSlot);
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
index 6469b3108a6..698136d9b20 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
@@ -19,7 +19,6 @@
package org.apache.hudi.client.clustering.run.strategy;
import org.apache.hudi.client.WriteStatus;
-import
org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ConsistentHashingNode;
@@ -30,6 +29,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import
org.apache.hudi.execution.bulkinsert.RDDConsistentBucketBulkInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
+import
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
import org.apache.avro.Schema;
@@ -80,8 +80,8 @@ public class
SparkConsistentBucketClusteringExecutionStrategy<T extends HoodieRe
RDDConsistentBucketBulkInsertPartitioner<T> partitioner = new
RDDConsistentBucketBulkInsertPartitioner<>(getHoodieTable(), strategyParams,
preserveHoodieMetadata);
try {
- List<ConsistentHashingNode> nodes =
ConsistentHashingNode.fromJsonString(extraMetadata.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY));
-
partitioner.addHashingChildrenNodes(extraMetadata.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY),
nodes);
+ List<ConsistentHashingNode> nodes =
ConsistentHashingNode.fromJsonString(extraMetadata.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY));
+
partitioner.addHashingChildrenNodes(extraMetadata.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY),
nodes);
} catch (Exception e) {
LOG.error("Failed to add hashing children nodes", e);
throw new HoodieClusteringException("Failed to add hashing children
nodes", e);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
index 4ddacc7f3cb..7d13a5940b4 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client.clustering.update.strategy;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
-import
org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
@@ -40,6 +39,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
import org.apache.hudi.table.HoodieTable;
+import
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import org.slf4j.Logger;
@@ -121,7 +121,7 @@ public class SparkConsistentBucketDuplicateUpdateStrategy<T
extends HoodieRecord
Map<String,
HoodieConsistentHashingMetadata> partitionToHashingMeta, Map<String, String>
partitionToInstant) {
for (HoodieClusteringGroup group : plan.getInputGroups()) {
Map<String, String> groupMeta = group.getExtraMetadata();
- String p =
groupMeta.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+ String p =
groupMeta.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
checkState(p != null, "Clustering plan does not has partition info,
plan: " + plan);
// Skip unrelated clustering group
if (!recordPartitions.contains(p)) {
@@ -137,7 +137,7 @@ public class SparkConsistentBucketDuplicateUpdateStrategy<T
extends HoodieRecord
}
try {
- String nodeJson =
group.getExtraMetadata().get(SparkConsistentBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY);
+ String nodeJson =
group.getExtraMetadata().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY);
List<ConsistentHashingNode> nodes =
ConsistentHashingNode.fromJsonString(nodeJson);
partitionToHashingMeta.get(p).getChildrenNodes().addAll(nodes);
} catch (Exception e) {
@@ -146,5 +146,4 @@ public class SparkConsistentBucketDuplicateUpdateStrategy<T
extends HoodieRecord
}
}
}
-
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
index b9a77e56f67..b49599b4c51 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
@@ -21,7 +21,6 @@ package org.apache.hudi.index.bucket;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.client.WriteStatus;
-import
org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ConsistentHashingNode;
@@ -36,6 +35,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.table.HoodieTable;
+import
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
import java.util.ArrayList;
import java.util.List;
@@ -79,16 +79,16 @@ public class HoodieSparkConsistentBucketIndex extends
HoodieConsistentBucketInde
HoodieClusteringPlan plan = instantPlanPair.get().getRight();
HoodieJavaRDD.getJavaRDD(context.parallelize(plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList())))
- .mapToPair(m -> new
Tuple2<>(m.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY),
m)
+ .mapToPair(m -> new
Tuple2<>(m.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY),
m)
).groupByKey().foreach((input) -> {
// Process each partition
String partition = input._1();
List<ConsistentHashingNode> childNodes = new ArrayList<>();
int seqNo = 0;
for (Map<String, String> m: input._2()) {
- String nodesJson =
m.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY);
+ String nodesJson =
m.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY);
childNodes.addAll(ConsistentHashingNode.fromJsonString(nodesJson));
- seqNo =
Integer.parseInt(m.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_SEQUENCE_NUMBER_KEY));
+ seqNo =
Integer.parseInt(m.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_SEQUENCE_NUMBER_KEY));
}
Option<HoodieConsistentHashingMetadata> metadataOption =
ConsistentBucketIndexUtils.loadMetadata(hoodieTable, partition);
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
index 7c03a02f5fc..8760f63c9f1 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/clustering/plan/strategy/TestSparkConsistentBucketClusteringPlanStrategy.java
@@ -36,6 +36,7 @@ import
org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
+import
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.junit.jupiter.api.AfterEach;
@@ -152,7 +153,7 @@ public class
TestSparkConsistentBucketClusteringPlanStrategy extends HoodieClien
Assertions.assertEquals(fileSlices.get(7).getFileId(),
groups.get(0).getSlices().get(1).getFileId());
Assertions.assertEquals(fileSlices.get(6).getFileId(),
groups.get(0).getSlices().get(0).getFileId());
Assertions.assertEquals(3, groups.get(0).getSlices().size());
- List<ConsistentHashingNode> nodes =
ConsistentHashingNode.fromJsonString(groups.get(0).getExtraMetadata().get(SparkConsistentBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY));
+ List<ConsistentHashingNode> nodes =
ConsistentHashingNode.fromJsonString(groups.get(0).getExtraMetadata().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY));
Assertions.assertEquals(3, nodes.size());
Assertions.assertEquals(ConsistentHashingNode.NodeTag.DELETE,
nodes.get(0).getTag());
Assertions.assertEquals(ConsistentHashingNode.NodeTag.DELETE,
nodes.get(1).getTag());
@@ -163,7 +164,7 @@ public class
TestSparkConsistentBucketClusteringPlanStrategy extends HoodieClien
Assertions.assertEquals(fileSlices.get(2).getFileId(),
groups.get(1).getSlices().get(0).getFileId());
Assertions.assertEquals(fileSlices.get(3).getFileId(),
groups.get(1).getSlices().get(1).getFileId());
Assertions.assertEquals(2, groups.get(1).getSlices().size());
- nodes =
ConsistentHashingNode.fromJsonString(groups.get(1).getExtraMetadata().get(SparkConsistentBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY));
+ nodes =
ConsistentHashingNode.fromJsonString(groups.get(1).getExtraMetadata().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY));
Assertions.assertEquals(2, nodes.size());
Assertions.assertEquals(ConsistentHashingNode.NodeTag.DELETE,
nodes.get(0).getTag());
Assertions.assertEquals(ConsistentHashingNode.NodeTag.REPLACE,
nodes.get(1).getTag());
@@ -192,5 +193,4 @@ public class
TestSparkConsistentBucketClusteringPlanStrategy extends HoodieClien
return fs;
}
-
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 4b90ab4c73a..e7a6901b0b2 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -140,6 +140,10 @@ public class OptionsResolver {
return HoodieIndex.BucketIndexEngineType.valueOf(bucketEngineType);
}
+ public static boolean isConsistentHashingBucketIndexType(Configuration conf)
{
+ return isBucketIndexType(conf) &&
getBucketEngineType(conf).equals(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING);
+ }
+
/**
* Returns whether the source should emit changelog.
*
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 474e2b18e60..900ccc77670 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -109,7 +109,7 @@ public class Pipelines {
WriteOperatorFactory<RowData> operatorFactory =
BulkInsertWriteOperator.getFactory(conf, rowType);
if (OptionsResolver.isBucketIndexType(conf)) {
// TODO support bulk insert for consistent bucket index
- if (OptionsResolver.getBucketEngineType(conf) ==
HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING) {
+ if (OptionsResolver.isConsistentHashingBucketIndexType(conf)) {
throw new HoodieException(
"Consistent hashing bucket index does not work with bulk insert
using FLINK engine. Use simple bucket index or Spark engine.");
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
index b183ba3a4b0..2d47bb8a1b4 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
@@ -25,9 +25,13 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieFlinkTable;
+import
org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
@@ -46,7 +50,19 @@ public class ClusteringUtil {
public static void validateClusteringScheduling(Configuration conf) {
if (OptionsResolver.isBucketIndexType(conf)) {
- throw new UnsupportedOperationException("Clustering is not supported for
bucket index.");
+ HoodieIndex.BucketIndexEngineType bucketIndexEngineType =
OptionsResolver.getBucketEngineType(conf);
+ switch (bucketIndexEngineType) {
+ case SIMPLE:
+ throw new UnsupportedOperationException("Clustering is not supported
for simple bucket index.");
+ case CONSISTENT_HASHING:
+ if
(!conf.get(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS).equalsIgnoreCase(FlinkConsistentBucketClusteringPlanStrategy.class.getName()))
{
+ throw new HoodieNotSupportedException(
+ "CLUSTERING_PLAN_STRATEGY_CLASS should be set to " +
FlinkConsistentBucketClusteringPlanStrategy.class.getName() + " in order to
work with Consistent Hashing Bucket Index.");
+ }
+ break;
+ default:
+ throw new HoodieNotSupportedException("Unknown bucket index engine
type: " + bucketIndexEngineType);
+ }
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index c039cbaacd8..1835e9c894b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -23,8 +23,8 @@ import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.WriteOperationType;
@@ -40,6 +40,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
+import
org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.flink.api.common.functions.RuntimeContext;
@@ -163,7 +164,7 @@ public class FlinkWriteClients {
.withClusteringConfig(
HoodieClusteringConfig.newBuilder()
.withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
-
.withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS))
+
.withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS,
getDefaultPlanStrategyClassName(conf)))
.withClusteringPlanPartitionFilterMode(
ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME)))
.withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS))
@@ -245,4 +246,9 @@ public class FlinkWriteClients {
}
return writeConfig;
}
+
+ private static String getDefaultPlanStrategyClassName(Configuration conf) {
+ return OptionsResolver.isConsistentHashingBucketIndexType(conf) ?
FlinkConsistentBucketClusteringPlanStrategy.class.getName() :
+ FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS.defaultValue();
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 6cfa0abd218..5342d7b4192 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -160,6 +160,7 @@ public class StreamerUtil {
.withBucketNum(String.valueOf(conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS)))
.withRecordKeyField(conf.getString(FlinkOptions.RECORD_KEY_FIELD))
.withIndexKeyField(OptionsResolver.getIndexKeyField(conf))
+ .withBucketIndexEngineType(OptionsResolver.getBucketEngineType(conf))
.withEngineType(EngineType.FLINK)
.build();
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestFlinkConsistentHashingClustering.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestFlinkConsistentHashingClustering.java
new file mode 100644
index 00000000000..e52fe8b976a
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestFlinkConsistentHashingClustering.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.cluster;
+
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.sink.clustering.FlinkClusteringConfig;
+import org.apache.hudi.table.HoodieFlinkTable;
+import
org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
+import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestSQL;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class ITTestFlinkConsistentHashingClustering {
+
+ private static final Map<String, String> EXPECTED_AFTER_INITIAL_INSERT = new
HashMap<>();
+ private static final Map<String, String> EXPECTED_AFTER_UPSERT = new
HashMap<>();
+
+ static {
+ EXPECTED_AFTER_INITIAL_INSERT.put("", "id1,,id1,Danny,23,1000,,
id2,,id2,Stephen,33,2000,, "
+ + "id3,,id3,Julian,53,3000,, id4,,id4,Fabian,31,4000,,
id5,,id5,Sophia,18,5000,, "
+ + "id6,,id6,Emma,20,6000,, id7,,id7,Bob,44,7000,,
id8,,id8,Han,56,8000,, ]");
+ EXPECTED_AFTER_UPSERT.put("", "[id1,,id1,Danny,24,1000,,
id2,,id2,Stephen,34,2000,, id3,,id3,Julian,54,3000,, "
+ + "id4,,id4,Fabian,32,4000,, id5,,id5,Sophia,18,5000,,
id6,,id6,Emma,20,6000,, "
+ + "id7,,id7,Bob,44,7000,, id8,,id8,Han,56,8000,,
id9,,id9,Jane,19,6000,, "
+ + "id10,,id10,Ella,38,7000,, id11,,id11,Phoebe,52,8000,,]");
+ }
+
+ @TempDir
+ File tempFile;
+
+ @Test
+ public void testScheduleSplitPlan() throws Exception {
+ TableEnvironment tableEnv = setupTableEnv();
+ prepareData(tableEnv);
+
+ Configuration conf = getDefaultConfiguration();
+ conf.setString(HoodieIndexConfig.BUCKET_INDEX_MIN_NUM_BUCKETS.key(), "4");
+ conf.setString(HoodieIndexConfig.BUCKET_INDEX_MAX_NUM_BUCKETS.key(), "8");
+ // Manually set the split threshold to trigger split in the clustering
+ conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+ conf.setString(HoodieIndexConfig.BUCKET_SPLIT_THRESHOLD.key(),
String.valueOf(1 / 1024.0 / 1024.0));
+ HoodieFlinkWriteClient writeClient =
FlinkWriteClients.createWriteClient(conf);
+ Option<String> clusteringInstantOption =
writeClient.scheduleClustering(Option.empty());
+ Assertions.assertTrue(clusteringInstantOption.isPresent());
+
+ // Validate clustering plan
+ HoodieClusteringPlan clusteringPlan = getLatestClusteringPlan(writeClient);
+ Assertions.assertEquals(4, clusteringPlan.getInputGroups().size());
+ Assertions.assertEquals(1,
clusteringPlan.getInputGroups().get(0).getSlices().size());
+ Assertions.assertEquals(1,
clusteringPlan.getInputGroups().get(1).getSlices().size());
+ Assertions.assertEquals(1,
clusteringPlan.getInputGroups().get(2).getSlices().size());
+ Assertions.assertEquals(1,
clusteringPlan.getInputGroups().get(3).getSlices().size());
+ }
+
+ @Test
+ public void testScheduleMergePlan() throws Exception {
+ TableEnvironment tableEnv = setupTableEnv();
+ prepareData(tableEnv);
+
+ Configuration conf = getDefaultConfiguration();
+ HoodieFlinkWriteClient writeClient =
FlinkWriteClients.createWriteClient(conf);
+ Option<String> clusteringInstantOption =
writeClient.scheduleClustering(Option.empty());
+ Assertions.assertFalse(clusteringInstantOption.isPresent());
+ }
+
+ private HoodieClusteringPlan getLatestClusteringPlan(HoodieFlinkWriteClient
writeClient) {
+ HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+ table.getMetaClient().reloadActiveTimeline();
+ Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption =
ClusteringUtils.getClusteringPlan(
+ table.getMetaClient(),
table.getMetaClient().getActiveTimeline().filterPendingReplaceTimeline().lastInstant().get());
+ return clusteringPlanOption.get().getRight();
+ }
+
+ private void prepareData(TableEnvironment tableEnv) throws Exception {
+ // Insert initial data
+ Map<String, String> options = getDefaultConsistentHashingOption();
+ String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1",
options, false, "");
+ tableEnv.executeSql(hoodieTableDDL);
+ tableEnv.executeSql(TestSQL.INSERT_T1).await();
+ TimeUnit.SECONDS.sleep(3);
+
+ // Validate the insertion
+ TestData.checkWrittenData(tempFile, EXPECTED_AFTER_INITIAL_INSERT, 0);
+ }
+
+ private TableEnvironment setupTableEnv() {
+ EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
+ TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+ tableEnv.getConfig().getConfiguration()
+
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+ return tableEnv;
+ }
+
+ private Configuration getDefaultConfiguration() throws Exception {
+ FlinkClusteringConfig cfg = new FlinkClusteringConfig();
+ cfg.path = tempFile.getAbsolutePath();
+ Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
+
+ HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+ conf.setString(FlinkOptions.TABLE_NAME,
metaClient.getTableConfig().getTableName());
+ conf.setString(FlinkOptions.RECORD_KEY_FIELD,
metaClient.getTableConfig().getRecordKeyFieldProp());
+ conf.setString(FlinkOptions.PARTITION_PATH_FIELD,
metaClient.getTableConfig().getPartitionFieldProp());
+ for (Map.Entry<String, String> e :
getDefaultConsistentHashingOption().entrySet()) {
+ conf.setString(e.getKey(), e.getValue());
+ }
+ CompactionUtil.setAvroSchema(conf, metaClient);
+
+ return conf;
+ }
+
+ private Map<String, String> getDefaultConsistentHashingOption() {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+ options.put(FlinkOptions.TABLE_TYPE.key(),
HoodieTableType.MERGE_ON_READ.name());
+ options.put(FlinkOptions.OPERATION.key(),
WriteOperationType.UPSERT.name());
+ options.put(FlinkOptions.INDEX_TYPE.key(),
HoodieIndex.IndexType.BUCKET.name());
+ options.put(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE.key(),
HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING.name());
+ options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "4");
+ options.put(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS.key(),
FlinkConsistentBucketClusteringPlanStrategy.class.getName());
+    // Flink currently only supports scheduling; the clustering execution
has to be done by the Spark engine.
+ options.put(HoodieClusteringConfig.EXECUTION_STRATEGY_CLASS_NAME.key(),
"org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy");
+
+ // Disable compaction/clustering by default
+ options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
+ options.put(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED.key(), "false");
+
+ return options;
+ }
+}