This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.10.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b78a9dc7096af513d064fda94d7e13d206ea50db
Author: Sagar Sumit <[email protected]>
AuthorDate: Wed Jan 5 03:02:05 2022 +0530

    [HUDI-2774] Handle duplicate instants when fetching pending clustering plans (#4118)
---
 .../org/apache/hudi/common/util/ClusteringUtils.java | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 0d790be..6687e58 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -124,7 +125,16 @@ public class ClusteringUtils {
         // get all filegroups in the plan
         getFileGroupEntriesInClusteringPlan(clusteringPlan.getLeft(), clusteringPlan.getRight()));
 
-    Map<HoodieFileGroupId, HoodieInstant> resultMap = resultStream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    Map<HoodieFileGroupId, HoodieInstant> resultMap;
+    try {
+      resultMap = resultStream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    } catch (Exception e) {
+      if (e instanceof IllegalStateException && e.getMessage().contains("Duplicate key")) {
+        throw new HoodieException("Found duplicate file groups pending 
clustering. If you're running deltastreamer in continuous mode, consider adding 
delay using --min-sync-interval-seconds. "
+            + "Or consider setting write concurrency mode to 
optimistic_concurrency_control.", e);
+      }
+      throw new HoodieException("Error getting all file groups in pending 
clustering", e);
+    }
     LOG.info("Found " + resultMap.size() + " files in pending clustering 
operations");
     return resultMap;
   }
@@ -166,22 +176,20 @@ public class ClusteringUtils {
         .setStrategyClassName(strategyClassName).setStrategyParams(strategyParams)
         .build();
 
-    HoodieClusteringPlan plan = HoodieClusteringPlan.newBuilder()
+    return HoodieClusteringPlan.newBuilder()
         .setInputGroups(clusteringGroups)
         .setExtraMetadata(extraMetadata)
         .setStrategy(strategy)
         .build();
-
-    return plan;
   }
 
   private static List<HoodieSliceInfo> getFileSliceInfo(List<FileSlice> slices) {
-    return slices.stream().map(slice -> new HoodieSliceInfo().newBuilder()
+    return slices.stream().map(slice -> HoodieSliceInfo.newBuilder()
         .setPartitionPath(slice.getPartitionPath())
         .setFileId(slice.getFileId())
         .setDataFilePath(slice.getBaseFile().map(BaseFile::getPath).orElse(null))
         .setDeltaFilePaths(slice.getLogFiles().map(f -> f.getPath().getName()).collect(Collectors.toList()))
-        .setBootstrapFilePath(slice.getBaseFile().map(bf -> bf.getBootstrapBaseFile().map(bbf -> bbf.getPath()).orElse(null)).orElse(null))
+        .setBootstrapFilePath(slice.getBaseFile().map(bf -> bf.getBootstrapBaseFile().map(BaseFile::getPath).orElse(null)).orElse(null))
         .build()).collect(Collectors.toList());
   }
 

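Context for the first hunk (not part of the commit): the two-argument Collectors.toMap has no merge function, so when two pending clustering plans reference the same file group it throws an IllegalStateException whose message contains "Duplicate key" -- exactly the condition the new catch block inspects before wrapping the failure in a HoodieException. A minimal standalone sketch of that behavior, using plain strings in place of HoodieFileGroupId and HoodieInstant (the class and key names here are illustrative, not from the Hudi codebase):

    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class DuplicateKeyDemo {
      public static void main(String[] args) {
        // Two entries share the key "fg-1", mimicking the same file group
        // appearing in two pending clustering plans.
        Stream<Map.Entry<String, String>> entries = Stream.of(
            Map.entry("fg-1", "instant-001"),
            Map.entry("fg-1", "instant-002"));
        try {
          entries.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        } catch (IllegalStateException e) {
          // On recent JDKs this prints something like:
          // "Duplicate key fg-1 (attempted merging values instant-001 and instant-002)"
          System.out.println(e.getMessage());
        }
      }
    }

An alternative would have been to pass a merge function as toMap's third argument and silently keep one instant per file group; the patch instead surfaces the duplicate to the user with remediation hints, since duplicate pending plans usually indicate concurrent writers.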