This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 42e6fff8943 [HUDI-6326] Flink write supports consistent bucket index
(#9012)
42e6fff8943 is described below
commit 42e6fff8943442363815a306767e24689a727406
Author: Jing Zhang <[email protected]>
AuthorDate: Tue Jun 20 11:27:40 2023 +0800
[HUDI-6326] Flink write supports consistent bucket index (#9012)
* Refactor the code of consistent hashing bucket index, extracts common
utility to client common module
Flink write process supports consistent hashing bucket index.
* Introduce a bucket assigner operator, which is used to decide the
location of the incoming record based on the
consistent bucket index metadata. Make the initial uuid of bucket nodes
deterministic to resolve conflicts by
idempotent file creation from multiple tasks.
It would not cover (would be done in the following subtasks):
* generate resize plan
* resolve the resizing cases during the write process
---
.../apache/hudi/config/HoodieClusteringConfig.java | 4 +
.../index/bucket/ConsistentBucketIndexUtils.java} | 221 ++++------------
.../index/bucket/HoodieConsistentBucketIndex.java | 108 ++++++++
.../apache/hudi/index/FlinkHoodieIndexFactory.java | 10 +-
...parkConsistentBucketClusteringPlanStrategy.java | 5 +-
...arkConsistentBucketDuplicateUpdateStrategy.java | 4 +-
.../RDDConsistentBucketBulkInsertPartitioner.java | 3 +-
.../bucket/HoodieSparkConsistentBucketIndex.java | 288 +--------------------
.../TestSparkConsistentBucketClustering.java | 6 +-
.../model/HoodieConsistentHashingMetadata.java | 32 ++-
.../apache/hudi/configuration/FlinkOptions.java | 7 +
.../apache/hudi/configuration/OptionsResolver.java | 9 +-
.../bucket/ConsistentBucketAssignFunction.java | 127 +++++++++
.../java/org/apache/hudi/sink/utils/Pipelines.java | 53 +++-
.../bucket/ITTestConsistentBucketStreamWrite.java | 192 ++++++++++++++
15 files changed, 586 insertions(+), 483 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index ec61a580fa9..9732c52ac42 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -642,6 +642,10 @@ public class HoodieClusteringConfig extends HoodieConfig {
+ "schedule inline clustering (%s) can be enabled. Both can't be
set to true at the same time. %s,%s",
HoodieClusteringConfig.INLINE_CLUSTERING.key(),
HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(),
inlineCluster, inlineClusterSchedule));
+ if (engineType.equals(EngineType.FLINK)) {
+ // support resize for Flink to unlock the validation.
+ return;
+ }
// Consistent hashing bucket index
if (clusteringConfig.contains(HoodieIndexConfig.INDEX_TYPE.key())
&&
clusteringConfig.contains(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key())
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
similarity index 51%
copy from
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
copy to
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
index ebbecbda241..ec50bfcde5d 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/ConsistentBucketIndexUtils.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,28 +18,15 @@
package org.apache.hudi.index.bucket;
-import org.apache.hudi.avro.model.HoodieClusteringGroup;
-import org.apache.hudi.avro.model.HoodieClusteringPlan;
-import org.apache.hudi.client.WriteStatus;
-import
org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy;
-import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
-import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.table.HoodieTable;
@@ -56,123 +43,42 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;
-import scala.Tuple2;
-
import static
org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX;
import static
org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX;
import static
org.apache.hudi.common.model.HoodieConsistentHashingMetadata.getTimestampFromFile;
/**
- * Consistent hashing bucket index implementation, with auto-adjust bucket
number.
- * NOTE: bucket resizing is triggered by clustering.
+ * Utilities class for consistent bucket index metadata management.
*/
-public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
-
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieSparkConsistentBucketIndex.class);
-
- public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
- super(config);
- }
+public class ConsistentBucketIndexUtils {
- @Override
- public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus>
writeStatuses,
- HoodieEngineContext context,
- HoodieTable hoodieTable)
- throws HoodieIndexException {
- throw new HoodieIndexException("Consistent hashing index does not support
update location without the instant parameter");
- }
+ private static final Logger LOG =
LoggerFactory.getLogger(ConsistentBucketIndexUtils.class);
/**
- * Persist hashing metadata to storage. Only clustering operations will
modify the metadata.
- * For example, splitting & merging buckets, or just sorting and producing a
new bucket.
- */
- @Override
- public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus>
writeStatuses,
- HoodieEngineContext context,
- HoodieTable hoodieTable,
- String instantTime)
- throws HoodieIndexException {
- HoodieInstant instant =
hoodieTable.getMetaClient().getActiveTimeline().findInstantsAfterOrEquals(instantTime,
1).firstInstant().get();
- ValidationUtils.checkState(instant.getTimestamp().equals(instantTime),
"Cannot get the same instant, instantTime: " + instantTime);
- if (!instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
- return writeStatuses;
- }
-
- // Double-check if it is a clustering operation by trying to obtain the
clustering plan
- Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlanPair =
- ClusteringUtils.getClusteringPlan(hoodieTable.getMetaClient(),
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime));
- if (!instantPlanPair.isPresent()) {
- return writeStatuses;
- }
-
- HoodieClusteringPlan plan = instantPlanPair.get().getRight();
-
HoodieJavaRDD.getJavaRDD(context.parallelize(plan.getInputGroups().stream().map(HoodieClusteringGroup::getExtraMetadata).collect(Collectors.toList())))
- .mapToPair(m -> new
Tuple2<>(m.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_PARTITION_KEY),
m)
- ).groupByKey().foreach((input) -> {
- // Process each partition
- String partition = input._1();
- List<ConsistentHashingNode> childNodes = new ArrayList<>();
- int seqNo = 0;
- for (Map<String, String> m: input._2()) {
- String nodesJson =
m.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_CHILD_NODE_KEY);
- childNodes.addAll(ConsistentHashingNode.fromJsonString(nodesJson));
- seqNo =
Integer.parseInt(m.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_SEQUENCE_NUMBER_KEY));
- }
-
- Option<HoodieConsistentHashingMetadata> metadataOption =
loadMetadata(hoodieTable, partition);
- ValidationUtils.checkState(metadataOption.isPresent(), "Failed to load
metadata for partition: " + partition);
- HoodieConsistentHashingMetadata meta = metadataOption.get();
- ValidationUtils.checkState(meta.getSeqNo() == seqNo,
- "Non serialized update to hashing metadata, old seq: " +
meta.getSeqNo() + ", new seq: " + seqNo);
-
- // Get new metadata and save
- meta.setChildrenNodes(childNodes);
- List<ConsistentHashingNode> newNodes = (new
ConsistentBucketIdentifier(meta)).getNodes().stream()
- .map(n -> new ConsistentHashingNode(n.getValue(),
n.getFileIdPrefix(), ConsistentHashingNode.NodeTag.NORMAL))
- .collect(Collectors.toList());
- HoodieConsistentHashingMetadata newMeta = new
HoodieConsistentHashingMetadata(meta.getVersion(), meta.getPartitionPath(),
- instantTime, meta.getNumBuckets(), seqNo + 1, newNodes);
- // Overwrite to tolerate re-run of clustering operation
- saveMetadata(hoodieTable, newMeta, true);
- });
-
- return writeStatuses;
- }
-
- /**
- * Do nothing.
- * A failed write may create a hashing metadata for a partition. In this
case, we still do nothing when rolling back
- * the failed write. Because the hashing metadata created by a writer must
have 00000000000000 timestamp and can be viewed
- * as the initialization of a partition rather than as a part of the failed
write.
- */
- @Override
- public boolean rollbackCommit(String instantTime) {
- return true;
- }
-
- @Override
- protected BucketIndexLocationMapper getLocationMapper(HoodieTable table,
List<String> partitionPath) {
- return new ConsistentBucketIndexLocationMapper(table, partitionPath);
- }
-
- /**
- * Load hashing metadata of the given partition, if it is not existed,
create a new one (also persist it into storage)
+ * Loads hashing metadata of the given partition, if it does not exist,
creates a new one (also persist it into storage).
+ *
+ * <p>NOTE: When creating a new hashing metadata, the content will always be
the same for the same partition.
+ * It means when multiple writer are trying to initialize metadata for the
same partition,
+ * no lock or synchronization is necessary as they are creating the file
with the same content.
+ *
+ * @param table Hoodie table
+ * @param partition Table partition
+ * @param numBuckets Default bucket number
*
- * @param table hoodie table
- * @param partition table partition
* @return Consistent hashing metadata
*/
- public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable
table, String partition) {
+ public static HoodieConsistentHashingMetadata
loadOrCreateMetadata(HoodieTable table, String partition, int numBuckets) {
Option<HoodieConsistentHashingMetadata> metadataOption =
loadMetadata(table, partition);
if (metadataOption.isPresent()) {
return metadataOption.get();
}
+ LOG.info("Failed to load metadata, try to create one. Partition: {}.",
partition);
+
// There is no metadata, so try to create a new one and save it.
HoodieConsistentHashingMetadata metadata = new
HoodieConsistentHashingMetadata(partition, numBuckets);
if (saveMetadata(table, metadata, false)) {
@@ -187,15 +93,16 @@ public class HoodieSparkConsistentBucketIndex extends
HoodieBucketIndex {
}
/**
- * Load hashing metadata of the given partition, if it is not existed,
return null
+ * Loads hashing metadata of the given partition, if it does not exist,
returns empty.
*
- * @param table hoodie table
- * @param partition table partition
- * @return Consistent hashing metadata or null if it does not exist
+ * @param table Hoodie table
+ * @param partition Table partition
+ * @return Consistent hashing metadata or empty if it does not exist
*/
public static Option<HoodieConsistentHashingMetadata>
loadMetadata(HoodieTable table, String partition) {
- Path metadataPath =
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(),
partition);
- Path partitionPath =
FSUtils.getPartitionPath(table.getMetaClient().getBasePathV2(), partition);
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ Path metadataPath =
FSUtils.getPartitionPath(metaClient.getHashingMetadataPath(), partition);
+ Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePathV2(),
partition);
try {
Predicate<FileStatus> hashingMetaCommitFilePredicate = fileStatus -> {
String filename = fileStatus.getPath().getName();
@@ -205,14 +112,14 @@ public class HoodieSparkConsistentBucketIndex extends
HoodieBucketIndex {
String filename = fileStatus.getPath().getName();
return filename.contains(HASHING_METADATA_FILE_SUFFIX);
};
- final FileStatus[] metaFiles =
table.getMetaClient().getFs().listStatus(metadataPath);
+ final FileStatus[] metaFiles =
metaClient.getFs().listStatus(metadataPath);
final TreeSet<String> commitMetaTss =
Arrays.stream(metaFiles).filter(hashingMetaCommitFilePredicate)
.map(commitFile ->
HoodieConsistentHashingMetadata.getTimestampFromFile(commitFile.getPath().getName()))
.sorted()
.collect(Collectors.toCollection(TreeSet::new));
final FileStatus[] hashingMetaFiles =
Arrays.stream(metaFiles).filter(hashingMetadataFilePredicate)
.sorted(Comparator.comparing(f -> f.getPath().getName()))
- .toArray(FileStatus[]::new);
+ .toArray(FileStatus[]::new);
// max committed metadata file
final String maxCommitMetaFileTs = commitMetaTss.isEmpty() ? null :
commitMetaTss.last();
// max updated metadata file
@@ -223,10 +130,10 @@ public class HoodieSparkConsistentBucketIndex extends
HoodieBucketIndex {
}
// if max updated metadata file and committed metadata file are same
then return
if (maxCommitMetaFileTs != null && maxMetadataFile != null
- &&
maxCommitMetaFileTs.equals(HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName())))
{
+ &&
maxCommitMetaFileTs.equals(HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName())))
{
return loadMetadataFromGivenFile(table, maxMetadataFile);
}
- HoodieTimeline completedCommits =
table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+ HoodieTimeline completedCommits =
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
// fix the in-consistency between un-committed and committed hashing
metadata files.
List<FileStatus> fixed = new ArrayList<>();
@@ -262,14 +169,14 @@ public class HoodieSparkConsistentBucketIndex extends
HoodieBucketIndex {
}
/**
- * Save metadata into storage
+ * Saves the metadata into storage
*
- * @param table hoodie table
- * @param metadata hashing metadata to be saved
- * @param overwrite whether to overwrite existing metadata
+ * @param table Hoodie table
+ * @param metadata Hashing metadata to be saved
+ * @param overwrite Whether to overwrite existing metadata
* @return true if the metadata is saved successfully
*/
- private static boolean saveMetadata(HoodieTable table,
HoodieConsistentHashingMetadata metadata, boolean overwrite) {
+ public static boolean saveMetadata(HoodieTable table,
HoodieConsistentHashingMetadata metadata, boolean overwrite) {
HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
Path dir =
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(),
metadata.getPartitionPath());
Path fullPath = new Path(dir, metadata.getFilename());
@@ -284,44 +191,12 @@ public class HoodieSparkConsistentBucketIndex extends
HoodieBucketIndex {
return false;
}
- public class ConsistentBucketIndexLocationMapper implements
BucketIndexLocationMapper {
-
- /**
- * Mapping from partitionPath -> bucket identifier
- */
- private final Map<String, ConsistentBucketIdentifier>
partitionToIdentifier;
-
- public ConsistentBucketIndexLocationMapper(HoodieTable table, List<String>
partitions) {
- // TODO maybe parallel
- partitionToIdentifier = partitions.stream().collect(Collectors.toMap(p
-> p, p -> {
- HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table,
p);
- return new ConsistentBucketIdentifier(metadata);
- }));
- }
-
- @Override
- public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
- String partitionPath = key.getPartitionPath();
- ConsistentHashingNode node =
partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
- if (!StringUtils.isNullOrEmpty(node.getFileIdPrefix())) {
- /**
- * Dynamic Bucket Index doesn't need the instant time of the latest
file group.
- * We add suffix 0 here to the file uuid, following the naming
convention, i.e., fileId = [uuid]_[numWrites]
- */
- return Option.of(new HoodieRecordLocation(null,
FSUtils.createNewFileId(node.getFileIdPrefix(), 0)));
- }
-
- LOG.error("Consistent hashing node has no file group, partition: " +
partitionPath + ", meta: "
- +
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ",
record_key: " + key.toString());
- throw new HoodieIndexException("Failed to getBucket as hashing node has
no file group");
- }
- }
-
/***
* Creates commit marker corresponding to hashing metadata file after post
commit clustering operation.
- * @param table hoodie table
- * @param fileStatus file for which commit marker should be created
- * @param partitionPath partition path the file belongs to
+ *
+ * @param table Hoodie table
+ * @param fileStatus File for which commit marker should be created
+ * @param partitionPath Partition path the file belongs to
* @throws IOException
*/
private static void createCommitMarker(HoodieTable table, Path fileStatus,
Path partitionPath) throws IOException {
@@ -335,8 +210,9 @@ public class HoodieSparkConsistentBucketIndex extends
HoodieBucketIndex {
/***
* Loads consistent hashing metadata of table from the given meta file
- * @param table hoodie table
- * @param metaFile hashing metadata file
+ *
+ * @param table Hoodie table
+ * @param metaFile Hashing metadata file
* @return HoodieConsistentHashingMetadata object
*/
private static Option<HoodieConsistentHashingMetadata>
loadMetadataFromGivenFile(HoodieTable table, FileStatus metaFile) {
@@ -356,15 +232,18 @@ public class HoodieSparkConsistentBucketIndex extends
HoodieBucketIndex {
/***
* COMMIT MARKER RECOVERY JOB.
- * If particular hashing metadta file doesn't have commit marker then there
could be a case where clustering is done but post commit marker
+ *
+ * <p>If particular hashing metadata file doesn't have commit marker then
there could be a case where clustering is done but post commit marker
* creation operation failed. In this case this method will check file group
id from consistent hashing metadata against storage base file group ids.
* if one of the file group matches then we can conclude that this is the
latest metadata file.
- * Note : we will end up calling this method if there is no marker file and
no replace commit on active timeline, if replace commit is not present on
- * active timeline that means old file group id's before clustering
operation got cleaned and only new file group id's of current clustering
operation
+ *
+ * <p>Note : we will end up calling this method if there is no marker file
and no replace commit on active timeline, if replace commit is not present on
+ * active timeline that means old file group id's before clustering
operation got cleaned and only new file group id's of current clustering
operation
* are present on the disk.
- * @param table hoodie table
- * @param metaFile metadata file on which sync check needs to be performed
- * @param partition partition metadata file belongs to
+ *
+ * @param table Hoodie table
+ * @param metaFile Metadata file on which sync check needs to be performed
+ * @param partition Partition metadata file belongs to
* @return true if hashing metadata file is latest else false
*/
private static boolean recommitMetadataFile(HoodieTable table, FileStatus
metaFile, String partition) {
@@ -381,7 +260,7 @@ public class HoodieSparkConsistentBucketIndex extends
HoodieBucketIndex {
Predicate<String> hoodieFileGroupIdPredicate = hoodieBaseFile ->
hoodieConsistentHashingMetadata.getNodes().stream().anyMatch(node ->
node.getFileIdPrefix().equals(hoodieBaseFile));
if (table.getBaseFileOnlyView().getLatestBaseFiles(partition)
- .map(fileIdPrefix ->
FSUtils.getFileIdPfxFromFileId(fileIdPrefix.getFileId())).anyMatch(hoodieFileGroupIdPredicate))
{
+ .map(fileIdPrefix ->
FSUtils.getFileIdPfxFromFileId(fileIdPrefix.getFileId())).anyMatch(hoodieFileGroupIdPredicate))
{
try {
createCommitMarker(table, metaFile.getPath(), partitionPath);
return true;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java
new file mode 100644
index 00000000000..156d14b7cf5
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/HoodieConsistentBucketIndex.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Consistent hashing bucket index implementation, with auto-adjust bucket
number.
+ * NOTE: bucket resizing is triggered by clustering.
+ */
+public class HoodieConsistentBucketIndex extends HoodieBucketIndex {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieConsistentBucketIndex.class);
+
+ public HoodieConsistentBucketIndex(HoodieWriteConfig config) {
+ super(config);
+ }
+
+ @Override
+ public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus>
writeStatuses,
+ HoodieEngineContext context,
+ HoodieTable hoodieTable)
+ throws HoodieIndexException {
+ throw new HoodieIndexException("Consistent hashing index does not support
update location without the instant parameter");
+ }
+
+ /**
+ * Do nothing.
+ * A failed write may create a hashing metadata for a partition. In this
case, we still do nothing when rolling back
+ * the failed write. Because the hashing metadata created by a writer must
have 00000000000000 timestamp and can be viewed
+ * as the initialization of a partition rather than as a part of the failed
write.
+ */
+ @Override
+ public boolean rollbackCommit(String instantTime) {
+ return true;
+ }
+
+ @Override
+ protected BucketIndexLocationMapper getLocationMapper(HoodieTable table,
List<String> partitionPath) {
+ return new ConsistentBucketIndexLocationMapper(table, partitionPath);
+ }
+
+ public class ConsistentBucketIndexLocationMapper implements
BucketIndexLocationMapper {
+
+ /**
+ * Mapping from partitionPath -> bucket identifier
+ */
+ private final Map<String, ConsistentBucketIdentifier>
partitionToIdentifier;
+
+ public ConsistentBucketIndexLocationMapper(HoodieTable table, List<String>
partitions) {
+ // TODO maybe parallel
+ partitionToIdentifier = partitions.stream().collect(Collectors.toMap(p
-> p, p -> {
+ HoodieConsistentHashingMetadata metadata =
ConsistentBucketIndexUtils.loadOrCreateMetadata(table, p, getNumBuckets());
+ return new ConsistentBucketIdentifier(metadata);
+ }));
+ }
+
+ @Override
+ public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
+ String partitionPath = key.getPartitionPath();
+ ConsistentHashingNode node =
partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
+ if (!StringUtils.isNullOrEmpty(node.getFileIdPrefix())) {
+ // Dynamic Bucket Index doesn't need the instant time of the latest
file group.
+ // We add suffix 0 here to the file uuid, following the naming
convention, i.e., fileId = [uuid]_[numWrites]
+ return Option.of(new HoodieRecordLocation(null,
FSUtils.createNewFileId(node.getFileIdPrefix(), 0)));
+ }
+
+ LOG.error("Consistent hashing node has no file group, partition: {},
meta: {}, record_key: {}",
+ partitionPath,
partitionToIdentifier.get(partitionPath).getMetadata().getFilename(),
key.toString());
+ throw new HoodieIndexException("Failed to getBucket as hashing node has
no file group");
+ }
+ }
+}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
index c764204565e..2024c601675 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndexFactory.java
@@ -26,6 +26,7 @@ import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.bloom.HoodieBloomIndex;
import org.apache.hudi.index.bloom.HoodieGlobalBloomIndex;
import org.apache.hudi.index.bloom.ListBasedHoodieBloomIndexHelper;
+import org.apache.hudi.index.bucket.HoodieConsistentBucketIndex;
import org.apache.hudi.index.bucket.HoodieSimpleBucketIndex;
import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex;
import org.apache.hudi.index.simple.HoodieSimpleIndex;
@@ -56,7 +57,14 @@ public final class FlinkHoodieIndexFactory {
case GLOBAL_SIMPLE:
return new HoodieGlobalSimpleIndex(config, Option.empty());
case BUCKET:
- return new HoodieSimpleBucketIndex(config);
+ switch (config.getBucketIndexEngineType()) {
+ case SIMPLE:
+ return new HoodieSimpleBucketIndex(config);
+ case CONSISTENT_HASHING:
+ return new HoodieConsistentBucketIndex(config);
+ default:
+ throw new HoodieIndexException("Unknown bucket index engine type:
" + config.getBucketIndexEngineType());
+ }
default:
throw new HoodieIndexException("Unsupported index type " +
config.getIndexType());
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
index c5ab254ac46..a49ab1ddf4b 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/SparkConsistentBucketClusteringPlanStrategy.java
@@ -39,6 +39,7 @@ import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
+import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
import org.apache.hudi.table.HoodieTable;
import
org.apache.hudi.table.action.cluster.strategy.PartitionAwareClusteringPlanStrategy;
@@ -125,9 +126,7 @@ public class SparkConsistentBucketClusteringPlanStrategy<T
extends HoodieRecordP
*/
@Override
protected Stream<HoodieClusteringGroup>
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice>
fileSlices) {
- ValidationUtils.checkArgument(getHoodieTable().getIndex() instanceof
HoodieSparkConsistentBucketIndex,
- "Mismatch of index type and the clustering strategy, index: " +
getHoodieTable().getIndex().getClass().getSimpleName());
- Option<HoodieConsistentHashingMetadata> metadata =
HoodieSparkConsistentBucketIndex.loadMetadata(getHoodieTable(), partitionPath);
+ Option<HoodieConsistentHashingMetadata> metadata =
ConsistentBucketIndexUtils.loadMetadata(getHoodieTable(), partitionPath);
ValidationUtils.checkArgument(metadata.isPresent(), "Metadata is empty for
partition: " + partitionPath);
ConsistentBucketIdentifier identifier = new
ConsistentBucketIdentifier(metadata.get());
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
index 00cd8961143..4ddacc7f3cb 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkConsistentBucketDuplicateUpdateStrategy.java
@@ -38,7 +38,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
-import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
+import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
@@ -131,7 +131,7 @@ public class SparkConsistentBucketDuplicateUpdateStrategy<T
extends HoodieRecord
String preInstant = partitionToInstant.putIfAbsent(p, instant);
checkState(preInstant == null || preInstant.equals(instant), "Find a
partition: " + p + " with two clustering instants");
if (!partitionToHashingMeta.containsKey(p)) {
- Option<HoodieConsistentHashingMetadata> metadataOption =
HoodieSparkConsistentBucketIndex.loadMetadata(table, p);
+ Option<HoodieConsistentHashingMetadata> metadataOption =
ConsistentBucketIndexUtils.loadMetadata(table, p);
checkState(metadataOption.isPresent(), "Failed to load consistent
hashing metadata for partition: " + p);
partitionToHashingMeta.put(p, metadataOption.get());
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
index 87d6e19ce9f..ae6640cbaac 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDConsistentBucketBulkInsertPartitioner.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
+import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.Partitioner;
@@ -101,7 +102,7 @@ public class RDDConsistentBucketBulkInsertPartitioner<T>
extends RDDBucketIndexP
*/
private ConsistentBucketIdentifier getBucketIdentifier(String partition) {
HoodieSparkConsistentBucketIndex index =
(HoodieSparkConsistentBucketIndex) table.getIndex();
- HoodieConsistentHashingMetadata metadata =
index.loadOrCreateMetadata(this.table, partition);
+ HoodieConsistentHashingMetadata metadata =
ConsistentBucketIndexUtils.loadOrCreateMetadata(this.table, partition,
index.getNumBuckets());
if (hashingChildrenNodes.containsKey(partition)) {
metadata.setChildrenNodes(hashingChildrenNodes.get(partition));
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
index ebbecbda241..b9a77e56f67 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/HoodieSparkConsistentBucketIndex.java
@@ -24,69 +24,36 @@ import org.apache.hudi.client.WriteStatus;
import
org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
-import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
-import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import java.util.TreeSet;
-import java.util.function.Predicate;
import java.util.stream.Collectors;
import scala.Tuple2;
-import static
org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX;
-import static
org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX;
-import static
org.apache.hudi.common.model.HoodieConsistentHashingMetadata.getTimestampFromFile;
-
/**
* Consistent hashing bucket index implementation, with auto-adjust bucket
number.
* NOTE: bucket resizing is triggered by clustering.
*/
-public class HoodieSparkConsistentBucketIndex extends HoodieBucketIndex {
-
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieSparkConsistentBucketIndex.class);
+public class HoodieSparkConsistentBucketIndex extends
HoodieConsistentBucketIndex {
public HoodieSparkConsistentBucketIndex(HoodieWriteConfig config) {
super(config);
}
- @Override
- public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus>
writeStatuses,
- HoodieEngineContext context,
- HoodieTable hoodieTable)
- throws HoodieIndexException {
- throw new HoodieIndexException("Consistent hashing index does not support
update location without the instant parameter");
- }
-
/**
* Persist hashing metadata to storage. Only clustering operations will
modify the metadata.
* For example, splitting & merging buckets, or just sorting and producing a
new bucket.
@@ -124,7 +91,7 @@ public class HoodieSparkConsistentBucketIndex extends
HoodieBucketIndex {
seqNo =
Integer.parseInt(m.get(SparkConsistentBucketClusteringPlanStrategy.METADATA_SEQUENCE_NUMBER_KEY));
}
- Option<HoodieConsistentHashingMetadata> metadataOption =
loadMetadata(hoodieTable, partition);
+ Option<HoodieConsistentHashingMetadata> metadataOption =
ConsistentBucketIndexUtils.loadMetadata(hoodieTable, partition);
ValidationUtils.checkState(metadataOption.isPresent(), "Failed to load
metadata for partition: " + partition);
HoodieConsistentHashingMetadata meta = metadataOption.get();
ValidationUtils.checkState(meta.getSeqNo() == seqNo,
@@ -137,258 +104,9 @@ public class HoodieSparkConsistentBucketIndex extends
HoodieBucketIndex {
.collect(Collectors.toList());
HoodieConsistentHashingMetadata newMeta = new
HoodieConsistentHashingMetadata(meta.getVersion(), meta.getPartitionPath(),
instantTime, meta.getNumBuckets(), seqNo + 1, newNodes);
- // Overwrite to tolerate re-run of clustering operation
- saveMetadata(hoodieTable, newMeta, true);
+ ConsistentBucketIndexUtils.saveMetadata(hoodieTable, newMeta, true);
});
return writeStatuses;
}
-
- /**
- * Do nothing.
- * A failed write may create a hashing metadata for a partition. In this
case, we still do nothing when rolling back
- * the failed write. Because the hashing metadata created by a writer must
have 00000000000000 timestamp and can be viewed
- * as the initialization of a partition rather than as a part of the failed
write.
- */
- @Override
- public boolean rollbackCommit(String instantTime) {
- return true;
- }
-
- @Override
- protected BucketIndexLocationMapper getLocationMapper(HoodieTable table,
List<String> partitionPath) {
- return new ConsistentBucketIndexLocationMapper(table, partitionPath);
- }
-
- /**
- * Load hashing metadata of the given partition, if it is not existed,
create a new one (also persist it into storage)
- *
- * @param table hoodie table
- * @param partition table partition
- * @return Consistent hashing metadata
- */
- public HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable
table, String partition) {
- Option<HoodieConsistentHashingMetadata> metadataOption =
loadMetadata(table, partition);
- if (metadataOption.isPresent()) {
- return metadataOption.get();
- }
-
- // There is no metadata, so try to create a new one and save it.
- HoodieConsistentHashingMetadata metadata = new
HoodieConsistentHashingMetadata(partition, numBuckets);
- if (saveMetadata(table, metadata, false)) {
- return metadata;
- }
-
- // The creation failed, so try load metadata again. Concurrent creation of
metadata should have succeeded.
- // Note: the consistent problem of cloud storage is handled internal in
the HoodieWrapperFileSystem, i.e., ConsistentGuard
- metadataOption = loadMetadata(table, partition);
- ValidationUtils.checkState(metadataOption.isPresent(), "Failed to load or
create metadata, partition: " + partition);
- return metadataOption.get();
- }
-
- /**
- * Load hashing metadata of the given partition, if it is not existed,
return null
- *
- * @param table hoodie table
- * @param partition table partition
- * @return Consistent hashing metadata or null if it does not exist
- */
- public static Option<HoodieConsistentHashingMetadata>
loadMetadata(HoodieTable table, String partition) {
- Path metadataPath =
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(),
partition);
- Path partitionPath =
FSUtils.getPartitionPath(table.getMetaClient().getBasePathV2(), partition);
- try {
- Predicate<FileStatus> hashingMetaCommitFilePredicate = fileStatus -> {
- String filename = fileStatus.getPath().getName();
- return
filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX);
- };
- Predicate<FileStatus> hashingMetadataFilePredicate = fileStatus -> {
- String filename = fileStatus.getPath().getName();
- return filename.contains(HASHING_METADATA_FILE_SUFFIX);
- };
- final FileStatus[] metaFiles =
table.getMetaClient().getFs().listStatus(metadataPath);
- final TreeSet<String> commitMetaTss =
Arrays.stream(metaFiles).filter(hashingMetaCommitFilePredicate)
- .map(commitFile ->
HoodieConsistentHashingMetadata.getTimestampFromFile(commitFile.getPath().getName()))
- .sorted()
- .collect(Collectors.toCollection(TreeSet::new));
- final FileStatus[] hashingMetaFiles =
Arrays.stream(metaFiles).filter(hashingMetadataFilePredicate)
- .sorted(Comparator.comparing(f -> f.getPath().getName()))
- .toArray(FileStatus[]::new);
- // max committed metadata file
- final String maxCommitMetaFileTs = commitMetaTss.isEmpty() ? null :
commitMetaTss.last();
- // max updated metadata file
- FileStatus maxMetadataFile = hashingMetaFiles.length > 0 ?
hashingMetaFiles[hashingMetaFiles.length - 1] : null;
- // If single file present in metadata and if its default file return it
- if (maxMetadataFile != null &&
HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName()).equals(HoodieTimeline.INIT_INSTANT_TS))
{
- return loadMetadataFromGivenFile(table, maxMetadataFile);
- }
- // if max updated metadata file and committed metadata file are same
then return
- if (maxCommitMetaFileTs != null && maxMetadataFile != null
- &&
maxCommitMetaFileTs.equals(HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName())))
{
- return loadMetadataFromGivenFile(table, maxMetadataFile);
- }
- HoodieTimeline completedCommits =
table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants();
-
- // fix the in-consistency between un-committed and committed hashing
metadata files.
- List<FileStatus> fixed = new ArrayList<>();
- Arrays.stream(hashingMetaFiles).forEach(hashingMetaFile -> {
- Path path = hashingMetaFile.getPath();
- String timestamp =
HoodieConsistentHashingMetadata.getTimestampFromFile(path.getName());
- if (maxCommitMetaFileTs != null &&
timestamp.compareTo(maxCommitMetaFileTs) <= 0) {
- // only fix the metadata with greater timestamp than max committed
timestamp
- return;
- }
- boolean isRehashingCommitted =
completedCommits.containsInstant(timestamp) ||
timestamp.equals(HoodieTimeline.INIT_INSTANT_TS);
- if (isRehashingCommitted) {
- if (!commitMetaTss.contains(timestamp)) {
- try {
- createCommitMarker(table, path, partitionPath);
- } catch (IOException e) {
- throw new HoodieIOException("Exception while creating marker
file " + path.getName() + " for partition " + partition, e);
- }
- }
- fixed.add(hashingMetaFile);
- } else if (recommitMetadataFile(table, hashingMetaFile, partition)) {
- fixed.add(hashingMetaFile);
- }
- });
-
- return fixed.isEmpty() ? Option.empty() :
loadMetadataFromGivenFile(table, fixed.get(fixed.size() - 1));
- } catch (FileNotFoundException e) {
- return Option.empty();
- } catch (IOException e) {
- LOG.error("Error when loading hashing metadata, partition: " +
partition, e);
- throw new HoodieIndexException("Error while loading hashing metadata",
e);
- }
- }
-
- /**
- * Save metadata into storage
- *
- * @param table hoodie table
- * @param metadata hashing metadata to be saved
- * @param overwrite whether to overwrite existing metadata
- * @return true if the metadata is saved successfully
- */
- private static boolean saveMetadata(HoodieTable table,
HoodieConsistentHashingMetadata metadata, boolean overwrite) {
- HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
- Path dir =
FSUtils.getPartitionPath(table.getMetaClient().getHashingMetadataPath(),
metadata.getPartitionPath());
- Path fullPath = new Path(dir, metadata.getFilename());
- try (FSDataOutputStream fsOut = fs.create(fullPath, overwrite)) {
- byte[] bytes = metadata.toBytes();
- fsOut.write(bytes);
- fsOut.close();
- return true;
- } catch (IOException e) {
- LOG.warn("Failed to update bucket metadata: " + metadata, e);
- }
- return false;
- }
-
- public class ConsistentBucketIndexLocationMapper implements
BucketIndexLocationMapper {
-
- /**
- * Mapping from partitionPath -> bucket identifier
- */
- private final Map<String, ConsistentBucketIdentifier>
partitionToIdentifier;
-
- public ConsistentBucketIndexLocationMapper(HoodieTable table, List<String>
partitions) {
- // TODO maybe parallel
- partitionToIdentifier = partitions.stream().collect(Collectors.toMap(p
-> p, p -> {
- HoodieConsistentHashingMetadata metadata = loadOrCreateMetadata(table,
p);
- return new ConsistentBucketIdentifier(metadata);
- }));
- }
-
- @Override
- public Option<HoodieRecordLocation> getRecordLocation(HoodieKey key) {
- String partitionPath = key.getPartitionPath();
- ConsistentHashingNode node =
partitionToIdentifier.get(partitionPath).getBucket(key, indexKeyFields);
- if (!StringUtils.isNullOrEmpty(node.getFileIdPrefix())) {
- /**
- * Dynamic Bucket Index doesn't need the instant time of the latest
file group.
- * We add suffix 0 here to the file uuid, following the naming
convention, i.e., fileId = [uuid]_[numWrites]
- */
- return Option.of(new HoodieRecordLocation(null,
FSUtils.createNewFileId(node.getFileIdPrefix(), 0)));
- }
-
- LOG.error("Consistent hashing node has no file group, partition: " +
partitionPath + ", meta: "
- +
partitionToIdentifier.get(partitionPath).getMetadata().getFilename() + ",
record_key: " + key.toString());
- throw new HoodieIndexException("Failed to getBucket as hashing node has
no file group");
- }
- }
-
- /***
- * Creates commit marker corresponding to hashing metadata file after post
commit clustering operation.
- * @param table hoodie table
- * @param fileStatus file for which commit marker should be created
- * @param partitionPath partition path the file belongs to
- * @throws IOException
- */
- private static void createCommitMarker(HoodieTable table, Path fileStatus,
Path partitionPath) throws IOException {
- HoodieWrapperFileSystem fs = table.getMetaClient().getFs();
- Path fullPath = new Path(partitionPath,
getTimestampFromFile(fileStatus.getName()) +
HASHING_METADATA_COMMIT_FILE_SUFFIX);
- if (fs.exists(fullPath)) {
- return;
- }
- FileIOUtils.createFileInPath(fs, fullPath,
Option.of(StringUtils.EMPTY_STRING.getBytes()));
- }
-
- /***
- * Loads consistent hashing metadata of table from the given meta file
- * @param table hoodie table
- * @param metaFile hashing metadata file
- * @return HoodieConsistentHashingMetadata object
- */
- private static Option<HoodieConsistentHashingMetadata>
loadMetadataFromGivenFile(HoodieTable table, FileStatus metaFile) {
- try {
- if (metaFile == null) {
- return Option.empty();
- }
- byte[] content =
FileIOUtils.readAsByteArray(table.getMetaClient().getFs().open(metaFile.getPath()));
- return Option.of(HoodieConsistentHashingMetadata.fromBytes(content));
- } catch (FileNotFoundException e) {
- return Option.empty();
- } catch (IOException e) {
- LOG.error("Error when loading hashing metadata, for path: " +
metaFile.getPath().getName(), e);
- throw new HoodieIndexException("Error while loading hashing metadata",
e);
- }
- }
-
- /***
- * COMMIT MARKER RECOVERY JOB.
- * If particular hashing metadta file doesn't have commit marker then there
could be a case where clustering is done but post commit marker
- * creation operation failed. In this case this method will check file group
id from consistent hashing metadata against storage base file group ids.
- * if one of the file group matches then we can conclude that this is the
latest metadata file.
- * Note : we will end up calling this method if there is no marker file and
no replace commit on active timeline, if replace commit is not present on
- * active timeline that means old file group id's before clustering
operation got cleaned and only new file group id's of current clustering
operation
- * are present on the disk.
- * @param table hoodie table
- * @param metaFile metadata file on which sync check needs to be performed
- * @param partition partition metadata file belongs to
- * @return true if hashing metadata file is latest else false
- */
- private static boolean recommitMetadataFile(HoodieTable table, FileStatus
metaFile, String partition) {
- Path partitionPath =
FSUtils.getPartitionPath(table.getMetaClient().getBasePathV2(), partition);
- String timestamp = getTimestampFromFile(metaFile.getPath().getName());
- if (table.getPendingCommitTimeline().containsInstant(timestamp)) {
- return false;
- }
- Option<HoodieConsistentHashingMetadata>
hoodieConsistentHashingMetadataOption = loadMetadataFromGivenFile(table,
metaFile);
- if (!hoodieConsistentHashingMetadataOption.isPresent()) {
- return false;
- }
- HoodieConsistentHashingMetadata hoodieConsistentHashingMetadata =
hoodieConsistentHashingMetadataOption.get();
-
- Predicate<String> hoodieFileGroupIdPredicate = hoodieBaseFile ->
hoodieConsistentHashingMetadata.getNodes().stream().anyMatch(node ->
node.getFileIdPrefix().equals(hoodieBaseFile));
- if (table.getBaseFileOnlyView().getLatestBaseFiles(partition)
- .map(fileIdPrefix ->
FSUtils.getFileIdPfxFromFileId(fileIdPrefix.getFileId())).anyMatch(hoodieFileGroupIdPredicate))
{
- try {
- createCommitMarker(table, metaFile.getPath(), partitionPath);
- return true;
- } catch (IOException e) {
- throw new HoodieIOException("Exception while creating marker file " +
metaFile.getPath().getName() + " for partition " + partition, e);
- }
- }
- return false;
- }
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSparkConsistentBucketClustering.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSparkConsistentBucketClustering.java
index ad541fdef57..10cc247e5ec 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSparkConsistentBucketClustering.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSparkConsistentBucketClustering.java
@@ -46,7 +46,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
+import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
@@ -145,7 +145,7 @@ public class TestSparkConsistentBucketClustering extends
HoodieClientTestHarness
Assertions.assertEquals(2000,
readRecords(dataGen.getPartitionPaths()).size());
Arrays.stream(dataGen.getPartitionPaths()).forEach(p -> {
- HoodieConsistentHashingMetadata metadata =
HoodieSparkConsistentBucketIndex.loadMetadata(table, p).get();
+ HoodieConsistentHashingMetadata metadata =
ConsistentBucketIndexUtils.loadMetadata(table, p).get();
Assertions.assertEquals(targetBucketNum, metadata.getNodes().size());
// The file slice has no log files
@@ -201,7 +201,7 @@ public class TestSparkConsistentBucketClustering extends
HoodieClientTestHarness
throw new RuntimeException(e);
}
}
- HoodieConsistentHashingMetadata metadata =
HoodieSparkConsistentBucketIndex.loadMetadata(table, p).get();
+ HoodieConsistentHashingMetadata metadata =
ConsistentBucketIndexUtils.loadMetadata(table, p).get();
Assertions.assertEquals(targetBucketNum, metadata.getNodes().size());
});
writeData(HoodieActiveTimeline.createNewInstantTime(), 10, true);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java
index 120ba487fcf..4535983389d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieConsistentHashingMetadata.java
@@ -18,7 +18,6 @@
package org.apache.hudi.common.model;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.JsonUtils;
@@ -31,11 +30,11 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
+import java.util.UUID;
/**
* All the metadata that is used for consistent hashing bucket index
@@ -83,14 +82,33 @@ public class HoodieConsistentHashingMetadata implements
Serializable {
* Construct default metadata with all bucket's file group uuid initialized
*/
public HoodieConsistentHashingMetadata(String partitionPath, int numBuckets)
{
- this((short) 0, partitionPath, HoodieTimeline.INIT_INSTANT_TS, numBuckets,
0, constructDefaultHashingNodes(numBuckets));
+ this((short) 0, partitionPath, HoodieTimeline.INIT_INSTANT_TS, numBuckets,
0, constructDefaultHashingNodes(partitionPath, numBuckets));
this.firstCreated = true;
}
- private static List<ConsistentHashingNode> constructDefaultHashingNodes(int
numBuckets) {
+ private static List<ConsistentHashingNode>
constructDefaultHashingNodes(String partitionPath, int numBuckets) {
long step = ((long) HASH_VALUE_MASK + numBuckets - 1) / numBuckets;
- return IntStream.range(1, numBuckets + 1)
- .mapToObj(i -> new ConsistentHashingNode((int) Math.min(step * i,
HASH_VALUE_MASK), FSUtils.createNewFileIdPfx())).collect(Collectors.toList());
+ long bucketStart = 0;
+ List<ConsistentHashingNode> nodes = new ArrayList<>(numBuckets);
+ for (int idx = 1; idx < numBuckets + 1; idx++) {
+ long bucketEnd = Math.min(step * idx, HASH_VALUE_MASK);
+ String fileId = generateUUID(partitionPath, bucketStart, bucketEnd);
+ nodes.add(new ConsistentHashingNode((int) bucketEnd, fileId));
+ bucketStart = bucketEnd;
+ }
+ return nodes;
+ }
+
+ private static String generateUUID(String partitionPath, long bucketStart,
long bucketEnd) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(16);
+ byteBuffer.putLong(bucketStart);
+ byteBuffer.putLong(bucketEnd);
+ byte[] longBytes = byteBuffer.array();
+ byte[] partitionPathBytes = partitionPath.getBytes(StandardCharsets.UTF_8);
+ byte[] combinedBytes = new byte[longBytes.length +
partitionPathBytes.length];
+ System.arraycopy(longBytes, 0, combinedBytes, 0, longBytes.length);
+ System.arraycopy(partitionPathBytes, 0, combinedBytes, longBytes.length,
partitionPathBytes.length);
+ return UUID.nameUUIDFromBytes(combinedBytes).toString();
}
public short getVersion() {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 3c3c14f8c5c..c140d40af88 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -425,6 +425,13 @@ public class FlinkOptions extends HoodieConfig {
+ "Actual value will be obtained by invoking .toString() on the
field value. Nested fields can be specified using "
+ "the dot notation eg: `a.b.c`");
+ @AdvancedConfig
+ public static final ConfigOption<String> BUCKET_INDEX_ENGINE_TYPE =
ConfigOptions
+ .key(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE.key())
+ .stringType()
+ .defaultValue("SIMPLE")
+ .withDescription("Type of bucket index engine. Available options:
[SIMPLE | CONSISTENT_HASHING]");
+
@AdvancedConfig
public static final ConfigOption<Integer> BUCKET_INDEX_NUM_BUCKETS =
ConfigOptions
.key(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 02b9e8d6356..4b90ab4c73a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -135,6 +135,11 @@ public class OptionsResolver {
return
conf.getString(FlinkOptions.INDEX_TYPE).equalsIgnoreCase(HoodieIndex.IndexType.BUCKET.name());
}
+ public static HoodieIndex.BucketIndexEngineType
getBucketEngineType(Configuration conf) {
+ String bucketEngineType = conf.get(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE);
+ return HoodieIndex.BucketIndexEngineType.valueOf(bucketEngineType);
+ }
+
/**
* Returns whether the source should emit changelog.
*
@@ -195,8 +200,8 @@ public class OptionsResolver {
* Returns whether the operation is INSERT OVERWRITE (table or partition).
*/
public static boolean isInsertOverwrite(Configuration conf) {
- return
conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE_TABLE.value())
- ||
conf.getString(FlinkOptions.OPERATION).equals(WriteOperationType.INSERT_OVERWRITE.value());
+ return
conf.getString(FlinkOptions.OPERATION).equalsIgnoreCase(WriteOperationType.INSERT_OVERWRITE_TABLE.value())
+ ||
conf.getString(FlinkOptions.OPERATION).equalsIgnoreCase(WriteOperationType.INSERT_OVERWRITE.value());
}
/**
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
new file mode 100644
index 00000000000..5fa4c48ae19
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/ConsistentBucketAssignFunction.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bucket;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.ConsistentHashingNode;
+import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieLockException;
+import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
+import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
+import org.apache.hudi.util.FlinkWriteClients;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The function to tag each incoming record with a location of a file based on
the consistent bucket index.
+ */
+public class ConsistentBucketAssignFunction extends
ProcessFunction<HoodieRecord, HoodieRecord> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ConsistentBucketAssignFunction.class);
+
+ private final Configuration config;
+ private final List<String> indexKeyFields;
+ private final int bucketNum;
+ private transient HoodieFlinkWriteClient writeClient;
+ private transient Map<String, ConsistentBucketIdentifier>
partitionToIdentifier;
+ private final int maxRetries = 10;
+ private final long maxWaitTimeInMs = 1000;
+
+ public ConsistentBucketAssignFunction(Configuration conf) {
+ this.config = conf;
+ this.indexKeyFields =
Arrays.asList(OptionsResolver.getIndexKeyField(conf).split(","));
+ this.bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ try {
+ this.writeClient = FlinkWriteClients.createWriteClient(this.config,
getRuntimeContext());
+ this.partitionToIdentifier = new HashMap<>();
+ } catch (Throwable e) {
+ LOG.error("Fail to initialize consistent bucket assigner", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void processElement(HoodieRecord record, Context context,
Collector<HoodieRecord> collector) throws Exception {
+ final HoodieKey hoodieKey = record.getKey();
+ final String partition = hoodieKey.getPartitionPath();
+
+ final ConsistentHashingNode node =
getBucketIdentifier(partition).getBucket(hoodieKey, indexKeyFields);
+ Preconditions.checkArgument(
+ StringUtils.nonEmpty(node.getFileIdPrefix()),
+ "Consistent hashing node has no file group, partition: " + partition +
", meta: "
+ + partitionToIdentifier.get(partition).getMetadata().getFilename()
+ ", record_key: " + hoodieKey);
+
+ record.unseal();
+ record.setCurrentLocation(new HoodieRecordLocation("U",
FSUtils.createNewFileId(node.getFileIdPrefix(), 0)));
+ record.seal();
+ collector.collect(record);
+ }
+
+ private ConsistentBucketIdentifier getBucketIdentifier(String partition) {
+ return partitionToIdentifier.computeIfAbsent(partition, p -> {
+ // NOTE: If the metadata does not exist, there may be concurrent creation
of the metadata. And we allow multiple subtasks
+ // trying to create the same metadata as the initial metadata always has
the same content for the same partition.
+ int retryCount = 0;
+ HoodieConsistentHashingMetadata metadata = null;
+ while (retryCount <= maxRetries) {
+ try {
+ metadata =
ConsistentBucketIndexUtils.loadOrCreateMetadata(this.writeClient.getHoodieTable(),
p, bucketNum);
+ break;
+ } catch (Exception e) {
+ if (retryCount >= maxRetries) {
+ throw new HoodieLockException("Fail to load or create metadata for
partition " + partition, e);
+ }
+ try {
+ TimeUnit.MILLISECONDS.sleep(maxWaitTimeInMs);
+ } catch (InterruptedException ex) {
+ // ignore InterruptedException here
+ }
+ LOG.info("Retrying to load or create metadata for partition {} for
{} times", partition, retryCount + 1);
+ } finally {
+ retryCount++;
+ }
+ }
+ ValidationUtils.checkState(metadata != null);
+ return new ConsistentBucketIdentifier(metadata);
+ });
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 4cedb45b82c..474e2b18e60 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -24,6 +24,9 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperator;
import org.apache.hudi.sink.append.AppendWriteOperator;
@@ -31,6 +34,7 @@ import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
+import org.apache.hudi.sink.bucket.ConsistentBucketAssignFunction;
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
@@ -58,6 +62,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -103,6 +108,11 @@ public class Pipelines {
public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType
rowType, DataStream<RowData> dataStream) {
WriteOperatorFactory<RowData> operatorFactory =
BulkInsertWriteOperator.getFactory(conf, rowType);
if (OptionsResolver.isBucketIndexType(conf)) {
+ // TODO support bulk insert for consistent bucket index
+ if (OptionsResolver.getBucketEngineType(conf) ==
HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING) {
+ throw new HoodieException(
+ "Consistent hashing bucket index does not work with bulk insert
using FLINK engine. Use simple bucket index or Spark engine.");
+ }
String indexKeys = OptionsResolver.getIndexKeyField(conf);
int numBuckets = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
@@ -327,14 +337,41 @@ public class Pipelines {
*/
public static DataStream<Object> hoodieStreamWrite(Configuration conf,
DataStream<HoodieRecord> dataStream) {
if (OptionsResolver.isBucketIndexType(conf)) {
- WriteOperatorFactory<HoodieRecord> operatorFactory =
BucketStreamWriteOperator.getFactory(conf);
- int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
- String indexKeyFields = OptionsResolver.getIndexKeyField(conf);
- BucketIndexPartitioner<HoodieKey> partitioner = new
BucketIndexPartitioner<>(bucketNum, indexKeyFields);
- return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
- .transform(opName("bucket_write", conf),
TypeInformation.of(Object.class), operatorFactory)
- .uid(opUID("bucket_write", conf))
- .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+ HoodieIndex.BucketIndexEngineType bucketIndexEngineType =
OptionsResolver.getBucketEngineType(conf);
+ switch (bucketIndexEngineType) {
+ case SIMPLE:
+ int bucketNum =
conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+ String indexKeyFields = OptionsResolver.getIndexKeyField(conf);
+ BucketIndexPartitioner<HoodieKey> partitioner = new
BucketIndexPartitioner<>(bucketNum, indexKeyFields);
+ return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
+ .transform(
+ opName("bucket_write", conf),
+ TypeInformation.of(Object.class),
+ BucketStreamWriteOperator.getFactory(conf))
+ .uid(opUID("bucket_write", conf))
+ .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+ case CONSISTENT_HASHING:
+ if (OptionsResolver.isInsertOverwrite(conf)) {
+ // TODO support insert overwrite for consistent bucket index
+ throw new HoodieException("Consistent hashing bucket index does
not work with insert overwrite using FLINK engine. Use simple bucket index or
Spark engine.");
+ }
+ return dataStream
+ .transform(
+ opName("consistent_bucket_assigner", conf),
+ TypeInformation.of(HoodieRecord.class),
+ new ProcessOperator<>(new
ConsistentBucketAssignFunction(conf)))
+ .uid(opUID("consistent_bucket_assigner", conf))
+
.setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
+ .keyBy(record -> record.getCurrentLocation().getFileId())
+ .transform(
+ opName("consistent_bucket_write", conf),
+ TypeInformation.of(Object.class),
+ StreamWriteOperator.getFactory(conf))
+ .uid(opUID("consistent_bucket_write", conf))
+ .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+ default:
+ throw new HoodieNotSupportedException("Unknown bucket index engine
type: " + bucketIndexEngineType);
+ }
} else {
WriteOperatorFactory<HoodieRecord> operatorFactory =
StreamWriteOperator.getFactory(conf);
return dataStream
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
new file mode 100644
index 00000000000..2eb6e4b3626
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bucket/ITTestConsistentBucketStreamWrite.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.bucket;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsInference;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.utils.Pipelines;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.JsonDeserializationFunction;
+import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.utils.FlinkMiniCluster;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.source.ContinuousFileSource;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.io.FilePathFilter;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.TestLogger;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Integration test for Flink Hoodie stream sink of consistent bucket index.
+ */
+@ExtendWith(FlinkMiniCluster.class)
+public class ITTestConsistentBucketStreamWrite extends TestLogger {
+
+ private static final Map<String, String> EXPECTED = new HashMap<>();
+
+ static {
+ EXPECTED.put("par1", "[id1,par1,id1,Danny,23,1000,par1,
id2,par1,id2,Stephen,33,2000,par1]");
+ EXPECTED.put("par2", "[id3,par2,id3,Julian,53,3000,par2,
id4,par2,id4,Fabian,31,4000,par2]");
+ EXPECTED.put("par3", "[id5,par3,id5,Sophia,18,5000,par3,
id6,par3,id6,Emma,20,6000,par3]");
+ EXPECTED.put("par4", "[id7,par4,id7,Bob,44,7000,par4,
id8,par4,id8,Han,56,8000,par4]");
+ }
+
+ @TempDir
+ File tempFile;
+
+ @Test
+ public void testWriteMOR() throws Exception {
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+ conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
+ conf.setString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE,
"CONSISTENT_HASHING");
+ conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
+ conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+ conf.setString(FlinkOptions.TABLE_TYPE,
HoodieTableType.MERGE_ON_READ.name());
+ testWriteToHoodie(conf, "mor_write", 1, EXPECTED);
+ }
+
+ @Test
+ public void testBulkInsert() {
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+ conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
+ conf.setString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE,
"CONSISTENT_HASHING");
+ conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
+ conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+ conf.setString(FlinkOptions.TABLE_TYPE,
HoodieTableType.MERGE_ON_READ.name());
+ conf.setString(FlinkOptions.OPERATION, "bulk_insert");
+
+ // expect HoodieException for bulk insert
+ assertThrows(
+ HoodieException.class,
+ () -> testWriteToHoodie(conf, "bulk_insert", 1, EXPECTED));
+ }
+
+ @Test
+ public void testOverwrite() {
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.toURI().toString());
+ conf.setString(FlinkOptions.INDEX_TYPE, "BUCKET");
+ conf.setString(FlinkOptions.BUCKET_INDEX_ENGINE_TYPE,
"CONSISTENT_HASHING");
+ conf.setInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, 4);
+ conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, 1);
+ conf.setString(FlinkOptions.TABLE_TYPE,
HoodieTableType.MERGE_ON_READ.name());
+ conf.setString(FlinkOptions.OPERATION, "INSERT_OVERWRITE");
+
+ // expect HoodieException for overwrite
+ assertThrows(
+ HoodieException.class,
+ () -> testWriteToHoodie(conf, "overwrite", 1, EXPECTED));
+ }
+
+ private void testWriteToHoodie(
+ Configuration conf,
+ String jobName,
+ int checkpoints,
+ Map<String, String> expected) throws Exception {
+ StreamExecutionEnvironment execEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
+ execEnv.getConfig().disableObjectReuse();
+ execEnv.setParallelism(4);
+ // set up checkpoint interval
+ execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+ execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+ // Read from file source
+ RowType rowType =
+ (RowType)
AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+ .getLogicalType();
+
+ String sourcePath = Objects.requireNonNull(Thread.currentThread()
+ .getContextClassLoader().getResource("test_source.data")).toString();
+
+ boolean isMor =
conf.getString(FlinkOptions.TABLE_TYPE).equals(HoodieTableType.MERGE_ON_READ.name());
+
+ DataStream<RowData> dataStream;
+ if (isMor) {
+ TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+ format.setFilesFilter(FilePathFilter.createDefaultFilter());
+ TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+ format.setCharsetName("UTF-8");
+
+ dataStream = execEnv
+ // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
+ .readFile(format, sourcePath,
FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+ .map(JsonDeserializationFunction.getInstance(rowType))
+ .setParallelism(1);
+ } else {
+ dataStream = execEnv
+ // use continuous file source to trigger checkpoint
+ .addSource(new ContinuousFileSource.BoundedSourceFunction(new
Path(sourcePath), checkpoints))
+ .name("continuous_file_source")
+ .setParallelism(1)
+ .map(JsonDeserializationFunction.getInstance(rowType))
+ .setParallelism(4);
+ }
+
+ OptionsInference.setupSinkTasks(conf, execEnv.getParallelism());
+ DataStream<HoodieRecord> hoodieRecordDataStream =
Pipelines.bootstrap(conf, rowType, dataStream);
+ // bulk_insert mode
+ final String writeOperation = conf.get(FlinkOptions.OPERATION);
+ if (WriteOperationType.fromValue(writeOperation) ==
WriteOperationType.BULK_INSERT) {
+ Pipelines.bulkInsert(conf, rowType, dataStream);
+ } else {
+ DataStream<Object> pipeline = Pipelines.hoodieStreamWrite(conf,
hoodieRecordDataStream);
+ execEnv.addOperator(pipeline.getTransformation());
+ }
+ JobClient client = execEnv.executeAsync(jobName);
+ if (client.getJobStatus().get() != JobStatus.FAILED) {
+ try {
+ TimeUnit.SECONDS.sleep(30);
+ client.cancel();
+ } catch (Throwable var1) {
+ // ignored
+ }
+ }
+ FileSystem fs = FSUtils.getFs(tempFile.getAbsolutePath(), new
org.apache.hadoop.conf.Configuration());
+ TestData.checkWrittenDataMOR(fs, tempFile, expected, 4);
+ }
+}