This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2954027 [HUDI-52] Enabling savepoint and restore for MOR table (#4507)
2954027 is described below
commit 2954027b92ada82c41d0a72cc0b837564a730a89
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Jan 6 10:56:08 2022 -0500
[HUDI-52] Enabling savepoint and restore for MOR table (#4507)
* Enabling restore for MOR table
* Fixing savepoint for compaction commits in MOR
---
.../hudi/cli/commands/SavepointsCommand.java | 12 ++-
.../action/savepoint/SavepointActionExecutor.java | 9 +--
.../TestHoodieSparkMergeOnReadTableRollback.java | 94 ++++++++++++++++++++++
3 files changed, 101 insertions(+), 14 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
index 0ea2fff..d3f8584 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
@@ -78,11 +78,9 @@ public class SavepointsCommand implements CommandMarker {
throws Exception {
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
-    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, commitTime);
-    if (!timeline.containsInstant(commitInstant)) {
-      return "Commit " + commitTime + " not found in Commits " + timeline;
+    if (!activeTimeline.getCommitsTimeline().filterCompletedInstants().containsInstant(commitTime)) {
+      return "Commit " + commitTime + " not found in Commits " + activeTimeline;
}
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
@@ -112,10 +110,10 @@ public class SavepointsCommand implements CommandMarker {
       throw new HoodieException("There are no completed instants to run rollback");
     }
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-    HoodieTimeline timeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
-    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime);
+    HoodieTimeline timeline = activeTimeline.getCommitsTimeline().filterCompletedInstants();
+    List<HoodieInstant> instants = timeline.getInstants().filter(instant -> instant.getTimestamp().equals(instantTime)).collect(Collectors.toList());
-    if (!timeline.containsInstant(commitInstant)) {
+    if (instants.isEmpty()) {
       return "Commit " + instantTime + " not found in Commits " + timeline;
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
index de1d973..134b238 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/savepoint/SavepointActionExecutor.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
@@ -65,13 +64,9 @@ public class SavepointActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
   @Override
   public HoodieSavepointMetadata execute() {
-    if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-      throw new UnsupportedOperationException("Savepointing is not supported or MergeOnRead table types");
-    }
     Option<HoodieInstant> cleanInstant = table.getCompletedCleanTimeline().lastInstant();
-    HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, instantTime);
-    if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
-      throw new HoodieSavepointException("Could not savepoint non-existing commit " + commitInstant);
+    if (!table.getCompletedCommitsTimeline().containsInstant(instantTime)) {
+      throw new HoodieSavepointException("Could not savepoint non-existing commit " + instantTime);
}
try {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 5eee262..38becc9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -464,6 +464,100 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunctionalTestHarness {
     }
   }
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testMORTableRestore(boolean restoreAfterCompaction) throws Exception {
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false)
+        // Timeline-server-based markers are not used for multi-rollback tests
+        .withMarkersType(MarkerType.DIRECT.name())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
+    HoodieWriteConfig cfg = cfgBuilder.build();
+
+    Properties properties = new Properties();
+    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
+
+    try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+
+      HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+      List<HoodieRecord> records = insertAndGetRecords("001", client, dataGen, 200);
+      List<HoodieRecord> updates1 = updateAndGetRecords("002", client, dataGen, records);
+      List<HoodieRecord> updates2 = updateAndGetRecords("003", client, dataGen, records);
+      List<HoodieRecord> updates3 = updateAndGetRecords("004", client, dataGen, records);
+      validateRecords(cfg, metaClient, updates3);
+
+      if (!restoreAfterCompaction) {
+        // restore to 002 and validate records.
+        client.restoreToInstant("002");
+        validateRecords(cfg, metaClient, updates1);
+      } else {
+        // trigger compaction and then trigger couple of upserts followed by restore.
+        metaClient = HoodieTableMetaClient.reload(metaClient);
+        String compactionInstantTime = "005";
+        client.scheduleCompactionAtInstant(compactionInstantTime, Option.empty());
+        JavaRDD<WriteStatus> ws = (JavaRDD<WriteStatus>) client.compact(compactionInstantTime);
+        client.commitCompaction(compactionInstantTime, ws, Option.empty());
+
+        validateRecords(cfg, metaClient, updates3);
+        List<HoodieRecord> updates4 = updateAndGetRecords("006", client, dataGen, records);
+        List<HoodieRecord> updates5 = updateAndGetRecords("007", client, dataGen, records);
+        validateRecords(cfg, metaClient, updates5);
+
+        // restore to 003 and validate records.
+        client.restoreToInstant("003");
+        validateRecords(cfg, metaClient, updates2);
+      }
+    }
+  }
+
+  private List<HoodieRecord> insertAndGetRecords(String newCommitTime, SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, int count) {
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, count);
+    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+    JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
+    client.commit(newCommitTime, writeStatusJavaRDD);
+    List<WriteStatus> statuses = writeStatusJavaRDD.collect();
+    assertNoWriteErrors(statuses);
+    return records;
+  }
+
+  private List<HoodieRecord> updateAndGetRecords(String newCommitTime, SparkRDDWriteClient client, HoodieTestDataGenerator dataGen, List<HoodieRecord> records) throws IOException {
+    client.startCommitWithTime(newCommitTime);
+    List<HoodieRecord> updates = dataGen.generateUpdates(newCommitTime, records);
+    JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(jsc().parallelize(updates, 1), newCommitTime);
+    client.commit(newCommitTime, writeStatusJavaRDD);
+    return updates;
+  }
+
+  private void validateRecords(HoodieWriteConfig cfg, HoodieTableMetaClient metaClient, List<HoodieRecord> expectedRecords) throws IOException {
+
+    HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
+    FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
+    HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+    List<String> dataFiles = tableView.getLatestBaseFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
+    List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(hadoopConf(), dataFiles,
+        basePath());
+    assertRecords(expectedRecords, recordsRead);
+  }
+
+  private void assertRecords(List<HoodieRecord> inputRecords, List<GenericRecord> recordsRead) {
+    assertEquals(recordsRead.size(), inputRecords.size());
+    Map<String, GenericRecord> expectedRecords = new HashMap<>();
+    inputRecords.forEach(entry -> {
+      try {
+        expectedRecords.put(entry.getRecordKey(), ((GenericRecord) entry.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA).get()));
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    });
+
+    Map<String, GenericRecord> actualRecords = new HashMap<>();
+    recordsRead.forEach(entry -> actualRecords.put(String.valueOf(entry.get("_row_key")), entry));
+    for (Map.Entry<String, GenericRecord> entry : expectedRecords.entrySet()) {
+      assertEquals(String.valueOf(entry.getValue().get("driver")), String.valueOf(actualRecords.get(entry.getKey()).get("driver")));
+    }
+  }
+
   private HoodieWriteConfig getHoodieWriteConfigWithSmallFileHandlingOff(boolean populateMetaFields) {
     return getHoodieWriteConfigWithSmallFileHandlingOffBuilder(populateMetaFields).build();
   }