This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cdf59d  [HUDI-1354] Block updates and replace on file groups in 
clustering (#2275)
6cdf59d is described below

commit 6cdf59d92b1c260abae82bba7d30d8ac280bddbf
Author: lw0090 <[email protected]>
AuthorDate: Mon Dec 28 12:30:29 2020 +0800

    [HUDI-1354] Block updates and replace on file groups in clustering (#2275)
    
    * [HUDI-1354] Block updates and replace on file groups in clustering
    
    * [HUDI-1354]  Block updates and replace on file groups in clustering
---
 .../apache/hudi/config/HoodieClusteringConfig.java |  29 +++++-
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  14 +++
 .../exception/HoodieClusteringUpdateException.java |  29 ++++++
 .../action/cluster/strategy/UpdateStrategy.java    |  48 ++++++++++
 .../update/strategy/SparkRejectUpdateStrategy.java |  67 ++++++++++++++
 .../commit/BaseSparkCommitActionExecutor.java      |  22 ++++-
 .../table/action/commit/UpsertPartitioner.java     |  35 ++++++-
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 101 ++++++++++++++++++++-
 8 files changed, 339 insertions(+), 6 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index ae805ca..91acd30 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -73,9 +73,17 @@ public class HoodieClusteringConfig extends 
DefaultHoodieConfig {
   public static final String CLUSTERING_TARGET_FILE_MAX_BYTES = 
CLUSTERING_STRATEGY_PARAM_PREFIX + "target.file.max.bytes";
   public static final String DEFAULT_CLUSTERING_TARGET_FILE_MAX_BYTES = 
String.valueOf(1 * 1024 * 1024 * 1024L); // 1GB
   
-  // constants related to clustering that may be used by more than 1 strategy.
+  // Constants related to clustering that may be used by more than 1 strategy.
   public static final String CLUSTERING_SORT_COLUMNS_PROPERTY = 
HoodieClusteringConfig.CLUSTERING_STRATEGY_PARAM_PREFIX + "sort.columns";
 
+  // When file groups are in clustering, updates to these file groups need to be 
handled. The default strategy just rejects the update.
+  public static final String CLUSTERING_UPDATES_STRATEGY_PROP = 
"hoodie.clustering.updates.strategy";
+  public static final String DEFAULT_CLUSTERING_UPDATES_STRATEGY = 
"org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy";
+
+  // Async clustering
+  public static final String ASYNC_CLUSTERING_ENABLE_OPT_KEY = 
"hoodie.clustering.async.enabled";
+  public static final String DEFAULT_ASYNC_CLUSTERING_ENABLE_OPT_VAL = "false";
+
   public HoodieClusteringConfig(Properties props) {
     super(props);
   }
@@ -135,8 +143,8 @@ public class HoodieClusteringConfig extends 
DefaultHoodieConfig {
       return this;
     }
 
-    public Builder withInlineClustering(Boolean inlineCompaction) {
-      props.setProperty(INLINE_CLUSTERING_PROP, 
String.valueOf(inlineCompaction));
+    public Builder withInlineClustering(Boolean inlineClustering) {
+      props.setProperty(INLINE_CLUSTERING_PROP, 
String.valueOf(inlineClustering));
       return this;
     }
 
@@ -150,8 +158,19 @@ public class HoodieClusteringConfig extends 
DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withClusteringUpdatesStrategy(String updatesStrategyClass) {
+      props.setProperty(CLUSTERING_UPDATES_STRATEGY_PROP, 
updatesStrategyClass);
+      return this;
+    }
+
+    public Builder withAsyncClustering(Boolean asyncClustering) {
+      props.setProperty(ASYNC_CLUSTERING_ENABLE_OPT_KEY, 
String.valueOf(asyncClustering));
+      return this;
+    }
+
     public HoodieClusteringConfig build() {
       HoodieClusteringConfig config = new HoodieClusteringConfig(props);
+
       setDefaultOnCondition(props, 
!props.containsKey(CLUSTERING_PLAN_STRATEGY_CLASS),
           CLUSTERING_PLAN_STRATEGY_CLASS, 
DEFAULT_CLUSTERING_PLAN_STRATEGY_CLASS);
       setDefaultOnCondition(props, 
!props.containsKey(CLUSTERING_EXECUTION_STRATEGY_CLASS),
@@ -170,6 +189,10 @@ public class HoodieClusteringConfig extends 
DefaultHoodieConfig {
           DEFAULT_CLUSTERING_TARGET_PARTITIONS);
       setDefaultOnCondition(props, 
!props.containsKey(CLUSTERING_PLAN_SMALL_FILE_LIMIT), 
CLUSTERING_PLAN_SMALL_FILE_LIMIT,
           DEFAULT_CLUSTERING_PLAN_SMALL_FILE_LIMIT);
+      setDefaultOnCondition(props, 
!props.containsKey(CLUSTERING_UPDATES_STRATEGY_PROP), 
CLUSTERING_UPDATES_STRATEGY_PROP, 
+          DEFAULT_CLUSTERING_UPDATES_STRATEGY);
+      setDefaultOnCondition(props, 
!props.containsKey(ASYNC_CLUSTERING_ENABLE_OPT_KEY), 
ASYNC_CLUSTERING_ENABLE_OPT_KEY,
+          DEFAULT_ASYNC_CLUSTERING_ENABLE_OPT_VAL);
       return config;
     }
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 69544e8..18bb4b3 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -50,6 +50,7 @@ import java.util.Properties;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+
 /**
  * Class storing configs for the HoodieWriteClient.
  */
@@ -395,6 +396,15 @@ public class HoodieWriteConfig extends DefaultHoodieConfig 
{
     return 
Boolean.parseBoolean(props.getProperty(HoodieClusteringConfig.INLINE_CLUSTERING_PROP));
   }
 
+  public boolean isAsyncClusteringEnabled() {
+    return 
Boolean.parseBoolean(props.getProperty(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY));
+  }
+
+  public boolean isClusteringEnabled() {
+    // TODO: future support async clustering
+    return isInlineClustering() || isAsyncClusteringEnabled();
+  }
+
   public int getInlineClusterMaxCommits() {
     return 
Integer.parseInt(props.getProperty(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP));
   }
@@ -415,6 +425,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig 
{
     return 
Boolean.valueOf(props.getProperty(HoodieCompactionConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLED));
   }
 
+  public String getClusteringUpdatesStrategyClass() {
+    return 
props.getProperty(HoodieClusteringConfig.CLUSTERING_UPDATES_STRATEGY_PROP);
+  }
+
   /**
    * Clustering properties.
    */
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringUpdateException.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringUpdateException.java
new file mode 100644
index 0000000..68b62a5
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringUpdateException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieClusteringUpdateException extends HoodieException {
+  public HoodieClusteringUpdateException(String msg) {
+    super(msg);
+  }
+
+  public HoodieClusteringUpdateException(String msg, Throwable e) {
+    super(msg, e);
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
new file mode 100644
index 0000000..667a58b
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/UpdateStrategy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.cluster.strategy;
+
+import org.apache.hudi.client.common.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import java.util.Set;
+
+/**
+ * When file groups are in clustering, records written to these file groups need to 
be checked.
+ */
+public abstract class UpdateStrategy<T extends HoodieRecordPayload<T>, I> {
+
+  protected final HoodieEngineContext engineContext;
+  protected Set<HoodieFileGroupId> fileGroupsInPendingClustering;
+
+  protected UpdateStrategy(HoodieEngineContext engineContext, 
Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
+    this.engineContext = engineContext;
+    this.fileGroupsInPendingClustering = fileGroupsInPendingClustering;
+  }
+
+  /**
+   * Check the records that update file groups in clustering.
+   * @param taggedRecordsRDD the records to write, tagged with target file id;
+   *                         in the future, a strategy may update a tagged record's location to a 
different fileId.
+   * @return the records RDD as updated by the strategy
+   */
+  public abstract I handleUpdate(I taggedRecordsRDD);
+
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
new file mode 100644
index 0000000..134e490
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/update/strategy/SparkRejectUpdateStrategy.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.clustering.update.strategy;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.exception.HoodieClusteringUpdateException;
+import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.HashSet;
+import java.util.List;
+
+/**
+ * Update strategy that rejects updates:
+ * if any file group in pending clustering has update records, an exception is thrown.
+ */
+public class SparkRejectUpdateStrategy<T extends HoodieRecordPayload<T>> 
extends UpdateStrategy<T, JavaRDD<HoodieRecord<T>>> {
+  private static final Logger LOG = 
LogManager.getLogger(SparkRejectUpdateStrategy.class);
+
+  public SparkRejectUpdateStrategy(HoodieSparkEngineContext engineContext, 
HashSet<HoodieFileGroupId> fileGroupsInPendingClustering) {
+    super(engineContext, fileGroupsInPendingClustering);
+  }
+
+  private List<HoodieFileGroupId> 
getGroupIdsWithUpdate(JavaRDD<HoodieRecord<T>> inputRecords) {
+    List<HoodieFileGroupId> fileGroupIdsWithUpdates = inputRecords
+        .filter(record -> record.getCurrentLocation() != null)
+        .map(record -> new HoodieFileGroupId(record.getPartitionPath(), 
record.getCurrentLocation().getFileId())).distinct().collect();
+    return fileGroupIdsWithUpdates;
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> handleUpdate(JavaRDD<HoodieRecord<T>> 
taggedRecordsRDD) {
+    List<HoodieFileGroupId> fileGroupIdsWithRecordUpdate = 
getGroupIdsWithUpdate(taggedRecordsRDD);
+    fileGroupIdsWithRecordUpdate.forEach(fileGroupIdWithRecordUpdate -> {
+      if (fileGroupsInPendingClustering.contains(fileGroupIdWithRecordUpdate)) 
{
+        String msg = String.format("Not allowed to update the clustering file 
group %s. "
+                + "For pending clustering operations, we are not going to 
support update for now.",
+            fileGroupIdWithRecordUpdate.toString());
+        LOG.error(msg);
+        throw new HoodieClusteringUpdateException(msg);
+      }
+    });
+    return taggedRecordsRDD;
+  }
+
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 73be8d4..1fd5dad 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -28,11 +28,13 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
@@ -46,6 +48,7 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.WorkloadStat;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.cluster.strategy.UpdateStrategy;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
@@ -59,11 +62,13 @@ import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.stream.Collectors;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.Map;
 
 public abstract class BaseSparkCommitActionExecutor<T extends 
HoodieRecordPayload> extends
@@ -88,6 +93,18 @@ public abstract class BaseSparkCommitActionExecutor<T 
extends HoodieRecordPayloa
     super(context, config, table, instantTime, operationType, extraMetadata);
   }
 
+  private JavaRDD<HoodieRecord<T>> 
clusteringHandleUpdate(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
+    if (config.isClusteringEnabled()) {
+      Set<HoodieFileGroupId> fileGroupsInPendingClustering =
+          
table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> 
entry.getKey()).collect(Collectors.toSet());
+      UpdateStrategy updateStrategy = (UpdateStrategy)ReflectionUtils
+          .loadClass(config.getClusteringUpdatesStrategyClass(), this.context, 
fileGroupsInPendingClustering);
+      return 
(JavaRDD<HoodieRecord<T>>)updateStrategy.handleUpdate(inputRecordsRDD);
+    } else {
+      return inputRecordsRDD;
+    }
+  }
+
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> 
execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
     HoodieWriteMetadata<JavaRDD<WriteStatus>> result = new 
HoodieWriteMetadata<>();
@@ -107,9 +124,12 @@ public abstract class BaseSparkCommitActionExecutor<T 
extends HoodieRecordPayloa
       saveWorkloadProfileMetadataToInflight(profile, instantTime);
     }
 
+    // handle records update with clustering
+    JavaRDD<HoodieRecord<T>> inputRecordsRDDWithClusteringUpdate = 
clusteringHandleUpdate(inputRecordsRDD);
+
     // partition using the insert partitioner
     final Partitioner partitioner = getPartitioner(profile);
-    JavaRDD<HoodieRecord<T>> partitionedRecords = partition(inputRecordsRDD, 
partitioner);
+    JavaRDD<HoodieRecord<T>> partitionedRecords = 
partition(inputRecordsRDDWithClusteringUpdate, partitioner);
     JavaRDD<WriteStatus> writeStatusRDD = 
partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {
       if (WriteOperationType.isChangingRecords(operationType)) {
         return handleUpsertPartition(instantTime, partition, recordItr, 
partitioner);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index b28c89a..a84e912 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -129,6 +129,34 @@ public class UpsertPartitioner<T extends 
HoodieRecordPayload<T>> extends Partiti
     return bucket;
   }
 
+  /**
+   * Get the fileIds in pending clustering for each partition path.
+   * @return map from partition path to the ids of its file groups in pending clustering
+   */
+  private Map<String, Set<String>> 
getPartitionPathToPendingClusteringFileGroupsId() {
+    Map<String, Set<String>>  partitionPathToInPendingClusteringFileId =
+        table.getFileSystemView().getFileGroupsInPendingClustering()
+            .map(fileGroupIdAndInstantPair ->
+                Pair.of(fileGroupIdAndInstantPair.getKey().getPartitionPath(), 
fileGroupIdAndInstantPair.getKey().getFileId()))
+            .collect(Collectors.groupingBy(Pair::getKey, 
Collectors.mapping(Pair::getValue, Collectors.toSet())));
+    return partitionPathToInPendingClusteringFileId;
+  }
+
+  /**
+   * Exclude small file handling for clustering since update path is not 
supported.
+   * @param pendingClusteringFileGroupsId  pending clustering file groups id 
of partition
+   * @param smallFiles small files of partition
+   * @return smallFiles not in clustering
+   */
+  private List<SmallFile> filterSmallFilesInClustering(final Set<String> 
pendingClusteringFileGroupsId, final List<SmallFile> smallFiles) {
+    if (this.config.isClusteringEnabled()) {
+      return smallFiles.stream()
+          .filter(smallFile -> 
!pendingClusteringFileGroupsId.contains(smallFile.location.getFileId())).collect(Collectors.toList());
+    } else {
+      return smallFiles;
+    }
+  }
+
   private void assignInserts(WorkloadProfile profile, HoodieEngineContext 
context) {
     // for new inserts, compute buckets depending on how many records we have 
for each partition
     Set<String> partitionPaths = profile.getPartitionPaths();
@@ -140,11 +168,16 @@ public class UpsertPartitioner<T extends 
HoodieRecordPayload<T>> extends Partiti
     Map<String, List<SmallFile>> partitionSmallFilesMap =
         getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), 
context);
 
+    Map<String, Set<String>> partitionPathToPendingClusteringFileGroupsId = 
getPartitionPathToPendingClusteringFileGroupsId();
+
     for (String partitionPath : partitionPaths) {
       WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
       if (pStat.getNumInserts() > 0) {
 
-        List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath);
+        List<SmallFile> smallFiles =
+            
filterSmallFilesInClustering(partitionPathToPendingClusteringFileGroupsId.getOrDefault(partitionPath,
 Collections.emptySet()),
+                partitionSmallFilesMap.get(partitionPath));
+
         this.smallFiles.addAll(smallFiles);
 
         LOG.info("For partitionPath : " + partitionPath + " Small Files => " + 
smallFiles);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index d9a396d..c201efd 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -21,6 +21,8 @@ package org.apache.hudi.client;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -30,15 +32,19 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetUtils;
@@ -52,10 +58,12 @@ import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieCorruptedDataException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
+import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieSparkCopyOnWriteTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.MarkerFiles;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -78,6 +86,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -86,6 +95,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.Properties;
 
 import static 
org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_0;
 import static 
org.apache.hudi.common.testutils.FileCreateUtils.getBaseFileCountsForPaths;
@@ -97,6 +107,8 @@ import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAM
 import static 
org.apache.hudi.common.testutils.Transformations.randomSelectAsHoodieKeys;
 import static 
org.apache.hudi.common.testutils.Transformations.recordsToRecordKeySet;
 import static org.apache.hudi.common.util.ParquetUtils.readRowKeysFromParquet;
+import static 
org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE_OPT_KEY;
+import static 
org.apache.hudi.config.HoodieClusteringConfig.DEFAULT_CLUSTERING_EXECUTION_STRATEGY_CLASS;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -110,6 +122,12 @@ import static org.mockito.Mockito.when;
 public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase 
{
 
   private static final Logger LOG = 
LogManager.getLogger(TestHoodieClientOnCopyOnWriteStorage.class);
+  private static final Map<String, String> STRATEGY_PARAMS = new 
HashMap<String, String>() {
+    {
+      put("sortColumn", "record_key");
+    }
+  };
+
   private HoodieTestTable testTable;
 
   @BeforeEach
@@ -681,6 +699,70 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
     }
   }
 
+  private Pair<List<WriteStatus>, List<HoodieRecord>> 
insertBatchRecords(SparkRDDWriteClient client, String commitTime,
+                                                                         
Integer recordNum, int expectStatueSize) {
+    client.startCommitWithTime(commitTime);
+    List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime, 
recordNum);
+    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
+    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, 
commitTime).collect();
+    assertNoWriteErrors(statuses);
+    assertEquals(expectStatueSize, statuses.size(), "check expect statue 
size.");
+    return Pair.of(statuses, inserts1);
+  }
+
+  @Test
+  public void testUpdateRejectForClustering() throws IOException {
+    final String testPartitionPath = "2016/09/26";
+    dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+    Properties props = new Properties();
+    props.setProperty(ASYNC_CLUSTERING_ENABLE_OPT_KEY, "true");
+    HoodieWriteConfig config = getSmallInsertWriteConfig(100,
+        TRIP_EXAMPLE_SCHEMA, dataGen.getEstimatedFileSizeInBytes(150), props);
+    SparkRDDWriteClient client = getHoodieWriteClient(config, false);
+    HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) 
HoodieSparkTable.create(config, context, metaClient);
+
+    //1. insert to generate 2 file group
+    String commitTime1 = "001";
+    Pair<List<WriteStatus>, List<HoodieRecord>> upsertResult = 
insertBatchRecords(client, commitTime1, 600, 2);
+    List<HoodieRecord> inserts1 = upsertResult.getValue();
+    List<String> fileGroupIds1 = 
table.getFileSystemView().getAllFileGroups(testPartitionPath)
+        .map(fileGroup -> 
fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+    assertEquals(2, fileGroupIds1.size());
+
+    // 2. generate clustering plan for fileGroupIds1 file groups
+    String commitTime2 = "002";
+    List<List<FileSlice>> firstInsertFileSlicesList = 
table.getFileSystemView().getAllFileGroups(testPartitionPath)
+        .map(fileGroup -> 
fileGroup.getAllFileSlices().collect(Collectors.toList())).collect(Collectors.toList());
+    List<FileSlice>[] fileSlices = 
(List<FileSlice>[])firstInsertFileSlicesList.toArray(new 
List[firstInsertFileSlicesList.size()]);
+    createRequestedReplaceInstant(this.metaClient, commitTime2, fileSlices);
+
+    // 3. insert one record: no update-reject exception is thrown, and the small files are not 
merged into; a new file group is generated instead
+    String commitTime3 = "003";
+    insertBatchRecords(client, commitTime3, 1, 1).getKey();
+    List<String> fileGroupIds2 = 
table.getFileSystemView().getAllFileGroups(testPartitionPath)
+        .map(fileGroup -> 
fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+    assertEquals(3, fileGroupIds2.size());
+
+    // 4. update records in the two file groups under clustering; expect the update to be 
rejected with an exception
+    String commitTime4 = "004";
+    client.startCommitWithTime(commitTime4);
+    List<HoodieRecord> insertsAndUpdates3 = new ArrayList<>();
+    insertsAndUpdates3.addAll(dataGen.generateUpdates(commitTime4, inserts1));
+    String assertMsg = String.format("Not allowed to update the clustering 
files in partition: %s "
+        + "For pending clustering operations, we are not going to support 
update for now.", testPartitionPath);
+    assertThrows(HoodieUpsertException.class, () -> {
+      writeClient.upsert(jsc.parallelize(insertsAndUpdates3, 1), 
commitTime3).collect(); }, assertMsg);
+
+    // 5. insert one record: no update-reject exception is thrown, and the record is merged into the 
small file
+    String commitTime5 = "005";
+    List<WriteStatus> statuses = insertBatchRecords(client, commitTime5, 1, 
1).getKey();
+    fileGroupIds2.removeAll(fileGroupIds1);
+    assertEquals(fileGroupIds2.get(0), statuses.get(0).getFileId());
+    List<String> firstInsertFileGroupIds4 = 
table.getFileSystemView().getAllFileGroups(testPartitionPath)
+        .map(fileGroup -> 
fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+    assertEquals(3, firstInsertFileGroupIds4.size());
+  }
+
   /**
    * Test scenario of new file-group getting added during upsert().
    */
@@ -1467,8 +1549,12 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
     String schemaStr = useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA;
     return getSmallInsertWriteConfig(insertSplitSize, schemaStr, 
smallFileSize);
   }
-  
+
   private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, 
String schemaStr, long smallFileSize) {
+    return getSmallInsertWriteConfig(insertSplitSize, schemaStr, 
smallFileSize, new Properties());
+  }
+  
+  private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, 
String schemaStr, long smallFileSize, Properties props) {
     HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr);
     return builder
         .withCompactionConfig(
@@ -1479,6 +1565,19 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
             HoodieStorageConfig.newBuilder()
                 .hfileMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200))
                 
.parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
+        .withProps(props)
         .build();
   }
+
+  protected HoodieInstant createRequestedReplaceInstant(HoodieTableMetaClient 
metaClient, String clusterTime, List<FileSlice>[] fileSlices) throws 
IOException {
+    HoodieClusteringPlan clusteringPlan =
+        
ClusteringUtils.createClusteringPlan(DEFAULT_CLUSTERING_EXECUTION_STRATEGY_CLASS,
 STRATEGY_PARAMS, fileSlices, Collections.emptyMap());
+
+    HoodieInstant clusteringInstant = new 
HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, clusterTime);
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = 
HoodieRequestedReplaceMetadata.newBuilder()
+        
.setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
+    
metaClient.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant, 
TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
+    return clusteringInstant;
+  }
+
 }

Reply via email to