This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3109d890f1 [HUDI-5252] ClusteringCommitSink supports to rollback
clustering (#7263)
3109d890f1 is described below
commit 3109d890f13b1b29e5796a9f34ab28fa898ec23c
Author: Nicholas Jiang <[email protected]>
AuthorDate: Wed Nov 23 11:37:53 2022 +0800
[HUDI-5252] ClusteringCommitSink supports to rollback clustering (#7263)
* [HUDI-5252] ClusteringCommitSink supports to rollback clustering
---
.../hudi/sink/clustering/ClusteringCommitSink.java | 4 +-
.../java/org/apache/hudi/util/ClusteringUtil.java | 17 +++
.../org/apache/hudi/utils/TestClusteringUtil.java | 127 +++++++++++++++++++++
3 files changed, 146 insertions(+), 2 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index 5a46dcf8f3..eb567d89f1 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -36,7 +36,7 @@ import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import org.apache.hudi.util.CompactionUtil;
+import org.apache.hudi.util.ClusteringUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
@@ -122,7 +122,7 @@ public class ClusteringCommitSink extends
CleanFunction<ClusteringCommitEvent> {
if (events.stream().anyMatch(ClusteringCommitEvent::isFailed)) {
try {
// handle failure case
- CompactionUtil.rollbackCompaction(table, instant);
+ ClusteringUtil.rollbackClustering(table, writeClient, instant);
} finally {
// remove commitBuffer to avoid obsolete metadata commit
reset(instant);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
index 580dbacc4d..cb1e54b44d 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/ClusteringUtil.java
@@ -20,6 +20,7 @@ package org.apache.hudi.util;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.OptionsResolver;
@@ -77,4 +78,20 @@ public class ClusteringUtil {
table.getMetaClient().reloadActiveTimeline();
});
}
+
+ /**
+ * Rolls back the given clustering instant if it is still pending as an inflight
+ * replace commit on the reloaded active timeline; otherwise does nothing.
+ *
+ * @param table The hoodie table whose timeline is reloaded and checked
+ * @param writeClient The write client used to look up pending rollback info
+ * @param instantTime The instant time of the clustering to roll back
+ */
+ public static void rollbackClustering(HoodieFlinkTable<?> table,
HoodieFlinkWriteClient<?> writeClient, String instantTime) {
+ HoodieInstant inflightInstant =
HoodieTimeline.getReplaceCommitInflightInstant(instantTime);
+ if
(table.getMetaClient().reloadActiveTimeline().filterPendingReplaceTimeline().containsInstant(inflightInstant))
{
+ LOG.warn("Rollback failed clustering instant: [" + instantTime + "]");
+ table.rollbackInflightClustering(inflightInstant,
+ commitToRollback ->
writeClient.getPendingRollbackInfo(table.getMetaClient(), commitToRollback,
false));
+ }
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestClusteringUtil.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestClusteringUtil.java
new file mode 100644
index 0000000000..4c21a96235
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestClusteringUtil.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utils;
+
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieClusteringStrategy;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.util.ClusteringUtil;
+import org.apache.hudi.util.FlinkTables;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.flink.configuration.Configuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link ClusteringUtil}.
+ */
+public class TestClusteringUtil {
+
+ private HoodieFlinkTable<?> table;
+ private HoodieTableMetaClient metaClient;
+ private HoodieFlinkWriteClient<?> writeClient;
+ private Configuration conf;
+
+ @TempDir
+ File tempFile;
+
+ void beforeEach() throws IOException { // carries no JUnit lifecycle annotation; the test calls it explicitly
+ beforeEach(Collections.emptyMap());
+ }
+
+ void beforeEach(Map<String, String> options) throws IOException {
+ this.conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT.value());
+ options.forEach((k, v) -> conf.setString(k, v)); // applied last, so caller-supplied options win over the values set above
+
+ StreamerUtil.initTableIfNotExists(conf);
+
+ this.table = FlinkTables.createTable(conf);
+ this.metaClient = table.getMetaClient();
+ this.writeClient = FlinkWriteClients.createWriteClient(conf); // NOTE(review): never closed anywhere in this class; confirm whether cleanup is needed
+ }
+
+ @Test
+ void rollbackClustering() throws Exception {
+ beforeEach();
+ List<String> oriInstants = IntStream.range(0, 3) // three plans, each moved to inflight by generateClusteringPlan()
+ .mapToObj(i -> generateClusteringPlan()).collect(Collectors.toList());
+ List<HoodieInstant> instants =
ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient())
+ .stream().filter(instant -> instant.getState() ==
HoodieInstant.State.INFLIGHT)
+ .collect(Collectors.toList());
+ assertThat("all the instants should be in pending state", instants.size(),
is(3));
+ ClusteringUtil.rollbackClustering(table, writeClient); // NOTE(review): two-argument call, not the three-argument method added in this commit; confirm the new overload is covered
+ boolean allRolledBack =
ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient())
+ .stream().allMatch(instant -> instant.getState() ==
HoodieInstant.State.REQUESTED);
+ assertTrue(allRolledBack, "all the instants should be rolled back");
+ List<String> actualInstants =
ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient())
+
.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+ assertThat(actualInstants, is(oriInstants)); // pending instants must keep the originally generated instant times, in order
+ }
+
+ /**
+ * Saves a requested replace commit carrying a one-group clustering plan, moves it to inflight, and returns its instant time.
+ */
+ private String generateClusteringPlan() {
+ HoodieClusteringGroup group = new HoodieClusteringGroup();
+ HoodieClusteringPlan plan = new
HoodieClusteringPlan(Collections.singletonList(group),
+ HoodieClusteringStrategy.newBuilder().build(), Collections.emptyMap(),
1, false);
+ HoodieRequestedReplaceMetadata metadata = new
HoodieRequestedReplaceMetadata(WriteOperationType.CLUSTER.name(),
+ plan, Collections.emptyMap(), 1);
+ String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ HoodieInstant clusteringInstant =
+ new HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime);
+ try {
+
metaClient.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant,
+ TimelineMetadataUtils.serializeRequestedReplaceMetadata(metadata));
+
table.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringInstant,
Option.empty());
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Exception scheduling clustering", ioe);
+ }
+ metaClient.reloadActiveTimeline(); // refresh the shared meta client's timeline after the state transition
+ return instantTime;
+ }
+}