This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e8b1ddd708b [HUDI-6329] Adjust the partitioner automatically for flink
consistent hashing index (#9087)
e8b1ddd708b is described below
commit e8b1ddd708bc2ba99144f92d7533c7200f12509f
Author: Jing Zhang <[email protected]>
AuthorDate: Wed Jul 5 11:09:25 2023 +0800
[HUDI-6329] Adjust the partitioner automatically for flink consistent
hashing index (#9087)
* the partitioner detects newly completed resize plans in #snapshotState
* disable scheduling of resize plans for insert write pipelines with a consistent
bucket index
---
...sistentHashingBucketClusteringPlanStrategy.java | 4 +-
.../action/cluster/strategy/UpdateStrategy.java | 4 +-
.../util/ConsistentHashingUpdateStrategyUtils.java | 107 +++++++++++++++
...arkConsistentBucketDuplicateUpdateStrategy.java | 71 +---------
.../apache/hudi/configuration/OptionsResolver.java | 26 +++-
.../org/apache/hudi/sink/StreamWriteFunction.java | 34 +++--
.../sink/bucket/BucketStreamWriteFunction.java | 2 +-
.../sink/bucket/BucketStreamWriteOperator.java | 5 +-
.../bucket/ConsistentBucketAssignFunction.java | 30 ++++-
.../ConsistentBucketStreamWriteFunction.java | 83 ++++++++++++
.../FlinkConsistentBucketUpdateStrategy.java | 150 +++++++++++++++++++++
.../java/org/apache/hudi/sink/utils/Pipelines.java | 2 +-
.../java/org/apache/hudi/util/ClusteringUtil.java | 5 +-
.../org/apache/hudi/util/FlinkWriteClients.java | 8 +-
.../org/apache/hudi/sink/TestWriteMergeOnRead.java | 40 ++++++
.../bucket/ITTestConsistentBucketStreamWrite.java | 23 +++-
.../utils/BucketStreamWriteFunctionWrapper.java | 18 ++-
...ConsistentBucketStreamWriteFunctionWrapper.java | 81 +++++++++++
.../apache/hudi/sink/utils/ScalaCollector.java} | 32 +++--
.../sink/utils/StreamWriteFunctionWrapper.java | 22 ---
.../test/java/org/apache/hudi/utils/TestData.java | 7 +-
21 files changed, 611 insertions(+), 143 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
index 59f9fcb81d1..49ab5f181ad 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
@@ -85,7 +85,7 @@ public abstract class
BaseConsistentHashingBucketClusteringPlanStrategy<T extend
HoodieTimeline timeline =
getHoodieTable().getActiveTimeline().getDeltaCommitTimeline().filterInflightsAndRequested();
if (!timeline.empty()) {
LOG.warn("When using consistent bucket, clustering cannot be scheduled
async if there are concurrent writers. "
- + "Writer instant: " + timeline.getInstants());
+ + "Writer instant: {}.", timeline.getInstants());
return false;
}
return true;
@@ -161,7 +161,7 @@ public abstract class
BaseConsistentHashingBucketClusteringPlanStrategy<T extend
TableFileSystemView fileSystemView = getHoodieTable().getFileSystemView();
boolean isPartitionInClustering =
fileSystemView.getFileGroupsInPendingClustering().anyMatch(p ->
p.getLeft().getPartitionPath().equals(partition));
if (isPartitionInClustering) {
- LOG.info("Partition: " + partition + " is already in clustering, skip");
+ LOG.info("Partition {} is already in clustering, skip.", partition);
return Stream.empty();
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
index 4463f7887bb..1c61db4b572 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
@@ -32,8 +32,8 @@ import java.util.Set;
public abstract class UpdateStrategy<T, I> implements Serializable {
protected final transient HoodieEngineContext engineContext;
- protected final HoodieTable table;
- protected final Set<HoodieFileGroupId> fileGroupsInPendingClustering;
+ protected HoodieTable table;
+ protected Set<HoodieFileGroupId> fileGroupsInPendingClustering;
public UpdateStrategy(HoodieEngineContext engineContext, HoodieTable table,
Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
this.engineContext = engineContext;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/util/ConsistentHashingUpdateStrategyUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/util/ConsistentHashingUpdateStrategyUtils.java
new file mode 100644
index 00000000000..f8351d2fa93
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/util/ConsistentHashingUpdateStrategyUtils.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster.util;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
+import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
+import org.apache.hudi.table.HoodieTable;
+import
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Utility class for update strategy of table with consistent hash bucket
index.
+ */
+public class ConsistentHashingUpdateStrategyUtils {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ConsistentHashingUpdateStrategyUtils.class);
+
+ /**
+ * Construct identifier for the given partitions that are under concurrent
resizing (i.e., clustering).
+ * @return map from partition to pair<instant, identifier>, where instant is
the clustering instant.
+ */
+ public static Map<String, Pair<String, ConsistentBucketIdentifier>>
constructPartitionToIdentifier(Set<String> partitions, HoodieTable table) {
+ // Read all pending/ongoing clustering plans
+ List<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPairs =
+
table.getMetaClient().getActiveTimeline().filterInflightsAndRequested().filter(instant
->
instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).getInstantsAsStream()
+ .map(instant ->
ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant))
+ .flatMap(o -> o.isPresent() ? Stream.of(o.get()) : Stream.empty())
+ .collect(Collectors.toList());
+
+ // Construct child node for each partition & build the bucket identifier
+ Map<String, HoodieConsistentHashingMetadata> partitionToHashingMeta = new
HashMap<>();
+ Map<String, String> partitionToInstant = new HashMap<>();
+ for (Pair<HoodieInstant, HoodieClusteringPlan> pair : instantPlanPairs) {
+ String instant = pair.getLeft().getTimestamp();
+ HoodieClusteringPlan plan = pair.getRight();
+ extractHashingMetadataFromClusteringPlan(instant, plan, table,
partitions, partitionToHashingMeta, partitionToInstant);
+ }
+ return partitionToHashingMeta.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e ->
Pair.of(partitionToInstant.get(e.getKey()), new
ConsistentBucketIdentifier(e.getValue()))));
+ }
+
+ private static void extractHashingMetadataFromClusteringPlan(String instant,
HoodieClusteringPlan plan, HoodieTable table,
+ final Set<String> recordPartitions, Map<String,
HoodieConsistentHashingMetadata> partitionToHashingMeta, Map<String, String>
partitionToInstant) {
+ for (HoodieClusteringGroup group : plan.getInputGroups()) {
+ Map<String, String> groupMeta = group.getExtraMetadata();
+ String p =
groupMeta.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
+ ValidationUtils.checkState(p != null, "Clustering plan does not has
partition info, plan: " + plan);
+ // Skip unrelated clustering group
+ if (!recordPartitions.contains(p)) {
+ return;
+ }
+
+ String preInstant = partitionToInstant.putIfAbsent(p, instant);
+ ValidationUtils.checkState(preInstant == null ||
preInstant.equals(instant), "Find a partition: " + p + " with two clustering
instants");
+ if (!partitionToHashingMeta.containsKey(p)) {
+ Option<HoodieConsistentHashingMetadata> metadataOption =
ConsistentBucketIndexUtils.loadMetadata(table, p);
+ ValidationUtils.checkState(metadataOption.isPresent(), "Failed to load
consistent hashing metadata for partition: " + p);
+ partitionToHashingMeta.put(p, metadataOption.get());
+ }
+
+ try {
+ String nodeJson =
group.getExtraMetadata().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY);
+ List<ConsistentHashingNode> nodes =
ConsistentHashingNode.fromJsonString(nodeJson);
+ partitionToHashingMeta.get(p).getChildrenNodes().addAll(nodes);
+ } catch (Exception e) {
+ LOG.error("Failed to parse child nodes in clustering plan.", e);
+ throw new HoodieException("Failed to parse child nodes in clustering
plan, partition: " + p + ", cluster group: " + group, e);
+ }
+ }
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
index 99af0a14819..3aecda8664a 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
@@ -18,42 +18,28 @@
package org.apache.hudi.client.clustering.update.strategy;
-import org.apache.hudi.avro.model.HoodieClusteringGroup;
-import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
-import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
import org.apache.hudi.table.HoodieTable;
-import
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucketClusteringPlanStrategy;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import
org.apache.hudi.table.action.cluster.util.ConsistentHashingUpdateStrategyUtils;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
@@ -65,8 +51,6 @@ import static
org.apache.hudi.index.HoodieIndexUtils.tagAsNewRecordIfNeeded;
*/
public class SparkConsistentBucketDuplicateUpdateStrategy<T extends
HoodieRecordPayload<T>> extends UpdateStrategy<T, HoodieData<HoodieRecord<T>>> {
- private static final Logger LOG =
LoggerFactory.getLogger(SparkConsistentBucketDuplicateUpdateStrategy.class);
-
public SparkConsistentBucketDuplicateUpdateStrategy(HoodieEngineContext
engineContext, HoodieTable table, Set<HoodieFileGroupId>
fileGroupsInPendingClustering) {
super(engineContext, table, fileGroupsInPendingClustering);
}
@@ -86,64 +70,21 @@ public class SparkConsistentBucketDuplicateUpdateStrategy<T
extends HoodieRecord
return Pair.of(taggedRecordsRDD, Collections.emptySet());
}
- // Read all pending/ongoing clustering plans
- List<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPairs =
-
table.getMetaClient().getActiveTimeline().filterInflightsAndRequested().filter(instant
->
instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).getInstantsAsStream()
- .map(instant ->
ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant))
- .flatMap(o -> o.isPresent() ? Stream.of(o.get()) : Stream.empty())
- .collect(Collectors.toList());
-
// Construct child node for each partition & build the bucket identifier
final Set<String> partitions = new
HashSet<>(filteredRecordsRDD.map(HoodieRecord::getPartitionPath).distinct().collectAsList());
- Map<String, HoodieConsistentHashingMetadata> partitionToHashingMeta = new
HashMap<>();
- Map<String, String> partitionToInstant = new HashMap<>();
- for (Pair<HoodieInstant, HoodieClusteringPlan> pair : instantPlanPairs) {
- String instant = pair.getLeft().getTimestamp();
- HoodieClusteringPlan plan = pair.getRight();
- extractHashingMetadataFromClusteringPlan(instant, plan, partitions,
partitionToHashingMeta, partitionToInstant);
- }
- Map<String, ConsistentBucketIdentifier> partitionToIdentifier =
partitionToHashingMeta.entrySet().stream()
- .collect(Collectors.toMap(Map.Entry::getKey, e -> new
ConsistentBucketIdentifier(e.getValue())));
+ Map<String, Pair<String, ConsistentBucketIdentifier>>
partitionToIdentifier =
+
ConsistentHashingUpdateStrategyUtils.constructPartitionToIdentifier(partitions,
table);
// Produce records tagged with new record location
List<String> indexKeyFields =
Arrays.asList(table.getConfig().getBucketIndexHashField().split(","));
HoodieData<HoodieRecord<T>> redirectedRecordsRDD =
filteredRecordsRDD.map(r -> {
- ConsistentHashingNode node =
partitionToIdentifier.get(r.getPartitionPath()).getBucket(r.getKey(),
indexKeyFields);
+ Pair<String, ConsistentBucketIdentifier> identifierPair =
partitionToIdentifier.get(r.getPartitionPath());
+ ConsistentHashingNode node =
identifierPair.getValue().getBucket(r.getKey(), indexKeyFields);
return tagAsNewRecordIfNeeded(new HoodieAvroRecord(r.getKey(),
r.getData(), r.getOperation()),
- Option.of(new
HoodieRecordLocation(partitionToInstant.get(r.getPartitionPath()),
FSUtils.createNewFileId(node.getFileIdPrefix(), 0))));
+ Option.ofNullable(new HoodieRecordLocation(identifierPair.getKey(),
FSUtils.createNewFileId(node.getFileIdPrefix(), 0))));
});
// Return combined iterator (the original and records with new location)
return Pair.of(taggedRecordsRDD.union(redirectedRecordsRDD),
Collections.emptySet());
}
-
- private void extractHashingMetadataFromClusteringPlan(String instant,
HoodieClusteringPlan plan, final Set<String> recordPartitions,
- Map<String,
HoodieConsistentHashingMetadata> partitionToHashingMeta, Map<String, String>
partitionToInstant) {
- for (HoodieClusteringGroup group : plan.getInputGroups()) {
- Map<String, String> groupMeta = group.getExtraMetadata();
- String p =
groupMeta.get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_PARTITION_KEY);
- checkState(p != null, "Clustering plan does not has partition info,
plan: " + plan);
- // Skip unrelated clustering group
- if (!recordPartitions.contains(p)) {
- return;
- }
-
- String preInstant = partitionToInstant.putIfAbsent(p, instant);
- checkState(preInstant == null || preInstant.equals(instant), "Find a
partition: " + p + " with two clustering instants");
- if (!partitionToHashingMeta.containsKey(p)) {
- Option<HoodieConsistentHashingMetadata> metadataOption =
ConsistentBucketIndexUtils.loadMetadata(table, p);
- checkState(metadataOption.isPresent(), "Failed to load consistent
hashing metadata for partition: " + p);
- partitionToHashingMeta.put(p, metadataOption.get());
- }
-
- try {
- String nodeJson =
group.getExtraMetadata().get(BaseConsistentHashingBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY);
- List<ConsistentHashingNode> nodes =
ConsistentHashingNode.fromJsonString(nodeJson);
- partitionToHashingMeta.get(p).getChildrenNodes().addAll(nodes);
- } catch (Exception e) {
- LOG.error("Failed to parse child nodes in clustering plan", e);
- throw new HoodieException("Failed to parse child nodes in clustering
plan, partition: " + p + ", cluster group: " + group, e);
- }
- }
- }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index e7a6901b0b2..b68dcbd0698 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -18,6 +18,7 @@
package org.apache.hudi.configuration;
+import
org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
import
org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy;
import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
import
org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
@@ -140,10 +141,21 @@ public class OptionsResolver {
return HoodieIndex.BucketIndexEngineType.valueOf(bucketEngineType);
}
+ /**
+ * Returns whether the table index is consistent bucket index.
+ */
public static boolean isConsistentHashingBucketIndexType(Configuration conf)
{
return isBucketIndexType(conf) &&
getBucketEngineType(conf).equals(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING);
}
+ /**
+ * Returns the default plan strategy class.
+ */
+ public static String getDefaultPlanStrategyClassName(Configuration conf) {
+ return OptionsResolver.isConsistentHashingBucketIndexType(conf) ?
FlinkConsistentBucketClusteringPlanStrategy.class.getName() :
+ FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS.defaultValue();
+ }
+
/**
* Returns whether the source should emit changelog.
*
@@ -190,7 +202,19 @@ public class OptionsResolver {
* @param conf The flink configuration.
*/
public static boolean needsScheduleClustering(Configuration conf) {
- return isInsertOperation(conf) &&
conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED);
+ if (!conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED)) {
+ return false;
+ }
+ WriteOperationType operationType =
WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
+ if (OptionsResolver.isConsistentHashingBucketIndexType(conf)) {
+ // Write pipelines for table with consistent bucket index would detect
whether clustering service occurs,
+ // and automatically adjust the partitioner and write function if
clustering service happens.
+ // So it could handle UPSERT.
+ // But it could not handle INSERT case, because insert write would not
take index into consideration currently.
+ return operationType == WriteOperationType.UPSERT;
+ } else {
+ return operationType == WriteOperationType.INSERT;
+ }
}
/**
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 5087c814030..24a594d0266 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -103,7 +103,7 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
*/
private transient Map<String, DataBucket> buckets;
- private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>>
writeFunction;
+ protected transient BiFunction<List<HoodieRecord>, String,
List<WriteStatus>> writeFunction;
private transient HoodieRecordMerger recordMerger;
@@ -246,7 +246,7 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
/**
* Data bucket.
*/
- private static class DataBucket {
+ protected static class DataBucket {
private final List<DataItem> records;
private final BufferSizeDetector detector;
private final String partitionPath;
@@ -430,12 +430,8 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
List<HoodieRecord> records = bucket.writeBuffer();
ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has
no buffering records");
- if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
- records = (List<HoodieRecord>) FlinkWriteHelper.newInstance()
- .deduplicateRecords(records, null, -1,
this.writeClient.getConfig().getSchema(),
this.writeClient.getConfig().getProps(), recordMerger);
- }
- bucket.preWrite(records);
- final List<WriteStatus> writeStatus = new
ArrayList<>(writeFunction.apply(records, instant));
+ records = deduplicateRecordsIfNeeded(records);
+ final List<WriteStatus> writeStatus = writeBucket(instant, bucket,
records);
records.clear();
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
@@ -466,12 +462,8 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
.forEach(bucket -> {
List<HoodieRecord> records = bucket.writeBuffer();
if (records.size() > 0) {
- if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
- records = (List<HoodieRecord>) FlinkWriteHelper.newInstance()
- .deduplicateRecords(records, null, -1,
this.writeClient.getConfig().getSchema(),
this.writeClient.getConfig().getProps(), recordMerger);
- }
- bucket.preWrite(records);
- writeStatus.addAll(writeFunction.apply(records, currentInstant));
+ records = deduplicateRecordsIfNeeded(records);
+ writeStatus.addAll(writeBucket(currentInstant, bucket, records));
records.clear();
bucket.reset();
}
@@ -496,4 +488,18 @@ public class StreamWriteFunction<I> extends
AbstractStreamWriteFunction<I> {
// blocks flushing until the coordinator starts a new instant
this.confirming = true;
}
+
+ protected List<WriteStatus> writeBucket(String instant, DataBucket bucket,
List<HoodieRecord> records) {
+ bucket.preWrite(records);
+ return writeFunction.apply(records, instant);
+ }
+
+ private List<HoodieRecord> deduplicateRecordsIfNeeded(List<HoodieRecord>
records) {
+ if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
+ return FlinkWriteHelper.newInstance()
+ .deduplicateRecords(records, null, -1,
this.writeClient.getConfig().getSchema(),
this.writeClient.getConfig().getProps(), recordMerger);
+ } else {
+ return records;
+ }
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 5a7889d327e..63b9c4b3742 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -40,7 +40,7 @@ import java.util.Map;
import java.util.Set;
/**
- * A stream write function with bucket hash index.
+ * A stream write function with simple bucket hash index.
*
* <p>The task holds a fresh new local index: {(partition + bucket number)
&rarr fileId} mapping, this index
* is used for deciding whether the incoming records in an UPDATE or INSERT.
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
index a48ea44ddc4..3c836bdf8ad 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
@@ -18,6 +18,7 @@
package org.apache.hudi.sink.bucket;
+import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.common.AbstractWriteOperator;
import org.apache.hudi.sink.common.WriteOperatorFactory;
@@ -31,7 +32,9 @@ import org.apache.flink.configuration.Configuration;
public class BucketStreamWriteOperator<I> extends AbstractWriteOperator<I> {
public BucketStreamWriteOperator(Configuration conf) {
- super(new BucketStreamWriteFunction<>(conf));
+ super(OptionsResolver.isConsistentHashingBucketIndexType(conf)
+ ? new ConsistentBucketStreamWriteFunction<>(conf)
+ : new BucketStreamWriteFunction<>(conf));
}
public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
index 5fa4c48ae19..f5dd3def770 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
@@ -25,6 +25,9 @@ import
org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
@@ -35,6 +38,9 @@ import
org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
@@ -45,12 +51,13 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* The function to tag each incoming record with a location of a file based on
consistent bucket index.
*/
-public class ConsistentBucketAssignFunction extends
ProcessFunction<HoodieRecord, HoodieRecord> {
+public class ConsistentBucketAssignFunction extends
ProcessFunction<HoodieRecord, HoodieRecord> implements CheckpointedFunction {
private static final Logger LOG =
LoggerFactory.getLogger(ConsistentBucketAssignFunction.class);
@@ -59,6 +66,7 @@ public class ConsistentBucketAssignFunction extends
ProcessFunction<HoodieRecord
private final int bucketNum;
private transient HoodieFlinkWriteClient writeClient;
private transient Map<String, ConsistentBucketIdentifier>
partitionToIdentifier;
+ private transient String lastRefreshInstant = HoodieTimeline.INIT_INSTANT_TS;
private final int maxRetries = 10;
private final long maxWaitTimeInMs = 1000;
@@ -124,4 +132,24 @@ public class ConsistentBucketAssignFunction extends
ProcessFunction<HoodieRecord
return new ConsistentBucketIdentifier(metadata);
});
}
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext)
throws Exception {
+ HoodieTimeline timeline =
writeClient.getHoodieTable().getActiveTimeline().getCompletedReplaceTimeline().findInstantsAfter(lastRefreshInstant);
+ if (!timeline.empty()) {
+ for (HoodieInstant instant : timeline.getInstants()) {
+ HoodieReplaceCommitMetadata commitMetadata =
HoodieReplaceCommitMetadata.fromBytes(
+ timeline.getInstantDetails(instant).get(),
HoodieReplaceCommitMetadata.class);
+ Set<String> affectedPartitions =
commitMetadata.getPartitionToReplaceFileIds().keySet();
+ LOG.info("Clear up cached hashing metadata because find a new replace
commit.\n Instant: {}.\n Effected Partitions: {}.", lastRefreshInstant,
affectedPartitions);
+ affectedPartitions.forEach(this.partitionToIdentifier::remove);
+ }
+ this.lastRefreshInstant = timeline.lastInstant().get().getTimestamp();
+ }
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext
functionInitializationContext) throws Exception {
+ // no operation
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
new file mode 100644
index 00000000000..9372d70b68e
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketStreamWriteFunction.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.StreamWriteFunction;
+import
org.apache.hudi.sink.clustering.update.strategy.FlinkConsistentBucketUpdateStrategy;
+
+import org.apache.flink.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A stream write function with consistent bucket hash index.
+ *
+ * @param <I> the input type
+ */
+public class ConsistentBucketStreamWriteFunction<I> extends
StreamWriteFunction<I> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ConsistentBucketStreamWriteFunction.class);
+
+ private transient FlinkConsistentBucketUpdateStrategy updateStrategy;
+
+ /**
+ * Constructs a ConsistentBucketStreamWriteFunction.
+ *
+ * @param config The config options
+ */
+ public ConsistentBucketStreamWriteFunction(Configuration config) {
+ super(config);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws IOException {
+ super.open(parameters);
+ List<String> indexKeyFields =
Arrays.asList(config.getString(FlinkOptions.INDEX_KEY_FIELD).split(","));
+ this.updateStrategy = new
FlinkConsistentBucketUpdateStrategy(this.writeClient, indexKeyFields);
+ }
+
+ @Override
+ public void snapshotState() {
+ super.snapshotState();
+ updateStrategy.reset();
+ }
+
+ @Override
+ protected List<WriteStatus> writeBucket(String instant, DataBucket bucket,
List<HoodieRecord> records) {
+ updateStrategy.initialize(this.writeClient);
+ bucket.preWrite(records);
+ Pair<List<Pair<List<HoodieRecord>, String>>, Set<HoodieFileGroupId>>
recordListFgPair =
+ updateStrategy.handleUpdate(Collections.singletonList(Pair.of(records,
instant)));
+ return recordListFgPair.getKey().stream().flatMap(
+ recordsInstantPair ->
writeFunction.apply(recordsInstantPair.getLeft(),
recordsInstantPair.getRight()).stream()
+ ).collect(Collectors.toList());
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/update/strategy/FlinkConsistentBucketUpdateStrategy.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/update/strategy/FlinkConsistentBucketUpdateStrategy.java
new file mode 100644
index 00000000000..f9a58a7318d
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/update/strategy/FlinkConsistentBucketUpdateStrategy.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.clustering.update.strategy;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
+import
org.apache.hudi.table.action.cluster.util.ConsistentHashingUpdateStrategyUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Update strategy for consistent hashing bucket index. If updates to file
groups that are under clustering are identified,
+ * then the current batch of records will route to both old and new file groups
+ * (i.e., dual write).
+ */
+public class FlinkConsistentBucketUpdateStrategy<T extends
HoodieRecordPayload> extends UpdateStrategy<T, List<Pair<List<HoodieRecord>,
String>>> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkConsistentBucketUpdateStrategy.class);
+
+ private boolean initialized = false;
+ private List<String> indexKeyFields;
+ private Map<String, Pair<String, ConsistentBucketIdentifier>>
partitionToIdentifier;
+ private String lastRefreshInstant = HoodieTimeline.INIT_INSTANT_TS;
+
+ public FlinkConsistentBucketUpdateStrategy(HoodieFlinkWriteClient
writeClient, List<String> indexKeyFields) {
+ super(writeClient.getEngineContext(), writeClient.getHoodieTable(),
Collections.emptySet());
+ this.indexKeyFields = indexKeyFields;
+ this.partitionToIdentifier = new HashMap<>();
+ }
+
+ public void initialize(HoodieFlinkWriteClient writeClient) {
+ if (initialized) {
+ return;
+ }
+ HoodieFlinkTable table = writeClient.getHoodieTable();
+ List<HoodieInstant> instants =
ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
+ if (!instants.isEmpty()) {
+ HoodieInstant latestPendingReplaceInstant = instants.get(instants.size()
- 1);
+ if
(latestPendingReplaceInstant.getTimestamp().compareTo(lastRefreshInstant) > 0) {
+        LOG.info("Found new pending replacement commit. Latest pending
replacement commit is {}.", latestPendingReplaceInstant);
+ this.table = table;
+ this.fileGroupsInPendingClustering =
table.getFileSystemView().getFileGroupsInPendingClustering()
+ .map(Pair::getKey).collect(Collectors.toSet());
+ // TODO throw exception if exists bucket merge plan
+ this.lastRefreshInstant = latestPendingReplaceInstant.getTimestamp();
+ this.partitionToIdentifier.clear();
+ }
+ }
+ this.initialized = true;
+ }
+
+ public void reset() {
+ initialized = false;
+ }
+
+ @Override
+ public Pair<List<Pair<List<HoodieRecord>, String>>, Set<HoodieFileGroupId>>
handleUpdate(List<Pair<List<HoodieRecord>, String>> recordsList) {
+ ValidationUtils.checkArgument(initialized, "Strategy has not been
initialized");
+ ValidationUtils.checkArgument(recordsList.size() == 1);
+
+ Pair<List<HoodieRecord>, String> recordsInstantPair = recordsList.get(0);
+ HoodieRecord sampleRecord = recordsInstantPair.getLeft().get(0);
+ HoodieFileGroupId fileId = new
HoodieFileGroupId(sampleRecord.getPartitionPath(),
sampleRecord.getCurrentLocation().getFileId());
+ if (fileGroupsInPendingClustering.isEmpty() ||
!fileGroupsInPendingClustering.contains(fileId)) {
+ return Pair.of(recordsList, Collections.singleton(fileId));
+ }
+
+ return doHandleUpdate(fileId, recordsInstantPair);
+ }
+
+ private Pair<List<Pair<List<HoodieRecord>, String>>, Set<HoodieFileGroupId>>
doHandleUpdate(HoodieFileGroupId fileId, Pair<List<HoodieRecord>, String>
recordsInstantPair) {
+ Pair<String, ConsistentBucketIdentifier> bucketIdentifierPair =
getBucketIdentifierOfPartition(fileId.getPartitionPath());
+ String clusteringInstant = bucketIdentifierPair.getLeft();
+ ConsistentBucketIdentifier identifier = bucketIdentifierPair.getRight();
+
+    // Construct records list routing to new file groups according to the new
bucket identifier
+ Map<String, List<HoodieRecord>> fileIdToRecords =
recordsInstantPair.getLeft().stream().map(HoodieRecord::newInstance)
+ .collect(Collectors.groupingBy(r -> identifier.getBucket(r.getKey(),
indexKeyFields).getFileIdPrefix()));
+
+ // Tag first record with the corresponding fileId & clusteringInstantTime
+ List<Pair<List<HoodieRecord>, String>> recordsList = new ArrayList<>();
+ Set<HoodieFileGroupId> fgs = new LinkedHashSet<>();
+ for (Map.Entry<String, List<HoodieRecord>> e : fileIdToRecords.entrySet())
{
+ String newFileId = FSUtils.createNewFileId(e.getKey(), 0);
+ patchFileIdToRecords(e.getValue(), newFileId);
+ recordsList.add(Pair.of(e.getValue(), clusteringInstant));
+ fgs.add(new HoodieFileGroupId(fileId.getPartitionPath(), newFileId));
+ }
+ LOG.info("Apply duplicate update for FileGroup {}, routing records to:
{}.", fileId, String.join(",", fileIdToRecords.keySet()));
+ // TODO add option to skip dual update, i.e., write updates only to the
new file group
+ recordsList.add(recordsInstantPair);
+ fgs.add(fileId);
+ return Pair.of(recordsList, fgs);
+ }
+
+ private Pair<String, ConsistentBucketIdentifier>
getBucketIdentifierOfPartition(String partition) {
+ return partitionToIdentifier.computeIfAbsent(partition, p ->
ConsistentHashingUpdateStrategyUtils.constructPartitionToIdentifier(Collections.singleton(p),
table).get(p)
+ );
+ }
+
+  /**
+   * Rewrite the first record with the given fileId.
+   */
+ private void patchFileIdToRecords(List<HoodieRecord> records, String fileId)
{
+ HoodieRecord first = records.get(0);
+ HoodieRecord record = new HoodieAvroRecord<>(first.getKey(),
(HoodieRecordPayload) first.getData(), first.getOperation());
+ HoodieRecordLocation newLoc = new HoodieRecordLocation("U", fileId);
+ record.setCurrentLocation(newLoc);
+ records.set(0, record);
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 900ccc77670..5d945d07aa1 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -366,7 +366,7 @@ public class Pipelines {
.transform(
opName("consistent_bucket_write", conf),
TypeInformation.of(Object.class),
- StreamWriteOperator.getFactory(conf))
+ BucketStreamWriteOperator.getFactory(conf))
.uid(opUID("consistent_bucket_write", conf))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
default:
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
index 2d47bb8a1b4..75d4ea79815 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
@@ -53,9 +53,10 @@ public class ClusteringUtil {
HoodieIndex.BucketIndexEngineType bucketIndexEngineType =
OptionsResolver.getBucketEngineType(conf);
switch (bucketIndexEngineType) {
case SIMPLE:
- throw new UnsupportedOperationException("Clustering is not supported
for simple bucket index.");
+ throw new HoodieNotSupportedException("Clustering is not supported
for simple bucket index.");
case CONSISTENT_HASHING:
- if
(!conf.get(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS).equalsIgnoreCase(FlinkConsistentBucketClusteringPlanStrategy.class.getName()))
{
+ String clusteringPlanStrategyClass =
conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS,
OptionsResolver.getDefaultPlanStrategyClassName(conf));
+ if
(!clusteringPlanStrategyClass.equalsIgnoreCase(FlinkConsistentBucketClusteringPlanStrategy.class.getName()))
{
throw new HoodieNotSupportedException(
"CLUSTERING_PLAN_STRATEGY_CLASS should be set to " +
FlinkConsistentBucketClusteringPlanStrategy.class.getName() + " in order to
work with Consistent Hashing Bucket Index.");
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index 1835e9c894b..960b85d95ab 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -40,7 +40,6 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
-import
org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.flink.api.common.functions.RuntimeContext;
@@ -164,7 +163,7 @@ public class FlinkWriteClients {
.withClusteringConfig(
HoodieClusteringConfig.newBuilder()
.withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED))
-
.withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS,
getDefaultPlanStrategyClassName(conf)))
+
.withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS,
OptionsResolver.getDefaultPlanStrategyClassName(conf)))
.withClusteringPlanPartitionFilterMode(
ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME)))
.withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS))
@@ -246,9 +245,4 @@ public class FlinkWriteClients {
}
return writeConfig;
}
-
- private static String getDefaultPlanStrategyClassName(Configuration conf) {
- return OptionsResolver.isConsistentHashingBucketIndexType(conf) ?
FlinkConsistentBucketClusteringPlanStrategy.class.getName() :
- FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS.defaultValue();
- }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 4683955d8e0..d968362ca2a 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -20,6 +20,8 @@ package org.apache.hudi.sink;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.utils.TestData;
@@ -156,6 +158,44 @@ public class TestWriteMergeOnRead extends
TestWriteCopyOnWrite {
// insert async clustering is only valid for cow table.
}
+ @Test
+ public void testConsistentBucketIndex() throws Exception {
+ conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
+ conf.setString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE,
"CONSISTENT_HASHING");
+ conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
+ conf.setString(HoodieIndexConfig.BUCKET_INDEX_MAX_NUM_BUCKETS.key(), "8");
+ // Enable inline resize scheduling
+ conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
+ // Manually set the max commits to trigger clustering quickly
+ conf.setString(HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key(),
"1");
+ // Manually set the split threshold to trigger split in the clustering
+ conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+ conf.setString(HoodieIndexConfig.BUCKET_SPLIT_THRESHOLD.key(),
String.valueOf(1 / 1024.0 / 1024.0));
+ conf.set(FlinkOptions.PRE_COMBINE, true);
+ HashMap<String, String> mergedExpected = new HashMap<>(EXPECTED1);
+ mergedExpected.put("par1", "[id1,par1,id1,Danny,22,4,par1,
id2,par1,id2,Stephen,33,2,par1]");
+ TestHarness.instance().preparePipeline(tempFile, conf)
+ .consume(TestData.DATA_SET_INSERT)
+ .emptyEventBuffer()
+ .checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1)
+ .checkWrittenData(EXPECTED1, 4)
+ .consume(TestData.DATA_SET_DISORDER_INSERT)
+ .emptyEventBuffer()
+ .checkpoint(2)
+ .assertNextEvent()
+ .checkpointComplete(2)
+ .checkWrittenData(mergedExpected, 4)
+ .consume(TestData.DATA_SET_SINGLE_INSERT)
+ .emptyEventBuffer()
+ .checkpoint(3)
+ .assertNextEvent()
+ .checkpointComplete(3)
+ .checkWrittenData(mergedExpected, 4)
+ .end();
+ }
+
@Override
protected Map<String, String> getExpectedBeforeCheckpointComplete() {
return EXPECTED1;
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
index 2eb6e4b3626..4882552f0a7 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
@@ -22,6 +22,8 @@ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsInference;
import org.apache.hudi.exception.HoodieException;
@@ -86,18 +88,34 @@ public class ITTestConsistentBucketStreamWrite extends
TestLogger {
conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
conf.setString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE,
"CONSISTENT_HASHING");
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
- conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
conf.setString(FlinkOptions.TABLE_TYPE,
HoodieTableType.MERGE_ON_READ.name());
testWriteToHoodie(conf, "mor_write", 1, EXPECTED);
}
+ @Test
+ public void testWriteMORWithResizePlan() throws Exception {
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+ conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
+ conf.setString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE,
"CONSISTENT_HASHING");
+ conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
+ conf.setString(HoodieIndexConfig.BUCKET_INDEX_MAX_NUM_BUCKETS.key(), "8");
+ conf.setString(FlinkOptions.TABLE_TYPE,
HoodieTableType.MERGE_ON_READ.name());
+ // Enable inline resize scheduling
+ conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
+ // Manually set the max commits to trigger clustering quickly
+ conf.setString(HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key(),
"1");
+ // Manually set the split threshold to trigger split in the clustering
+ conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+ conf.setString(HoodieIndexConfig.BUCKET_SPLIT_THRESHOLD.key(),
String.valueOf(1 / 1024.0 / 1024.0));
+ testWriteToHoodie(conf, "mor_write", 1, EXPECTED);
+ }
+
@Test
public void testBulkInsert() {
Configuration conf =
TestConfigurations.getDefaultConf(tempFile.toURI().toString());
conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
conf.setString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE,
"CONSISTENT_HASHING");
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
- conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
conf.setString(FlinkOptions.TABLE_TYPE,
HoodieTableType.MERGE_ON_READ.name());
conf.setString(FlinkOptions.OPERATION, "bulk_insert");
@@ -113,7 +131,6 @@ public class ITTestConsistentBucketStreamWrite extends
TestLogger {
conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
conf.setString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE,
"CONSISTENT_HASHING");
conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
- conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
conf.setString(FlinkOptions.TABLE_TYPE,
HoodieTableType.MERGE_ON_READ.name());
conf.setString(FlinkOptions.OPERATION, "INSERT_OVERWRITE");
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
index aa318efa8da..df648df9ac0 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
@@ -59,24 +59,24 @@ import java.util.concurrent.CompletableFuture;
* @param <I> Input type
*/
public class BucketStreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
- private final Configuration conf;
+ protected final Configuration conf;
private final IOManager ioManager;
- private final StreamingRuntimeContext runtimeContext;
+ protected final StreamingRuntimeContext runtimeContext;
private final MockOperatorEventGateway gateway;
private final MockOperatorCoordinatorContext coordinatorContext;
- private final StreamWriteOperatorCoordinator coordinator;
- private final MockStateInitializationContext stateInitializationContext;
+ protected final StreamWriteOperatorCoordinator coordinator;
+ protected final MockStateInitializationContext stateInitializationContext;
/**
* Function that converts row data to HoodieRecord.
*/
- private RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction;
+ protected RowDataToHoodieFunction<RowData, HoodieRecord<?>> toHoodieFunction;
/**
* Stream write function.
*/
- private StreamWriteFunction<HoodieRecord<?>> writeFunction;
+ protected StreamWriteFunction<HoodieRecord<?>> writeFunction;
private CompactFunctionWrapper compactFunctionWrapper;
@@ -195,7 +195,7 @@ public class BucketStreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<
// -------------------------------------------------------------------------
private void setupWriteFunction() throws Exception {
- writeFunction = new BucketStreamWriteFunction<>(conf);
+ writeFunction = createWriteFunction();
writeFunction.setRuntimeContext(runtimeContext);
writeFunction.setOperatorEventGateway(gateway);
writeFunction.initializeState(this.stateInitializationContext);
@@ -204,4 +204,8 @@ public class BucketStreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<
// handle the bootstrap event
coordinator.handleEventFromOperator(0, getNextEvent());
}
+
+ protected StreamWriteFunction<HoodieRecord<?>> createWriteFunction() {
+ return new BucketStreamWriteFunction<>(conf);
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ConsistentBucketStreamWriteFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ConsistentBucketStreamWriteFunctionWrapper.java
new file mode 100644
index 00000000000..02591bbb412
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ConsistentBucketStreamWriteFunctionWrapper.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.utils;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.sink.StreamWriteFunction;
+import org.apache.hudi.sink.bucket.ConsistentBucketAssignFunction;
+import org.apache.hudi.sink.bucket.ConsistentBucketStreamWriteFunction;
+import org.apache.hudi.utils.TestConfigurations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import
org.apache.flink.streaming.api.operators.collect.utils.MockFunctionSnapshotContext;
+import org.apache.flink.table.data.RowData;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A wrapper class to manipulate the {@link
ConsistentBucketStreamWriteFunction} instance for testing.
+ *
+ * @param <I> Input type
+ */
+public class ConsistentBucketStreamWriteFunctionWrapper<I> extends
BucketStreamWriteFunctionWrapper<I> {
+
+ private ConsistentBucketAssignFunction assignFunction;
+
+ public ConsistentBucketStreamWriteFunctionWrapper(String tablePath) throws
Exception {
+ this(tablePath, TestConfigurations.getDefaultConf(tablePath));
+ }
+
+ public ConsistentBucketStreamWriteFunctionWrapper(String tablePath,
Configuration conf) throws Exception {
+ super(tablePath, conf);
+ }
+
+ @Override
+ public void openFunction() throws Exception {
+ super.openFunction();
+ assignFunction = new ConsistentBucketAssignFunction(conf);
+ assignFunction.setRuntimeContext(runtimeContext);
+ assignFunction.open(conf);
+ }
+
+ @Override
+ public void invoke(I record) throws Exception {
+ HoodieRecord hoodieRecord = toHoodieFunction.map((RowData) record);
+ ScalaCollector<HoodieRecord> collector = ScalaCollector.getInstance();
+ assignFunction.processElement(hoodieRecord, null, collector);
+ writeFunction.processElement(collector.getVal(), null, null);
+ }
+
+ @Override
+ protected StreamWriteFunction<HoodieRecord<?>> createWriteFunction() {
+ return new ConsistentBucketStreamWriteFunction<>(conf);
+ }
+
+ @Override
+ public void checkpointFunction(long checkpointId) throws Exception {
+ // checkpoint the coordinator first
+ FunctionSnapshotContext functionSnapshotContext = new
MockFunctionSnapshotContext(checkpointId);
+ this.coordinator.checkpointCoordinator(checkpointId, new
CompletableFuture<>());
+ writeFunction.snapshotState(functionSnapshotContext);
+ assignFunction.snapshotState(functionSnapshotContext);
+
stateInitializationContext.getOperatorStateStore().checkpointBegin(checkpointId);
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ScalaCollector.java
similarity index 56%
copy from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
copy to
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ScalaCollector.java
index a48ea44ddc4..681c6de2a89 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/ScalaCollector.java
@@ -16,25 +16,31 @@
* limitations under the License.
*/
-package org.apache.hudi.sink.bucket;
+package org.apache.hudi.sink.utils;
-import org.apache.hudi.sink.common.AbstractWriteOperator;
-import org.apache.hudi.sink.common.WriteOperatorFactory;
-
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
/**
- * Operator for {@link BucketStreamWriteFunction}.
- *
- * @param <I> The input type
+ * A mock {@link Collector} that is used in {@link TestFunctionWrapper}.
*/
-public class BucketStreamWriteOperator<I> extends AbstractWriteOperator<I> {
+class ScalaCollector<T> implements Collector<T> {
+ private T val;
+
+ public static <T> ScalaCollector<T> getInstance() {
+ return new ScalaCollector<>();
+ }
+
+ @Override
+ public void collect(T t) {
+ this.val = t;
+ }
- public BucketStreamWriteOperator(Configuration conf) {
- super(new BucketStreamWriteFunction<>(conf));
+ @Override
+ public void close() {
+ this.val = null;
}
- public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) {
- return WriteOperatorFactory.instance(conf, new
BucketStreamWriteOperator<>(conf));
+ public T getVal() {
+ return val;
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 22280bb3129..36b0dc32757 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -290,26 +290,4 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
return this.updateKeys.contains(key);
}
}
-
- private static class ScalaCollector<T> implements Collector<T> {
- private T val;
-
- public static <T> ScalaCollector<T> getInstance() {
- return new ScalaCollector<>();
- }
-
- @Override
- public void collect(T t) {
- this.val = t;
- }
-
- @Override
- public void close() {
- this.val = null;
- }
-
- public T getVal() {
- return val;
- }
- }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 8c8ecf71061..db9cd65b9f1 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -34,6 +34,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.utils.BucketStreamWriteFunctionWrapper;
+import org.apache.hudi.sink.utils.ConsistentBucketStreamWriteFunctionWrapper;
import org.apache.hudi.sink.utils.InsertFunctionWrapper;
import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
import org.apache.hudi.sink.utils.TestFunctionWrapper;
@@ -539,7 +540,11 @@ public class TestData {
if (OptionsResolver.isAppendMode(conf)) {
return new InsertFunctionWrapper<>(basePath, conf);
} else if (OptionsResolver.isBucketIndexType(conf)) {
- return new BucketStreamWriteFunctionWrapper<>(basePath, conf);
+ if (OptionsResolver.isConsistentHashingBucketIndexType(conf)) {
+ return new ConsistentBucketStreamWriteFunctionWrapper<>(basePath,
conf);
+ } else {
+ return new BucketStreamWriteFunctionWrapper<>(basePath, conf);
+ }
} else {
return new StreamWriteFunctionWrapper<>(basePath, conf);
}