This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9d744bb35c [HUDI-3805] Delete existing corrupted requested rollback
plan during rollback (#5245)
9d744bb35c is described below
commit 9d744bb35ce54d347c5ef50adeba3f2a8840d043
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Apr 7 03:02:34 2022 -0700
[HUDI-3805] Delete existing corrupted requested rollback plan during
rollback (#5245)
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 41 +++++---
.../org/apache/hudi/client/TestClientRollback.java | 106 +++++++++++++++++++++
.../hudi/common/testutils/FileCreateUtils.java | 4 +
3 files changed, 140 insertions(+), 11 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 70e3cebce4..32a8dee517 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -18,7 +18,6 @@
package org.apache.hudi.client;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.HoodieAvroUtils;
@@ -72,7 +71,6 @@ import org.apache.hudi.exception.HoodieRestoreException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.Type;
import org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
@@ -82,6 +80,7 @@ import
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metrics.HoodieMetrics;
@@ -95,8 +94,9 @@ import org.apache.hudi.table.upgrade.SupportsUpgradeDowngrade;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import com.codahale.metrics.Timer;
-import org.apache.hadoop.conf.Configuration;
import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -105,11 +105,11 @@ import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
-import java.util.Set;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -1113,9 +1113,28 @@ public abstract class BaseHoodieWriteClient<T extends
HoodieRecordPayload, I, K,
protected Map<String, Option<HoodiePendingRollbackInfo>>
getPendingRollbackInfos(HoodieTableMetaClient metaClient, boolean
ignoreCompactionAndClusteringInstants) {
List<HoodieInstant> instants =
metaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants().collect(Collectors.toList());
Map<String, Option<HoodiePendingRollbackInfo>> infoMap = new HashMap<>();
- for (HoodieInstant instant : instants) {
+ for (HoodieInstant rollbackInstant : instants) {
+ HoodieRollbackPlan rollbackPlan;
+ try {
+ rollbackPlan = RollbackUtils.getRollbackPlan(metaClient,
rollbackInstant);
+ } catch (IOException e) {
+ if (rollbackInstant.isRequested()) {
+ LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ",
deleting the plan since it's in REQUESTED state", e);
+ try {
+ metaClient.getActiveTimeline().deletePending(rollbackInstant);
+ } catch (HoodieIOException he) {
+ LOG.warn("Cannot delete " + rollbackInstant, he);
+ continue;
+ }
+ } else {
+ // Here we assume that if the rollback is inflight, the rollback
plan is intact
+ // in instant.rollback.requested. The exception here can be due to
other reasons.
+ LOG.warn("Fetching rollback plan failed for " + rollbackInstant + ",
skip the plan", e);
+ }
+ continue;
+ }
+
try {
- HoodieRollbackPlan rollbackPlan =
RollbackUtils.getRollbackPlan(metaClient, instant);
String action = rollbackPlan.getInstantToRollback().getAction();
if (ignoreCompactionAndClusteringInstants) {
if (!HoodieTimeline.COMPACTION_ACTION.equals(action)) {
@@ -1124,14 +1143,14 @@ public abstract class BaseHoodieWriteClient<T extends
HoodieRecordPayload, I, K,
rollbackPlan.getInstantToRollback().getCommitTime())).isPresent();
if (!isClustering) {
String instantToRollback =
rollbackPlan.getInstantToRollback().getCommitTime();
- infoMap.putIfAbsent(instantToRollback, Option.of(new
HoodiePendingRollbackInfo(instant, rollbackPlan)));
+ infoMap.putIfAbsent(instantToRollback, Option.of(new
HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
}
}
} else {
-
infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(),
Option.of(new HoodiePendingRollbackInfo(instant, rollbackPlan)));
+
infoMap.putIfAbsent(rollbackPlan.getInstantToRollback().getCommitTime(),
Option.of(new HoodiePendingRollbackInfo(rollbackInstant, rollbackPlan)));
}
- } catch (IOException e) {
- LOG.warn("Fetching rollback plan failed for " + infoMap + ", skip the
plan", e);
+ } catch (Exception e) {
+ LOG.warn("Processing rollback plan failed for " + rollbackInstant + ",
skip the plan", e);
}
}
return infoMap;
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
index d2dabc0792..f6315eec7d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestClientRollback.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client;
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.avro.model.HoodieRollbackRequest;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -61,9 +62,11 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -481,4 +484,107 @@ public class TestClientRollback extends
HoodieClientTestBase {
assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3));
}
}
+
+  /**
+   * Parameters for {@code testRollbackWithRequestedRollbackPlan}: every combination of
+   * (enableMetadataTable, isRollbackPlanCorrupted).
+   */
+  private static Stream<Arguments> testRollbackWithRequestedRollbackPlanParams() {
+    return Stream.of(
+        Arguments.of(true, true),
+        Arguments.of(true, false),
+        Arguments.of(false, true),
+        Arguments.of(false, false));
+  }
+
+  /**
+   * Rolls back inflight commit3 while a REQUESTED rollback instant already exists on the timeline.
+   *
+   * <p>When the pre-existing plan is corrupted (unparseable bytes), the client is expected to delete it
+   * and schedule a fresh rollback instant. When the plan is valid, the client is expected to reuse the
+   * existing rollback instant. In both cases exactly one completed rollback instant must remain, and
+   * every base file written by commit3 must be gone.
+   *
+   * @param enableMetadataTable     whether the metadata table is enabled for the write config
+   * @param isRollbackPlanCorrupted whether the pre-existing requested rollback plan is corrupted
+   */
+  @ParameterizedTest
+  @MethodSource("testRollbackWithRequestedRollbackPlanParams")
+  public void testRollbackWithRequestedRollbackPlan(boolean enableMetadataTable, boolean isRollbackPlanCorrupted) throws Exception {
+    // Let's create some commit files and base files
+    final String p1 = "2022/04/05";
+    final String p2 = "2022/04/06";
+    final String commitTime1 = "20220406010101002";
+    final String commitTime2 = "20220406020601002";
+    final String commitTime3 = "20220406030611002";
+    final String rollbackInstantTime = "20220406040611002";
+    Map<String, String> partitionAndFileId1 = new HashMap<String, String>() {
+      {
+        put(p1, "id11");
+        put(p2, "id12");
+      }
+    };
+    Map<String, String> partitionAndFileId2 = new HashMap<String, String>() {
+      {
+        put(p1, "id21");
+        put(p2, "id22");
+      }
+    };
+    Map<String, String> partitionAndFileId3 = new HashMap<String, String>() {
+      {
+        put(p1, "id31");
+        put(p2, "id32");
+      }
+    };
+
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withRollbackUsingMarkers(false)
+        .withMetadataConfig(
+            HoodieMetadataConfig.newBuilder()
+                // Column Stats Index is disabled, since these tests construct tables which are
+                // not valid (empty commit metadata, invalid parquet files)
+                .withMetadataIndexColumnStats(false)
+                .enable(enableMetadataTable)
+                .build()
+        )
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build()).build();
+
+    HoodieTestTable testTable = enableMetadataTable
+        ? HoodieMetadataTestTable.of(metaClient, SparkHoodieBackedTableMetadataWriter.create(
+            metaClient.getHadoopConf(), config, context))
+        : HoodieTestTable.of(metaClient);
+
+    testTable.withPartitionMetaFiles(p1, p2)
+        .addCommit(commitTime1)
+        .withBaseFilesInPartitions(partitionAndFileId1)
+        .addCommit(commitTime2)
+        .withBaseFilesInPartitions(partitionAndFileId2)
+        .addInflightCommit(commitTime3)
+        .withBaseFilesInPartitions(partitionAndFileId3);
+
+    try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
+      if (isRollbackPlanCorrupted) {
+        // Add a corrupted requested rollback plan
+        FileCreateUtils.createRequestedRollbackFile(metaClient.getBasePath(), rollbackInstantTime, new byte[] {0, 1, 2});
+      } else {
+        // Add a valid requested rollback plan to roll back commitTime3.
+        // Each partition's request must name that partition's own file ID; looking up a fixed
+        // partition would point the other partitions' requests at files that do not exist there.
+        HoodieRollbackPlan rollbackPlan = new HoodieRollbackPlan();
+        List<HoodieRollbackRequest> rollbackRequestList = partitionAndFileId3.keySet().stream()
+            .map(partition -> new HoodieRollbackRequest(partition, EMPTY_STRING, EMPTY_STRING,
+                Collections.singletonList(metaClient.getBasePath() + "/" + partition + "/"
+                    + FileCreateUtils.baseFileName(commitTime3, partitionAndFileId3.get(partition))),
+                Collections.emptyMap()))
+            .collect(Collectors.toList());
+        rollbackPlan.setRollbackRequests(rollbackRequestList);
+        rollbackPlan.setInstantToRollback(new HoodieInstantInfo(commitTime3, HoodieTimeline.COMMIT_ACTION));
+        FileCreateUtils.createRequestedRollbackFile(metaClient.getBasePath(), rollbackInstantTime, rollbackPlan);
+      }
+
+      // Rollback commit3
+      client.rollback(commitTime3);
+      assertFalse(testTable.inflightCommitExists(commitTime3));
+      assertFalse(testTable.baseFilesExist(partitionAndFileId3, commitTime3));
+      // The aggregate check above is already false once any single file is gone, so also verify
+      // every partition individually: each commit3 base file must have been deleted.
+      for (Map.Entry<String, String> entry : partitionAndFileId3.entrySet()) {
+        assertFalse(testTable.baseFilesExist(Collections.singletonMap(entry.getKey(), entry.getValue()), commitTime3),
+            "Base file of commit3 should be deleted in partition " + entry.getKey());
+      }
+      assertTrue(testTable.baseFilesExist(partitionAndFileId2, commitTime2));
+
+      metaClient.reloadActiveTimeline();
+      List<HoodieInstant> rollbackInstants = metaClient.getActiveTimeline().getRollbackTimeline().getInstants().collect(Collectors.toList());
+      // Corrupted requested rollback plan should be deleted before scheduling a new one
+      assertEquals(1, rollbackInstants.size(), "Expected exactly one rollback instant on the timeline");
+      HoodieInstant rollbackInstant = rollbackInstants.get(0);
+      assertTrue(rollbackInstant.isCompleted());
+
+      if (isRollbackPlanCorrupted) {
+        // Should create a new rollback instant
+        assertNotEquals(rollbackInstantTime, rollbackInstant.getTimestamp());
+      } else {
+        // Should reuse the rollback instant
+        assertEquals(rollbackInstantTime, rollbackInstant.getTimestamp());
+      }
+    }
+  }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
index 06f0ac49b6..27dd9df5ed 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
@@ -244,6 +244,10 @@ public class FileCreateUtils {
createMetaFile(basePath, instantTime,
HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION, serializeRollbackPlan(plan).get());
}
+  /**
+   * Writes a REQUESTED rollback meta file for {@code instantTime} whose content is exactly the given
+   * bytes, with no {@code HoodieRollbackPlan} serialization. Tests use this to plant arbitrary, for
+   * example corrupted, rollback plans on the timeline.
+   */
+  public static void createRequestedRollbackFile(String basePath, String instantTime, byte[] content) throws IOException {
+    final String requestedRollbackExtension = HoodieTimeline.REQUESTED_ROLLBACK_EXTENSION;
+    createMetaFile(basePath, instantTime, requestedRollbackExtension, content);
+  }
+
public static void createInflightRollbackFile(String basePath, String
instantTime) throws IOException {
createMetaFile(basePath, instantTime,
HoodieTimeline.INFLIGHT_ROLLBACK_EXTENSION);
}