This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.10.1-rc1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit b78a9dc7096af513d064fda94d7e13d206ea50db
Author: Sagar Sumit <[email protected]>
AuthorDate: Wed Jan 5 03:02:05 2022 +0530

    [HUDI-2774] Handle duplicate instants when fetching pending clustering plans (#4118)
---
 .../org/apache/hudi/common/util/ClusteringUtils.java | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
index 0d790be..6687e58 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -124,7 +125,16 @@ public class ClusteringUtils {
         // get all filegroups in the plan
         getFileGroupEntriesInClusteringPlan(clusteringPlan.getLeft(), clusteringPlan.getRight()));
 
-    Map<HoodieFileGroupId, HoodieInstant> resultMap = resultStream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    Map<HoodieFileGroupId, HoodieInstant> resultMap;
+    try {
+      resultMap = resultStream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    } catch (Exception e) {
+      if (e instanceof IllegalStateException && e.getMessage().contains("Duplicate key")) {
+        throw new HoodieException("Found duplicate file groups pending clustering. If you're running deltastreamer in continuous mode, consider adding delay using --min-sync-interval-seconds. "
+            + "Or consider setting write concurrency mode to optimistic_concurrency_control.", e);
+      }
+      throw new HoodieException("Error getting all file groups in pending clustering", e);
+    }
     LOG.info("Found " + resultMap.size() + " files in pending clustering operations");
     return resultMap;
   }
@@ -166,22 +176,20 @@ public class ClusteringUtils {
         .setStrategyClassName(strategyClassName).setStrategyParams(strategyParams)
         .build();
 
-    HoodieClusteringPlan plan = HoodieClusteringPlan.newBuilder()
+    return HoodieClusteringPlan.newBuilder()
         .setInputGroups(clusteringGroups)
         .setExtraMetadata(extraMetadata)
         .setStrategy(strategy)
         .build();
-
-    return plan;
   }
 
   private static List<HoodieSliceInfo> getFileSliceInfo(List<FileSlice> slices) {
-    return slices.stream().map(slice -> new HoodieSliceInfo().newBuilder()
+    return slices.stream().map(slice -> HoodieSliceInfo.newBuilder()
         .setPartitionPath(slice.getPartitionPath())
         .setFileId(slice.getFileId())
         .setDataFilePath(slice.getBaseFile().map(BaseFile::getPath).orElse(null))
         .setDeltaFilePaths(slice.getLogFiles().map(f -> f.getPath().getName()).collect(Collectors.toList()))
-        .setBootstrapFilePath(slice.getBaseFile().map(bf -> bf.getBootstrapBaseFile().map(bbf -> bbf.getPath()).orElse(null)).orElse(null))
+        .setBootstrapFilePath(slice.getBaseFile().map(bf -> bf.getBootstrapBaseFile().map(BaseFile::getPath).orElse(null)).orElse(null))
         .build()).collect(Collectors.toList());
   }
