This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6cdf59d [HUDI-1354] Block updates and replace on file groups in
clustering (#2275)
6cdf59d is described below
commit 6cdf59d92b1c260abae82bba7d30d8ac280bddbf
Author: lw0090 <[email protected]>
AuthorDate: Mon Dec 28 12:30:29 2020 +0800
[HUDI-1354] Block updates and replace on file groups in clustering (#2275)
* [HUDI-1354] Block updates and replace on file groups in clustering
* [HUDI-1354] Block updates and replace on file groups in clustering
---
.../apache/hudi/config/HoodieClusteringConfig.java | 29 +++++-
.../org/apache/hudi/config/HoodieWriteConfig.java | 14 +++
.../exception/HoodieClusteringUpdateException.java | 29 ++++++
.../action/cluster/strategy/UpdateStrategy.java | 48 ++++++++++
.../update/strategy/SparkRejectUpdateStrategy.java | 67 ++++++++++++++
.../commit/BaseSparkCommitActionExecutor.java | 22 ++++-
.../table/action/commit/UpsertPartitioner.java | 35 ++++++-
.../TestHoodieClientOnCopyOnWriteStorage.java | 101 ++++++++++++++++++++-
8 files changed, 339 insertions(+), 6 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index ae805ca..91acd30 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -73,9 +73,17 @@ public class HoodieClusteringConfig extends
DefaultHoodieConfig {
public static final String CLUSTERING_TARGET_FILE_MAX_BYTES =
CLUSTERING_STRATEGY_PARAM_PREFIX + "target.file.max.bytes";
public static final String DEFAULT_CLUSTERING_TARGET_FILE_MAX_BYTES =
String.valueOf(1 * 1024 * 1024 * 1024L); // 1GB
- // constants related to clustering that may be used by more than 1 strategy.
+ // Constants related to clustering that may be used by more than 1 strategy.
public static final String CLUSTERING_SORT_COLUMNS_PROPERTY =
HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX + "sort.columns";
+ // When file groups are in clustering, updates to these file groups need to be handled. The default strategy just rejects the update.
+ public static final String CLUSTERING_UPDATES_STRATEGY_PROP =
"hoodie.clustering.updates.strategy";
+ public static final String DEFAULT_CLUSTERING_UPDATES_STRATEGY =
"org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy";
+
+ // Async clustering
+ public static final String ASYNC_CLUSTERING_ENABLE_OPT_KEY =
"hoodie.clustering.async.enabled";
+ public static final String DEFAULT_ASYNC_CLUSTERING_ENABLE_OPT_VAL = "false";
+
public HoodieClusteringConfig(Properties props) {
super(props);
}
@@ -135,8 +143,8 @@ public class HoodieClusteringConfig extends
DefaultHoodieConfig {
return this;
}
- public Builder withInlineClustering(Boolean inlineCompaction) {
- props.setProperty(INLINE_CLUSTERING_PROP,
String.valueOf(inlineCompaction));
+ public Builder withInlineClustering(Boolean inlineClustering) {
+ props.setProperty(INLINE_CLUSTERING_PROP,
String.valueOf(inlineClustering));
return this;
}
@@ -150,8 +158,19 @@ public class HoodieClusteringConfig extends
DefaultHoodieConfig {
return this;
}
+ /**
+ * Stores the given class name under {@code CLUSTERING_UPDATES_STRATEGY_PROP}
+ * ("hoodie.clustering.updates.strategy").
+ *
+ * @param updatesStrategyClass class name to store; when the property is never set,
+ * {@code build()} applies {@code DEFAULT_CLUSTERING_UPDATES_STRATEGY}
+ * @return this builder
+ */
+ public Builder withClusteringUpdatesStrategy(String updatesStrategyClass) {
+ props.setProperty(CLUSTERING_UPDATES_STRATEGY_PROP,
updatesStrategyClass);
+ return this;
+ }
+
+ /**
+ * Stores the async-clustering flag under {@code ASYNC_CLUSTERING_ENABLE_OPT_KEY}
+ * ("hoodie.clustering.async.enabled").
+ *
+ * @param asyncClustering flag value; written in its {@code String.valueOf} form
+ * @return this builder
+ */
+ public Builder withAsyncClustering(Boolean asyncClustering) {
+ props.setProperty(ASYNC_CLUSTERING_ENABLE_OPT_KEY,
String.valueOf(asyncClustering));
+ return this;
+ }
+
public HoodieClusteringConfig build() {
HoodieClusteringConfig config = new HoodieClusteringConfig(props);
+
setDefaultOnCondition(props,
!props.containsKey(CLUSTERING_PLAN_STRATEGY_CLASS),
CLUSTERING_PLAN_STRATEGY_CLASS,
DEFAULT_CLUSTERING_PLAN_STRATEGY_CLASS);
setDefaultOnCondition(props,
!props.containsKey(CLUSTERING_EXECUTION_STRATEGY_CLASS),
@@ -170,6 +189,10 @@ public class HoodieClusteringConfig extends
DefaultHoodieConfig {
DEFAULT_CLUSTERING_TARGET_PARTITIONS);
setDefaultOnCondition(props,
!props.containsKey(CLUSTERING_PLAN_SMALL_FILE_LIMIT),
CLUSTERING_PLAN_SMALL_FILE_LIMIT,
DEFAULT_CLUSTERING_PLAN_SMALL_FILE_LIMIT);
+ setDefaultOnCondition(props,
!props.containsKey(CLUSTERING_UPDATES_STRATEGY_PROP),
CLUSTERING_UPDATES_STRATEGY_PROP,
+ DEFAULT_CLUSTERING_UPDATES_STRATEGY);
+ setDefaultOnCondition(props,
!props.containsKey(ASYNC_CLUSTERING_ENABLE_OPT_KEY),
ASYNC_CLUSTERING_ENABLE_OPT_KEY,
+ DEFAULT_ASYNC_CLUSTERING_ENABLE_OPT_VAL);
return config;
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 69544e8..18bb4b3 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -50,6 +50,7 @@ import java.util.Properties;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+
/**
* Class storing configs for the HoodieWriteClient.
*/
@@ -395,6 +396,15 @@ public class HoodieWriteConfig extends DefaultHoodieConfig
{
return
Boolean.parseBoolean(props.getProperty(HoodieClusteringConfig.INLINE_CLUSTERING_PROP));
}
+ /**
+ * Reads {@code HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY} from the props.
+ *
+ * @return {@code true} only when the property value parses as "true" via
+ * {@code Boolean.parseBoolean}; {@code false} otherwise, including when it is unset
+ */
+ public boolean isAsyncClusteringEnabled() {
+ return
Boolean.parseBoolean(props.getProperty(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY));
+ }
+
+ /**
+ * Reports whether clustering is enabled in either mode.
+ *
+ * @return {@code true} when inline clustering or async clustering is enabled
+ */
+ public boolean isClusteringEnabled() {
+ // TODO: support async clustering in the future
+ return isInlineClustering() || isAsyncClusteringEnabled();
+ }
+
public int getInlineClusterMaxCommits() {
return
Integer.parseInt(props.getProperty(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP));
}
@@ -415,6 +425,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig
{
return
Boolean.valueOf(props.getProperty(HoodieCompactionConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLED));
}
+ /**
+ * Returns the raw value of {@code HoodieClusteringConfig.CLUSTERING_UPDATES_STRATEGY_PROP}.
+ *
+ * @return the configured updates-strategy class name, or {@code null} if the property is absent
+ */
+ public String getClusteringUpdatesStrategyClass() {
+ return
props.getProperty(HoodieClusteringConfig.CLUSTERING_UPDATES_STRATEGY_PROP);
+ }
+
/**
* Clustering properties.
*/
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringUpdateException.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringUpdateException.java
new file mode 100644
index 0000000..68b62a5
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringUpdateException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+/**
+ * Exception signalling that an update was attempted against a file group
+ * that has a pending clustering operation.
+ */
+public class HoodieClusteringUpdateException extends HoodieException {
+ /**
+ * @param msg description of the rejected update
+ */
+ public HoodieClusteringUpdateException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * @param msg description of the rejected update
+ * @param e underlying cause
+ */
+ public HoodieClusteringUpdateException(String msg, Throwable e) {
+ super(msg, e);
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
new file mode 100644
index 0000000..667a58b
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster.strategy;
+
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import java.util.Set;
+
+/**
+ * Strategy applied to records that are about to be written while some file groups
+ * have a pending clustering operation; records targeting those file groups need to be checked.
+ *
+ * @param <T> payload type of the records
+ * @param <I> engine-specific container type of the tagged input records
+ */
+public abstract class UpdateStrategy<T extends HoodieRecordPayload<T>, I> {
+
+ // Engine context handed in by the caller; not used by this base class itself.
+ protected final HoodieEngineContext engineContext;
+ // Ids of the file groups that currently have a pending clustering operation.
+ protected Set<HoodieFileGroupId> fileGroupsInPendingClustering;
+
+ protected UpdateStrategy(HoodieEngineContext engineContext,
Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
+ this.engineContext = engineContext;
+ this.fileGroupsInPendingClustering = fileGroupsInPendingClustering;
+ }
+
+ /**
+ * Checks the records that update file groups in pending clustering.
+ * @param taggedRecordsRDD the records to write, tagged with their target file id;
+ * in the future an implementation may re-tag records to a different file id
+ * @return the records after the strategy has been applied
+ */
+ public abstract I handleUpdate(I taggedRecordsRDD);
+
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
new file mode 100644
index 0000000..134e490
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.update.strategy;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.exception.HoodieClusteringUpdateException;
+import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.HashSet;
+import java.util.List;
+
+/**
+ * Update strategy that rejects updates to file groups in pending clustering.
+ * If any incoming record is tagged to such a file group, an exception is thrown;
+ * otherwise the records are passed through unchanged.
+ */
+public class SparkRejectUpdateStrategy<T extends HoodieRecordPayload<T>>
extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {
+ private static final Logger LOG =
LogManager.getLogger(SparkRejectUpdateStrategy.class);
+
+ // NOTE(review): the parameter is declared as HashSet rather than Set. This class is
+ // instantiated reflectively from its configured class name, so the concrete parameter
+ // types may matter for constructor lookup - confirm before widening them.
+ public SparkRejectUpdateStrategy(HoodieSparkEngineContext engineContext,
HashSet<HoodieFileGroupId> fileGroupsInPendingClustering) {
+ super(engineContext, fileGroupsInPendingClustering);
+ }
+
+ /**
+ * Collects the distinct file group ids that the input records would update.
+ * Records without a current location are ignored.
+ *
+ * @param inputRecords tagged input records
+ * @return distinct (partition path, file id) pairs of records that have a current location
+ */
+ private List<HoodieFileGroupId>
getGroupIdsWithUpdate(JavaRDD<HoodieRecord<T>> inputRecords) {
+ List<HoodieFileGroupId> fileGroupIdsWithUpdates = inputRecords
+ .filter(record -> record.getCurrentLocation() != null)
+ .map(record -> new HoodieFileGroupId(record.getPartitionPath(),
record.getCurrentLocation().getFileId())).distinct().collect();
+ return fileGroupIdsWithUpdates;
+ }
+
+ /**
+ * Fails the write if any record updates a file group in pending clustering.
+ *
+ * @param taggedRecordsRDD records tagged with their current location
+ * @return the same {@code taggedRecordsRDD} instance when no conflict is found
+ * @throws HoodieClusteringUpdateException for the first updated file group found in
+ * {@code fileGroupsInPendingClustering}; the message is also logged at error level
+ */
+ @Override
+ public JavaRDD<HoodieRecord<T>> handleUpdate(JavaRDD<HoodieRecord<T>>
taggedRecordsRDD) {
+ List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate =
getGroupIdsWithUpdate(taggedRecordsRDD);
+ fileGroupIdsWithRecordUpdate.forEach(fileGroupIdWithRecordUpdate -> {
+ if (fileGroupsInPendingClustering.contains(fileGroupIdWithRecordUpdate))
{
+ String msg = String.format("Not allowed to update the clustering file
group %s. "
+ + "For pending clustering operations, we are not going to
support update for now.",
+ fileGroupIdWithRecordUpdate.toString());
+ LOG.error(msg);
+ throw new HoodieClusteringUpdateException(msg);
+ }
+ });
+ return taggedRecordsRDD;
+ }
+
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 73be8d4..1fd5dad 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -28,11 +28,13 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
@@ -46,6 +48,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.Partitioner;
@@ -59,11 +62,13 @@ import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
+import java.util.stream.Collectors;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.Map;
public abstract class BaseSparkCommitActionExecutor<T extends
HoodieRecordPayload> extends
@@ -88,6 +93,18 @@ public abstract class BaseSparkCommitActionExecutor<T
extends HoodieRecordPayloa
super(context, config, table, instantTime, operationType, extraMetadata);
}
+ /**
+ * Runs the configured clustering update strategy over the input records.
+ * When clustering is not enabled in the write config, the input is returned untouched.
+ *
+ * @param inputRecordsRDD records about to be written
+ * @return the result of the strategy's {@code handleUpdate}, or the input itself when clustering is disabled
+ */
+ private JavaRDD<HoodieRecord<T>>
clusteringHandleUpdate(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+ if (config.isClusteringEnabled()) {
+ // Keep only the file group id (the key) of each pending-clustering entry.
+ Set<HoodieFileGroupId> fileGroupsInPendingClustering =
+
table.getFileSystemView().getFileGroupsInPendingClustering().map(entry ->
entry.getKey()).collect(Collectors.toSet());
+ // The strategy is created reflectively from its configured class name, with the
+ // engine context and the pending file group ids as constructor arguments.
+ UpdateStrategy updateStrategy = (UpdateStrategy)ReflectionUtils
+ .loadClass(config.getClusteringUpdatesStrategyClass(), this.context,
fileGroupsInPendingClustering);
+ return
(JavaRDD<HoodieRecord<T>>)updateStrategy.handleUpdate(inputRecordsRDD);
+ } else {
+ return inputRecordsRDD;
+ }
+ }
+
@Override
public HoodieWriteMetadata<JavaRDD<WriteStatus>>
execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new
HoodieWriteMetadata<>();
@@ -107,9 +124,12 @@ public abstract class BaseSparkCommitActionExecutor<T
extends HoodieRecordPayloa
saveWorkloadProfileMetadataToInflight(profile, instantTime);
}
+ // apply the clustering update strategy to records that update file groups in pending clustering
+ JavaRDD<HoodieRecord<T>> inputRecordsRDDWithClusteringUpdate =
clusteringHandleUpdate(inputRecordsRDD);
+
// partition using the insert partitioner
final Partitioner partitioner = getPartitioner(profile);
- JavaRDD<HoodieRecord<T>> partitionedRecords = partition(inputRecordsRDD,
partitioner);
+ JavaRDD<HoodieRecord<T>> partitionedRecords =
partition(inputRecordsRDDWithClusteringUpdate, partitioner);
JavaRDD<WriteStatus> writeStatusRDD =
partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
if (WriteOperationType.isChangingRecords(operationType)) {
return handleUpsertPartition(instantTime, partition, recordItr,
partitioner);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index b28c89a..a84e912 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -129,6 +129,34 @@ public class UpsertPartitioner<T extends
HoodieRecordPayload<T>> extends Partiti
return bucket;
}
+ /**
+ * Groups the file ids of all file groups in pending clustering by partition path.
+ * @return map from partition path to the set of file ids in pending clustering in that partition
+ */
+ private Map<String, Set<String>>
getPartitionPathToPendingClusteringFileGroupsId() {
+ Map<String, Set<String>> partitionPathToInPendingClusteringFileId =
+ table.getFileSystemView().getFileGroupsInPendingClustering()
+ .map(fileGroupIdAndInstantPair ->
+ Pair.of(fileGroupIdAndInstantPair.getKey().getPartitionPath(),
fileGroupIdAndInstantPair.getKey().getFileId()))
+ .collect(Collectors.groupingBy(Pair::getKey,
Collectors.mapping(Pair::getValue, Collectors.toSet())));
+ return partitionPathToInPendingClusteringFileId;
+ }
+
+ /**
+ * Drops small files that belong to file groups in pending clustering, because the
+ * update path is not supported for them. When clustering is not enabled, the list
+ * is returned as is.
+ * @param pendingClusteringFileGroupsId file ids in pending clustering for the partition
+ * @param smallFiles small files of the partition
+ * @return the small files whose file id is not in {@code pendingClusteringFileGroupsId}
+ */
+ private List<SmallFile> filterSmallFilesInClustering(final Set<String>
pendingClusteringFileGroupsId, final List<SmallFile> smallFiles) {
+ if (this.config.isClusteringEnabled()) {
+ return smallFiles.stream()
+ .filter(smallFile ->
!pendingClusteringFileGroupsId.contains(smallFile.location.getFileId())).collect(Collectors.toList());
+ } else {
+ return smallFiles;
+ }
+ }
+
private void assignInserts(WorkloadProfile profile, HoodieEngineContext
context) {
// for new inserts, compute buckets depending on how many records we have
for each partition
Set<String> partitionPaths = profile.getPartitionPaths();
@@ -140,11 +168,16 @@ public class UpsertPartitioner<T extends
HoodieRecordPayload<T>> extends Partiti
Map<String, List<SmallFile>> partitionSmallFilesMap =
getSmallFilesForPartitions(new ArrayList<String>(partitionPaths),
context);
+ Map<String, Set<String>> partitionPathToPendingClusteringFileGroupsId =
getPartitionPathToPendingClusteringFileGroupsId();
+
for (String partitionPath : partitionPaths) {
WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
if (pStat.getNumInserts() > 0) {
- List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
+ List<SmallFile> smallFiles =
+
filterSmallFilesInClustering(partitionPathToPendingClusteringFileGroupsId.getOrDefault(partitionPath,
Collections.emptySet()),
+ partitionSmallFilesMap.get(partitionPath));
+
this.smallFiles.addAll(smallFiles);
LOG.info("For partitionPath : " + partitionPath + " Small Files => " +
smallFiles);
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index d9a396d..c201efd 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -21,6 +21,8 @@ package org.apache.hudi.client;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -30,15 +32,19 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
@@ -52,10 +58,12 @@ import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -78,6 +86,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -86,6 +95,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import java.util.Properties;
import static
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0;
import static
org.apache.hudi.common.testutils.FileCreateUtils.getBaseFileCountsForPaths;
@@ -97,6 +107,8 @@ import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAM
import static
org.apache.hudi.common.testutils.Transformations.randomSelectAsHoodieKeys;
import static
org.apache.hudi.common.testutils.Transformations.recordsToRecordKeySet;
import static org.apache.hudi.common.util.ParquetUtils.readRowKeysFromParquet;
+import static
org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY;
+import static
org.apache.hudi.config.HoodieClusteringConfig.DEFAULT_CLUSTERING_EXECUTION_STRATEGY_CLASS;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -110,6 +122,12 @@ import static org.mockito.Mockito.when;
public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase
{
private static final Logger LOG =
LogManager.getLogger(TestHoodieClientOnCopyOnWriteStorage.class);
+ private static final Map<String, String> STRATEGY_PARAMS = new
HashMap<String, String>() {
+ {
+ put("sortColumn", "record_key");
+ }
+ };
+
private HoodieTestTable testTable;
@BeforeEach
@@ -681,6 +699,70 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
}
}
+ /**
+ * Starts a commit, upserts freshly generated insert records and checks the result.
+ *
+ * @param client write client used for the commit and the upsert
+ * @param commitTime instant time of the commit
+ * @param recordNum number of insert records to generate
+ * @param expectStatueSize expected number of returned write statuses
+ * @return pair of the collected write statuses and the generated records
+ */
+ private Pair<List<WriteStatus>, List<HoodieRecord>>
insertBatchRecords(SparkRDDWriteClient client, String commitTime,
+
Integer recordNum, int expectStatueSize) {
+ client.startCommitWithTime(commitTime);
+ List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime,
recordNum);
+ JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
+ List<WriteStatus> statuses = client.upsert(insertRecordsRDD1,
commitTime).collect();
+ assertNoWriteErrors(statuses);
+ assertEquals(expectStatueSize, statuses.size(), "check expect statue
size.");
+ return Pair.of(statuses, inserts1);
+ }
+
+ /**
+ * Verifies that, with a pending clustering plan on existing file groups, plain inserts
+ * still succeed (without being merged into those file groups) while an upsert carrying
+ * updates to them is rejected.
+ */
+ @Test
+ public void testUpdateRejectForClustering() throws IOException {
+ final String testPartitionPath = "2016/09/26";
+ dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+ Properties props = new Properties();
+ props.setProperty(ASYNC_CLUSTERING_ENABLE_OPT_KEY, "true");
+ HoodieWriteConfig config = getSmallInsertWriteConfig(100,
+ TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150), props);
+ SparkRDDWriteClient client = getHoodieWriteClient(config, false);
+ HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable)
HoodieSparkTable.create(config, context, metaClient);
+
+ // 1. insert records to generate 2 file groups
+ String commitTime1 = "001";
+ Pair<List<WriteStatus>, List<HoodieRecord>> upsertResult =
insertBatchRecords(client, commitTime1, 600, 2);
+ List<HoodieRecord> inserts1 = upsertResult.getValue();
+ List<String> fileGroupIds1 =
table.getFileSystemView().getAllFileGroups(testPartitionPath)
+ .map(fileGroup ->
fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+ assertEquals(2, fileGroupIds1.size());
+
+ // 2. generate a clustering plan covering the file groups in fileGroupIds1
+ String commitTime2 = "002";
+ List<List<FileSlice>> firstInsertFileSlicesList =
table.getFileSystemView().getAllFileGroups(testPartitionPath)
+ .map(fileGroup ->
fileGroup.getAllFileSlices().collect(Collectors.toList())).collect(Collectors.toList());
+ List<FileSlice>[] fileSlices =
(List<FileSlice>[])firstInsertFileSlicesList.toArray(new
List[firstInsertFileSlicesList.size()]);
+ createRequestedReplaceInstant(this.metaClient, commitTime2, fileSlices);
+
+ // 3. insert one record: no update-reject exception is expected, and the record is not
+ // merged into a small file under clustering; a new file group is generated instead
+ String commitTime3 = "003";
+ insertBatchRecords(client, commitTime3, 1, 1).getKey();
+ List<String> fileGroupIds2 =
table.getFileSystemView().getAllFileGroups(testPartitionPath)
+ .map(fileGroup ->
fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+ assertEquals(3, fileGroupIds2.size());
+
+ // 4. update records in the two file groups under clustering; the upsert must be rejected
+ // NOTE(review): the commit is started as commitTime4 but the upsert below passes commitTime3
+ // and goes through `writeClient` rather than `client` - confirm this is intended.
+ // NOTE(review): the last assertThrows argument is only the assertion failure message; the
+ // exception text is not checked, and assertMsg differs from the strategy's message.
+ String commitTime4 = "004";
+ client.startCommitWithTime(commitTime4);
+ List<HoodieRecord> insertsAndUpdates3 = new ArrayList<>();
+ insertsAndUpdates3.addAll(dataGen.generateUpdates(commitTime4, inserts1));
+ String assertMsg = String.format("Not allowed to update the clustering
files in partition: %s "
+ + "For pending clustering operations, we are not going to support
update for now.", testPartitionPath);
+ assertThrows(HoodieUpsertException.class, () -> {
+ writeClient.upsert(jsc.parallelize(insertsAndUpdates3, 1),
commitTime3).collect(); }, assertMsg);
+
+ // 5. insert one record: no update-reject exception is expected, and the record is merged
+ // into the small file created in step 3
+ String commitTime5 = "005";
+ List<WriteStatus> statuses = insertBatchRecords(client, commitTime5, 1,
1).getKey();
+ fileGroupIds2.removeAll(fileGroupIds1);
+ assertEquals(fileGroupIds2.get(0), statuses.get(0).getFileId());
+ List<String> firstInsertFileGroupIds4 =
table.getFileSystemView().getAllFileGroups(testPartitionPath)
+ .map(fileGroup ->
fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+ assertEquals(3, firstInsertFileGroupIds4.size());
+ }
+
/**
* Test scenario of new file-group getting added during upsert().
*/
@@ -1467,8 +1549,12 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
String schemaStr = useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA;
return getSmallInsertWriteConfig(insertSplitSize, schemaStr,
smallFileSize);
}
-
+
private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize,
String schemaStr, long smallFileSize) {
+ return getSmallInsertWriteConfig(insertSplitSize, schemaStr,
smallFileSize, new Properties());
+ }
+
+ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize,
String schemaStr, long smallFileSize, Properties props) {
HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr);
return builder
.withCompactionConfig(
@@ -1479,6 +1565,19 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
HoodieStorageConfig.newBuilder()
.hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200))
.parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
+ .withProps(props)
.build();
}
+
+ /**
+ * Builds a clustering plan over the given file slices and saves it to the active
+ * timeline as a requested replace-commit instant.
+ *
+ * @param metaClient meta client whose active timeline receives the instant
+ * @param clusterTime instant time of the requested replace commit
+ * @param fileSlices file slices passed to the clustering plan
+ * @return the requested replace-commit instant that was saved
+ * @throws IOException declared by this helper; presumably raised when serializing the metadata - confirm
+ */
+ protected HoodieInstant createRequestedReplaceInstant(HoodieTableMetaClient
metaClient, String clusterTime, List<FileSlice>[] fileSlices) throws
IOException {
+ HoodieClusteringPlan clusteringPlan =
+
ClusteringUtils.createClusteringPlan(DEFAULT_CLUSTERING_EXECUTION_STRATEGY_CLASS,
STRATEGY_PARAMS, fileSlices, Collections.emptyMap());
+
+ HoodieInstant clusteringInstant = new
HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.REPLACE_COMMIT_ACTION, clusterTime);
+ HoodieRequestedReplaceMetadata requestedReplaceMetadata =
HoodieRequestedReplaceMetadata.newBuilder()
+
.setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
+
metaClient.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant,
TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
+ return clusteringInstant;
+ }
+
}