This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2dbb273 [HUDI-3721] Delete MDT if necessary when triggering rollback to savepoint (#5173)
2dbb273 is described below
commit 2dbb273d26ea38ecf28af704876861554250449c
Author: YueZhang <[email protected]>
AuthorDate: Thu Mar 31 11:26:37 2022 +0800
[HUDI-3721] Delete MDT if necessary when triggering rollback to savepoint (#5173)
Co-authored-by: yuezhang <[email protected]>
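
The gist of the change: restoreToSavepoint() now checks whether the metadata table (MDT) can still be synced before restoring the data table; if the savepoint instant has already been archived out of the MDT timeline, the MDT is deleted and its re-bootstrap is skipped for the restore. A condensed sketch of that decision follows, using only the classes touched by this patch; the wrapper class and the helper name decideMetadataInit are illustrative only and not part of the commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;
    import org.apache.hudi.metadata.HoodieTableMetadata;

    class SavepointRollbackSketch {
      // Returns true if the MDT should still be bootstrapped and synced during the restore,
      // false if it was deleted because the savepoint instant is no longer in the MDT timeline.
      static boolean decideMetadataInit(Configuration hadoopConf, String basePath, String savepointTime) {
        try {
          String mdtPath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
          HoodieTableMetaClient mdtClient = HoodieTableMetaClient.builder()
              .setConf(hadoopConf).setBasePath(mdtPath).build();
          HoodieInstant syncedInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, savepointTime);
          if (mdtClient.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())) {
            // The MDT can no longer replay this rollback; drop it and let a later write re-bootstrap it.
            mdtClient.getFs().delete(new Path(mdtPath), true);
            return false;
          }
        } catch (Exception e) {
          // Metadata table directory does not exist; nothing to delete.
        }
        return true;
      }
    }

When this check comes back false, restoreToSavepoint() passes initialMetadataTableIfNecessary = false down through initTable() and restoreToInstant(String, boolean), so the restore does not try to re-bootstrap the MDT mid-flight.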
---
.../hudi/cli/integ/ITTestSavepointsCommand.java | 53 ++++++++++++++++++++++
.../apache/hudi/client/BaseHoodieWriteClient.java | 45 ++++++++++++++----
.../apache/hudi/client/HoodieFlinkWriteClient.java | 2 +-
.../apache/hudi/client/HoodieJavaWriteClient.java | 2 +-
.../apache/hudi/client/SparkRDDWriteClient.java | 12 +++--
.../hudi/client/TestTableSchemaEvolution.java | 4 +-
.../functional/TestHoodieBackedMetadata.java | 2 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 2 +-
.../TestHoodieSparkMergeOnReadTableRollback.java | 8 ++--
9 files changed, 106 insertions(+), 24 deletions(-)
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
index 5f8021a..7de1c2d 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.commands.TableCommand;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -29,6 +31,9 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.shell.core.CommandResult;
@@ -119,6 +124,54 @@ public class ITTestSavepointsCommand extends AbstractShellIntegrationTest {
}
/**
+ * Test case of command 'savepoint rollback' with metadata table bootstrap.
+ */
+ @Test
+ public void testRollbackToSavepointWithMetadataTableEnable() throws IOException {
+ // generate for savepoints
+ for (int i = 101; i < 105; i++) {
+ String instantTime = String.valueOf(i);
+ HoodieTestDataGenerator.createCommitFile(tablePath, instantTime, jsc.hadoopConfiguration());
+ }
+
+ // generate one savepoint at 102
+ String savepoint = "102";
+ HoodieTestDataGenerator.createSavepointFile(tablePath, savepoint, jsc.hadoopConfiguration());
+
+ // re-bootstrap metadata table
+ // delete first
+ String basePath = metaClient.getBasePath();
+ Path metadataTableBasePath = new Path(HoodieTableMetadata.getMetadataTableBasePath(basePath));
+ metaClient.getFs().delete(metadataTableBasePath, true);
+
+ // then bootstrap metadata table at instant 104
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(HoodieCLI.basePath)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
+ SparkHoodieBackedTableMetadataWriter.create(HoodieCLI.conf, writeConfig, new HoodieSparkEngineContext(jsc));
+
+ assertTrue(HoodieCLI.fs.exists(metadataTableBasePath));
+
+ // roll back to savepoint
+ CommandResult cr = getShell().executeCommand(
+ String.format("savepoint rollback --savepoint %s --sparkMaster %s",
savepoint, "local"));
+
+ assertAll("Command run failed",
+ () -> assertTrue(cr.isSuccess()),
+ () -> assertEquals(
+ String.format("Savepoint \"%s\" rolled back", savepoint),
cr.getResult().toString()));
+
+ // there is 1 restore instant
+ HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
+ assertEquals(1, timeline.getRestoreTimeline().countInstants());
+
+ // instants 103 and 104 have been rolled back
+ assertFalse(timeline.getCommitTimeline().containsInstant(
+ new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103")));
+ assertFalse(timeline.getCommitTimeline().containsInstant(
+ new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "104")));
+ }
+
+ /**
* Test case of command 'savepoint delete'.
*/
@Test
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 028fdac..0e372cb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -18,6 +18,7 @@
package org.apache.hudi.client;
+import org.apache.hadoop.fs.Path;
import org.apache.hudi.async.AsyncArchiveService;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -66,6 +67,7 @@ import org.apache.hudi.exception.HoodieRestoreException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.BulkInsertPartitioner;
@@ -643,9 +645,30 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @return true if the savepoint was restored to successfully
*/
public void restoreToSavepoint(String savepointTime) {
- HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN,
Option.empty());
+ boolean initialMetadataTableIfNecessary = config.isMetadataTableEnabled();
+ if (initialMetadataTableIfNecessary) {
+ try {
+ // Delete the metadata table directly when users trigger savepoint rollback, if the MDT exists and the savepoint instant is before the start of its timeline
+ String metadataTableBasePathStr = HoodieTableMetadata.getMetadataTableBasePath(config.getBasePath());
+ HoodieTableMetaClient mdtClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePathStr).build();
+ // Same as HoodieTableMetadataUtil#processRollbackMetadata
+ HoodieInstant syncedInstant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, savepointTime);
+ // The instant required to sync the rollback to the MDT has been archived, so the MDT sync would fail.
+ // In that case we need to delete the whole MDT here.
+ if (mdtClient.getCommitsTimeline().isBeforeTimelineStarts(syncedInstant.getTimestamp())) {
+ mdtClient.getFs().delete(new Path(metadataTableBasePathStr), true);
+ // The rollbackToSavepoint action will try to bootstrap the MDT first, but syncing to the MDT would fail in this scenario,
+ // so we need to disable metadata table initialization here.
+ initialMetadataTableIfNecessary = false;
+ }
+ } catch (Exception e) {
+ // Metadata directory does not exist
+ }
+ }
+
+ HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN,
Option.empty(), initialMetadataTableIfNecessary);
SavepointHelpers.validateSavepointPresence(table, savepointTime);
- restoreToInstant(savepointTime);
+ restoreToInstant(savepointTime, initialMetadataTableIfNecessary);
SavepointHelpers.validateSavepointRestore(table, savepointTime);
}
@@ -659,7 +682,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
/**
* @Deprecated
* Rollback the inflight record changes with the given commit time. This
- * will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String)}
+ * will be removed in future in favor of {@link BaseHoodieWriteClient#restoreToInstant(String, boolean)}
*
* @param commitInstantTime Instant time of the commit
* @param pendingRollbackInfo pending rollback instant and plan if rollback
failed from previous attempt.
@@ -717,12 +740,12 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
*
* @param instantTime Instant time to which restoration is requested
*/
- public HoodieRestoreMetadata restoreToInstant(final String instantTime) throws HoodieRestoreException {
+ public HoodieRestoreMetadata restoreToInstant(final String instantTime, boolean initialMetadataTableIfNecessary) throws HoodieRestoreException {
LOG.info("Begin restore to instant " + instantTime);
final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime();
Timer.Context timerContext = metrics.getRollbackCtx();
try {
- HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN,
Option.empty());
+ HoodieTable<T, I, K, O> table = initTable(WriteOperationType.UNKNOWN,
Option.empty(), initialMetadataTableIfNecessary);
Option<HoodieRestorePlan> restorePlanOption =
table.scheduleRestore(context, restoreInstantTime, instantTime);
if (restorePlanOption.isPresent()) {
HoodieRestoreMetadata restoreMetadata = table.restore(context, restoreInstantTime, instantTime);
@@ -1288,14 +1311,14 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* @param instantTime current inflight instant time
* @return instantiated {@link HoodieTable}
*/
- protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime);
+ protected abstract HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary);
/**
* Instantiates and initializes instance of {@link HoodieTable}, performing crucial bootstrapping
* operations such as:
*
* NOTE: This method is engine-agnostic and SHOULD NOT be overloaded, please check on
- * {@link #doInitTable(HoodieTableMetaClient, Option<String>)} instead
+ * {@link #doInitTable(HoodieTableMetaClient, Option, boolean)} instead
*
* <ul>
* <li>Checking whether upgrade/downgrade is required</li>
@@ -1303,7 +1326,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
* <li>Initializing metrics contexts</li>
* </ul>
*/
- protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
+ protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
HoodieTableMetaClient metaClient = createMetaClient(true);
// Setup write schemas for deletes
if (operationType == WriteOperationType.DELETE) {
@@ -1315,7 +1338,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
this.txnManager.beginTransaction();
try {
tryUpgrade(metaClient, instantTime);
- table = doInitTable(metaClient, instantTime);
+ table = doInitTable(metaClient, instantTime, initialMetadataTableIfNecessary);
} finally {
this.txnManager.endTransaction();
}
@@ -1348,6 +1371,10 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
return table;
}
+ protected final HoodieTable initTable(WriteOperationType operationType, Option<String> instantTime) {
+ return initTable(operationType, instantTime, config.isMetadataTableEnabled());
+ }
+
/**
* Sets write schema from last instant since deletes may not have schema set in the config.
*/
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 4523705..626727a 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -398,7 +398,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
}
@Override
- protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+ protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// Create a Hoodie table which encapsulated the commits and files visible
return getHoodieTable();
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index faf46e0..a506131 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -233,7 +233,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
}
@Override
- protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
+ protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
// new JavaUpgradeDowngrade(metaClient, config, context).run(metaClient, HoodieTableVersion.current(), config, context, instantTime);
// Create a Hoodie table which encapsulated the commits and files visible
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index f6d0632..b3e3c25 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -425,11 +425,13 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
}
@Override
- protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime) {
- // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
- // if it didn't exist before
- // See https://issues.apache.org/jira/browse/HUDI-3343 for more details
- initializeMetadataTable(instantTime);
+ protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
+ if (initialMetadataTableIfNecessary) {
+ // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
+ // if it didn't exist before
+ // See https://issues.apache.org/jira/browse/HUDI-3343 for more details
+ initializeMetadataTable(instantTime);
+ }
// Create a Hoodie table which encapsulated the commits and files visible
return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 3fb4549..1cb7bcb 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -291,7 +291,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
}
// Rollback to the original schema
- client.restoreToInstant("004");
+ client.restoreToInstant("004", hoodieWriteConfig.isMetadataTableEnabled());
checkLatestDeltaCommit("004");
// Updates with original schema are now allowed
@@ -432,7 +432,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
// Revert to the older commit and ensure that the original schema can now
// be used for inserts and inserts.
- client.restoreToInstant("003");
+ client.restoreToInstant("003", hoodieWriteConfig.isMetadataTableEnabled());
curTimeline = metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants();
assertTrue(curTimeline.lastInstant().get().getTimestamp().equals("003"));
checkReadRecords("000", numRecords);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index d7d0d26..3497a68 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1208,7 +1208,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
validateMetadata(client);
// Restore
- client.restoreToInstant("0000006");
+ client.restoreToInstant("0000006", writeConfig.isMetadataTableEnabled());
validateMetadata(client);
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index ce0cc37..3b78954 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -585,7 +585,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
client.savepoint("004", "user1","comment1");
- client.restoreToInstant("004");
+ client.restoreToInstant("004", config.isMetadataTableEnabled());
assertFalse(metaClient.reloadActiveTimeline().getRollbackTimeline().lastInstant().isPresent());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index bce2ec8..339e9e1 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -150,7 +150,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
// NOTE: First writer will have Metadata table DISABLED
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE);
-
+
addConfigsForPopulateMetaFields(cfgBuilder, true);
HoodieWriteConfig cfg = cfgBuilder.build();
@@ -480,7 +480,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
copyOfRecords.clear();
// Rollback latest commit first
- client.restoreToInstant("000");
+ client.restoreToInstant("000", cfg.isMetadataTableEnabled());
metaClient = HoodieTableMetaClient.reload(metaClient);
allFiles = listAllBaseFilesInPath(hoodieTable);
@@ -530,7 +530,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
if (!restoreAfterCompaction) {
// restore to 002 and validate records.
- client.restoreToInstant("002");
+ client.restoreToInstant("002", cfg.isMetadataTableEnabled());
validateRecords(cfg, metaClient, updates1);
} else {
// trigger compaction and then trigger couple of upserts followed by restore.
@@ -546,7 +546,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
validateRecords(cfg, metaClient, updates5);
// restore to 003 and validate records.
- client.restoreToInstant("003");
+ client.restoreToInstant("003", cfg.isMetadataTableEnabled());
validateRecords(cfg, metaClient, updates2);
}
}