This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3109d890f1 [HUDI-5252] ClusteringCommitSink supports to rollback 
clustering (#7263)
3109d890f1 is described below

commit 3109d890f13b1b29e5796a9f34ab28fa898ec23c
Author: Nicholas Jiang <[email protected]>
AuthorDate: Wed Nov 23 11:37:53 2022 +0800

    [HUDI-5252] ClusteringCommitSink supports to rollback clustering (#7263)
    
    * [HUDI-5252] ClusteringCommitSink supports to rollback clustering
---
 .../hudi/sink/clustering/ClusteringCommitSink.java |   4 +-
 .../java/org/apache/hudi/util/ClusteringUtil.java  |  17 +++
 .../org/apache/hudi/utils/TestClusteringUtil.java  | 127 +++++++++++++++++++++
 3 files changed, 146 insertions(+), 2 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index 5a46dcf8f3..eb567d89f1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -36,7 +36,7 @@ import org.apache.hudi.exception.HoodieClusteringException;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.ClusteringUtil;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -122,7 +122,7 @@ public class ClusteringCommitSink extends 
CleanFunction<ClusteringCommitEvent> {
     if (events.stream().anyMatch(ClusteringCommitEvent::isFailed)) {
       try {
         // handle failure case
-        CompactionUtil.rollbackCompaction(table, instant);
+        ClusteringUtil.rollbackClustering(table, writeClient, instant);
       } finally {
         // remove commitBuffer to avoid obsolete metadata commit
         reset(instant);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
index 580dbacc4d..cb1e54b44d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
@@ -20,6 +20,7 @@ package org.apache.hudi.util;
 
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.OptionsResolver;
@@ -77,4 +78,20 @@ public class ClusteringUtil {
       table.getMetaClient().reloadActiveTimeline();
     });
   }
+
+  /**
+   * Force rolls back the inflight clustering instant, for handling failure 
case.
+   *
+   * @param table The hoodie table
+   * @param writeClient The write client
+   * @param instantTime The instant time
+   */
+  public static void rollbackClustering(HoodieFlinkTable<?> table, 
HoodieFlinkWriteClient<?> writeClient, String instantTime) {
+    HoodieInstant inflightInstant = 
HoodieTimeline.getReplaceCommitInflightInstant(instantTime);
+    if 
(table.getMetaClient().reloadActiveTimeline().filterPendingReplaceTimeline().containsInstant(inflightInstant))
 {
+      LOG.warn("Rollback failed clustering instant: [" + instantTime + "]");
+      table.rollbackInflightClustering(inflightInstant,
+          commitToRollback -> 
writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback, 
false));
+    }
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestClusteringUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestClusteringUtil.java
new file mode 100644
index 0000000000..4c21a96235
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestClusteringUtil.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieClusteringStrategy;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.ClusteringUtil;
+import org.apache.hudi.util.FlinkTables;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link ClusteringUtil}.
+ */
+public class TestClusteringUtil {
+
+  private HoodieFlinkTable<?> table;
+  private HoodieTableMetaClient metaClient;
+  private HoodieFlinkWriteClient<?> writeClient;
+  private Configuration conf;
+
+  @TempDir
+  File tempFile;
+
+  void beforeEach() throws IOException {
+    beforeEach(Collections.emptyMap());
+  }
+
+  void beforeEach(Map<String, String> options) throws IOException {
+    this.conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
+    options.forEach((k, v) -> conf.setString(k, v));
+
+    StreamerUtil.initTableIfNotExists(conf);
+
+    this.table = FlinkTables.createTable(conf);
+    this.metaClient = table.getMetaClient();
+    this.writeClient = FlinkWriteClients.createWriteClient(conf);
+  }
+
+  @Test
+  void rollbackClustering() throws Exception {
+    beforeEach();
+    List<String> oriInstants = IntStream.range(0, 3)
+        .mapToObj(i -> generateClusteringPlan()).collect(Collectors.toList());
+    List<HoodieInstant> instants = 
ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient())
+        .stream().filter(instant -> instant.getState() == 
HoodieInstant.State.INFLIGHT)
+        .collect(Collectors.toList());
+    assertThat("all the instants should be in pending state", instants.size(), 
is(3));
+    ClusteringUtil.rollbackClustering(table, writeClient);
+    boolean allRolledBack = 
ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient())
+        .stream().allMatch(instant -> instant.getState() == 
HoodieInstant.State.REQUESTED);
+    assertTrue(allRolledBack, "all the instants should be rolled back");
+    List<String> actualInstants = 
ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient())
+        
.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+    assertThat(actualInstants, is(oriInstants));
+  }
+
+  /**
+   * Generates a clustering plan on the timeline and returns its instant time.
+   */
+  private String generateClusteringPlan() {
+    HoodieClusteringGroup group = new HoodieClusteringGroup();
+    HoodieClusteringPlan plan = new 
HoodieClusteringPlan(Collections.singletonList(group),
+        HoodieClusteringStrategy.newBuilder().build(), Collections.emptyMap(), 
1, false);
+    HoodieRequestedReplaceMetadata metadata = new 
HoodieRequestedReplaceMetadata(WriteOperationType.CLUSTER.name(),
+        plan, Collections.emptyMap(), 1);
+    String instantTime = HoodieActiveTimeline.createNewInstantTime();
+    HoodieInstant clusteringInstant =
+        new HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime);
+    try {
+      
metaClient.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant,
+          TimelineMetadataUtils.serializeRequestedReplaceMetadata(metadata));
+      
table.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringInstant,
 Option.empty());
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Exception scheduling clustering", ioe);
+    }
+    metaClient.reloadActiveTimeline();
+    return instantTime;
+  }
+}

Reply via email to the commits mailing list.