This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5b220703567 [HUDI-6060] Added a config to backup instants before
deletion during rollbacks and restores. (#8430)
5b220703567 is described below
commit 5b22070356799e7470e0999781f9168c4e5ebcc6
Author: Prashant Wason <[email protected]>
AuthorDate: Wed May 31 07:12:39 2023 -0700
[HUDI-6060] Added a config to backup instants before deletion during
rollbacks and restores. (#8430)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 28 ++++++++++++++++
.../rollback/BaseRollbackActionExecutor.java | 39 ++++++++++++++++++++++
.../TestCopyOnWriteRollbackActionExecutor.java | 36 ++++++++++++++++++++
.../table/timeline/HoodieActiveTimeline.java | 14 ++++++++
4 files changed, 117 insertions(+)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index da72151601d..9a7ee2fbaa7 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -698,6 +698,16 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "will not print any configuration which contains the configured
filter. For example with "
+ "a configured filter `ssl`, value for config
`ssl.trustore.location` would be masked.");
+  public static final ConfigProperty<Boolean> ROLLBACK_INSTANT_BACKUP_ENABLED = ConfigProperty
+      .key("hoodie.rollback.instant.backup.enabled")
+      .defaultValue(false)
+      .withDocumentation("Backup instants removed during rollback and restore (useful for debugging)");
+
+  public static final ConfigProperty<String> ROLLBACK_INSTANT_BACKUP_DIRECTORY = ConfigProperty
+      .key("hoodie.rollback.instant.backup.dir")
+      .defaultValue(".rollback_backup")
+      .withDocumentation("Path where instants being rolled back are copied. If not absolute path then a directory relative to .hoodie folder is created.");
+
private ConsistencyGuardConfig consistencyGuardConfig;
private FileSystemRetryConfig fileSystemRetryConfig;
@@ -2489,6 +2499,14 @@ public class HoodieWriteConfig extends HoodieConfig {
return tableServiceManagerConfig.isTableServiceManagerEnabled();
}
+ public boolean shouldBackupRollbacks() {
+ return getBoolean(ROLLBACK_INSTANT_BACKUP_ENABLED);
+ }
+
+ public String getRollbackBackupDirectory() {
+ return getString(ROLLBACK_INSTANT_BACKUP_DIRECTORY);
+ }
+
public static class Builder {
protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -2961,6 +2979,16 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+    public Builder withRollbackBackupEnabled(boolean rollbackBackupEnabled) {
+      writeConfig.setValue(ROLLBACK_INSTANT_BACKUP_ENABLED, String.valueOf(rollbackBackupEnabled));
+      return this;
+    }
+
+ public Builder withRollbackBackupDirectory(String backupDir) {
+ writeConfig.setValue(ROLLBACK_INSTANT_BACKUP_DIRECTORY, backupDir);
+ return this;
+ }
+
    protected void setDefaults() {
      writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
// Check for mandatory properties
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 86b58b85dbc..3e887503db4 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.action.rollback;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
@@ -45,6 +46,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -211,6 +213,8 @@ public abstract class BaseRollbackActionExecutor<T, I, K,
O> extends BaseActionE
validateRollbackCommitSequence();
}
+ backupRollbackInstantsIfNeeded();
+
try {
List<HoodieRollbackStat> stats = executeRollback(hoodieRollbackPlan);
LOG.info("Rolled back inflight instant " + instantTimeToRollback);
@@ -297,4 +301,39 @@ public abstract class BaseRollbackActionExecutor<T, I, K,
O> extends BaseActionE
BootstrapIndex.getBootstrapIndex(table.getMetaClient()).dropIndex();
}
}
+
+  private void backupRollbackInstantsIfNeeded() {
+    if (!config.shouldBackupRollbacks()) {
+      // Backup not required
+      return;
+    }
+
+    Path backupDir = new Path(config.getRollbackBackupDirectory());
+    if (!backupDir.isAbsolute()) {
+      // Path specified is relative to the meta directory
+      backupDir = new Path(table.getMetaClient().getMetaPath(), config.getRollbackBackupDirectory());
+    }
+
+    // Determine the instants to back up
+    HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    List<HoodieInstant> instantsToBackup = new ArrayList<>(3);
+    instantsToBackup.add(instantToRollback);
+    if (instantToRollback.isCompleted()) {
+      instantsToBackup.add(HoodieTimeline.getInflightInstant(instantToRollback, table.getMetaClient()));
+      instantsToBackup.add(HoodieTimeline.getRequestedInstant(instantToRollback));
+    }
+    if (instantToRollback.isInflight()) {
+      instantsToBackup.add(HoodieTimeline.getRequestedInstant(instantToRollback));
+    }
+
+    for (HoodieInstant instant : instantsToBackup) {
+      try {
+        activeTimeline.copyInstant(instant, backupDir);
+        LOG.info(String.format("Copied instant %s to backup dir %s during rollback at %s", instant, backupDir, instantTime));
+      } catch (HoodieIOException e) {
+        // Ignoring error in backing up
+        LOG.warn("Failed to backup rollback instant: " + e.getMessage());
+      }
+    }
+  }
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
index b265f860be3..b0eae3b83ec 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java
@@ -39,6 +39,7 @@ import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.testutils.Assertions;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -328,4 +329,39 @@ public class TestCopyOnWriteRollbackActionExecutor extends
HoodieClientRollbackT
assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table,
commitInstant.getTimestamp()).doesMarkerDirExist());
}
+
+  @Test
+  public void testRollbackBackup() throws Exception {
+    final String p1 = "2015/03/16";
+    final String p2 = "2015/03/17";
+    final String p3 = "2016/03/15";
+    // Let's create some commit files and parquet files
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient)
+        .withPartitionMetaFiles(p1, p2, p3)
+        .addCommit("001")
+        .withBaseFilesInPartition(p1, "id11")
+        .withBaseFilesInPartition(p2, "id12")
+        .withLogFile(p1, "id11", 3)
+        .addCommit("002")
+        .withBaseFilesInPartition(p1, "id21")
+        .withBaseFilesInPartition(p2, "id22");
+
+    HoodieTable table = this.getHoodieTable(metaClient, getConfigBuilder().withRollbackBackupEnabled(true).build());
+    HoodieInstant needRollBackInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "002");
+
+    // Create the rollback plan and perform the rollback
+    BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
+        new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false,
+            table.getConfig().shouldRollbackUsingMarkers());
+    copyOnWriteRollbackPlanActionExecutor.execute();
+
+    CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003",
+        needRollBackInstant, true, false);
+    copyOnWriteRollbackActionExecutor.execute();
+
+    // Completed and inflight instants should have been backed up
+    Path backupDir = new Path(metaClient.getMetaPath(), table.getConfig().getRollbackBackupDirectory());
+    assertTrue(fs.exists(new Path(backupDir, testTable.getCommitFilePath("002").getName())));
+    assertTrue(fs.exists(new Path(backupDir, testTable.getInflightCommitFilePath("002").getName())));
+  }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index fb33da5ec45..e8c94e474fa 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -31,6 +31,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -807,4 +808,17 @@ public class HoodieActiveTimeline extends
HoodieDefaultTimeline {
public HoodieActiveTimeline reload() {
return new HoodieActiveTimeline(metaClient);
}
+
+  public void copyInstant(HoodieInstant instant, Path dstDir) {
+    Path srcPath = new Path(metaClient.getMetaPath(), instant.getFileName());
+    Path dstPath = new Path(dstDir, instant.getFileName());
+    try {
+      FileSystem srcFs = srcPath.getFileSystem(metaClient.getHadoopConf());
+      FileSystem dstFs = dstPath.getFileSystem(metaClient.getHadoopConf());
+      dstFs.mkdirs(dstDir);
+      FileUtil.copy(srcFs, srcPath, dstFs, dstPath, false, true, srcFs.getConf());
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not copy instant from " + srcPath + " to " + dstPath, e);
+    }
+  }
}