This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 161a798 [HUDI-706] Add unit test for SavepointsCommand (#1624)
161a798 is described below
commit 161a7983375b0787398a1219a80531019810e692
Author: hongdd <[email protected]>
AuthorDate: Tue May 19 18:36:01 2020 +0800
[HUDI-706] Add unit test for SavepointsCommand (#1624)
---
.../apache/hudi/cli/HoodieTableHeaderFields.java | 5 +
.../hudi/cli/commands/SavepointsCommand.java | 76 ++++++----
.../org/apache/hudi/cli/commands/SparkMain.java | 48 ++++++-
.../hudi/cli/commands/TestSavepointsCommand.java | 110 +++++++++++++++
.../hudi/cli/integ/ITTestSavepointsCommand.java | 157 +++++++++++++++++++++
5 files changed, 361 insertions(+), 35 deletions(-)
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
index 4fc41a1..77f486b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -84,6 +84,11 @@ public class HoodieTableHeaderFields {
public static final String HEADER_NEW_VALUE = "New Value";
/**
+ * Fields of Savepoints.
+ */
+ public static final String HEADER_SAVEPOINT_TIME = "SavepointTime";
+
+ /**
* Fields of Rollback.
*/
public static final String HEADER_ROLLBACK_INSTANT = "Rolledback " +
HEADER_INSTANT;
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
index b5bc349..314ed00 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SavepointsCommand.java
@@ -20,6 +20,7 @@ package org.apache.hudi.cli.commands;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.client.HoodieWriteClient;
@@ -30,7 +31,6 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.spark.api.java.JavaSparkContext;
@@ -40,7 +40,6 @@ import org.springframework.shell.core.annotation.CliCommand;
import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
-import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
@@ -51,7 +50,7 @@ import java.util.stream.Collectors;
public class SavepointsCommand implements CommandMarker {
@CliCommand(value = "savepoints show", help = "Show the savepoints")
- public String showSavepoints() throws IOException {
+ public String showSavepoints() {
HoodieActiveTimeline activeTimeline =
HoodieCLI.getTableMetaClient().getActiveTimeline();
HoodieTimeline timeline =
activeTimeline.getSavePointTimeline().filterCompletedInstants();
List<HoodieInstant> commits =
timeline.getReverseOrderedInstants().collect(Collectors.toList());
@@ -60,13 +59,19 @@ public class SavepointsCommand implements CommandMarker {
HoodieInstant commit = commits.get(i);
rows[i] = new String[] {commit.getTimestamp()};
}
- return HoodiePrintHelper.print(new String[] {"SavepointTime"}, rows);
+ return HoodiePrintHelper.print(new String[]
{HoodieTableHeaderFields.HEADER_SAVEPOINT_TIME}, rows);
}
@CliCommand(value = "savepoint create", help = "Savepoint a commit")
public String savepoint(@CliOption(key = {"commit"}, help = "Commit to
savepoint") final String commitTime,
- @CliOption(key = {"user"}, unspecifiedDefaultValue = "default", help =
"User who is creating the savepoint") final String user,
- @CliOption(key = {"comments"}, unspecifiedDefaultValue = "default", help
= "Comments for creating the savepoint") final String comments)
+ @CliOption(key = {"user"}, unspecifiedDefaultValue = "default",
+ help = "User who is creating the savepoint") final String user,
+ @CliOption(key = {"comments"}, unspecifiedDefaultValue = "default",
+ help = "Comments for creating the savepoint") final String comments,
+ @CliOption(key = {"sparkProperties"}, help = "Spark Properties File
Path") final String sparkPropertiesPath,
+ @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help =
"Spark Master") String master,
+ @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
+ help = "Spark executor memory") final String sparkMemory)
throws Exception {
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
@@ -77,25 +82,27 @@ public class SavepointsCommand implements CommandMarker {
return "Commit " + commitTime + " not found in Commits " + timeline;
}
- String result;
- try (JavaSparkContext jsc = SparkUtil.initJavaSparkConf("Create
Savepoint")) {
- HoodieWriteClient client = createHoodieClient(jsc,
metaClient.getBasePath());
- try {
- client.savepoint(commitTime, user, comments);
- // Refresh the current
- refreshMetaClient();
- result = String.format("The commit \"%s\" has been savepointed.",
commitTime);
- } catch (HoodieSavepointException se) {
- result = String.format("Failed: Could not create savepoint \"%s\".",
commitTime);
- }
+ SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+ sparkLauncher.addAppArgs(SparkMain.SparkCommand.SAVEPOINT.toString(),
master, sparkMemory, commitTime,
+ user, comments, metaClient.getBasePath());
+ Process process = sparkLauncher.launch();
+ InputStreamConsumer.captureOutput(process);
+ int exitCode = process.waitFor();
+ // Refresh the current
+ refreshMetaClient();
+ if (exitCode != 0) {
+ return String.format("Failed: Could not create savepoint \"%s\".",
commitTime);
}
- return result;
+ return String.format("The commit \"%s\" has been savepointed.",
commitTime);
}
@CliCommand(value = "savepoint rollback", help = "Rollback a table to the given savepoint")
public String rollbackToSavepoint(
@CliOption(key = {"savepoint"}, help = "Savepoint to rollback") final
String instantTime,
- @CliOption(key = {"sparkProperties"}, help = "Spark Properties File
Path") final String sparkPropertiesPath)
+ @CliOption(key = {"sparkProperties"}, help = "Spark Properties File
Path") final String sparkPropertiesPath,
+ @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help =
"Spark Master") String master,
+ @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
+ help = "Spark executor memory") final String sparkMemory)
throws Exception {
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
if
(metaClient.getActiveTimeline().getSavePointTimeline().filterCompletedInstants().empty())
{
@@ -110,17 +117,17 @@ public class SavepointsCommand implements CommandMarker {
}
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-
sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK_TO_SAVEPOINT.toString(),
instantTime,
- metaClient.getBasePath());
+
sparkLauncher.addAppArgs(SparkMain.SparkCommand.ROLLBACK_TO_SAVEPOINT.toString(),
master, sparkMemory,
+ instantTime, metaClient.getBasePath());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
int exitCode = process.waitFor();
// Refresh the current
refreshMetaClient();
if (exitCode != 0) {
- return "Savepoint " + instantTime + " failed to roll back";
+ return String.format("Savepoint \"%s\" failed to roll back",
instantTime);
}
- return "Savepoint " + instantTime + " rolled back";
+ return String.format("Savepoint \"%s\" rolled back", instantTime);
}
@CliCommand(value = "savepoints refresh", help = "Refresh the savepoints")
@@ -130,7 +137,12 @@ public class SavepointsCommand implements CommandMarker {
}
@CliCommand(value = "savepoint delete", help = "Delete the savepoint")
- public String deleteSavepoint(@CliOption(key = {"commit"}, help = "Delete a
savepoint") final String instantTime) throws Exception {
+ public String deleteSavepoint(@CliOption(key = {"commit"}, help = "Delete a
savepoint") final String instantTime,
+ @CliOption(key = {"sparkProperties"}, help = "Spark Properties File
Path") final String sparkPropertiesPath,
+ @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help =
"Spark Master") String master,
+ @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
+ help = "Spark executor memory") final String sparkMemory)
+ throws Exception {
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
HoodieTimeline completedInstants =
metaClient.getActiveTimeline().getSavePointTimeline().filterCompletedInstants();
if (completedInstants.empty()) {
@@ -142,12 +154,18 @@ public class SavepointsCommand implements CommandMarker {
return "Commit " + instantTime + " not found in Commits " +
completedInstants;
}
- try (JavaSparkContext jsc = SparkUtil.initJavaSparkConf("Delete
Savepoint")) {
- HoodieWriteClient client = createHoodieClient(jsc,
metaClient.getBasePath());
- client.deleteSavepoint(instantTime);
- refreshMetaClient();
+ SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+
sparkLauncher.addAppArgs(SparkMain.SparkCommand.DELETE_SAVEPOINT.toString(),
master, sparkMemory, instantTime,
+ metaClient.getBasePath());
+ Process process = sparkLauncher.launch();
+ InputStreamConsumer.captureOutput(process);
+ int exitCode = process.waitFor();
+ // Refresh the current
+ refreshMetaClient();
+ if (exitCode != 0) {
+ return String.format("Failed: Could not delete savepoint \"%s\".",
instantTime);
}
- return "Savepoint " + instantTime + " deleted";
+ return String.format("Savepoint \"%s\" deleted.", instantTime);
}
private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc,
String basePath) throws Exception {
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index be9d7dd..a8c2e72 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.index.HoodieIndex;
import
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
import org.apache.hudi.utilities.HDFSParquetImporter;
@@ -54,7 +55,8 @@ public class SparkMain {
* Commands.
*/
enum SparkCommand {
- ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT,
COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_UNSCHEDULE_PLAN,
COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, COMPACT_REPAIR, CLEAN
+ ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, IMPORT, UPSERT,
COMPACT_SCHEDULE, COMPACT_RUN,
+ COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE,
COMPACT_REPAIR, CLEAN, DELETE_SAVEPOINT
}
public static void main(String[] args) throws Exception {
@@ -77,8 +79,8 @@ public class SparkMain {
returnCode = deduplicatePartitionPath(jsc, args[3], args[4], args[5],
args[6]);
break;
case ROLLBACK_TO_SAVEPOINT:
- assert (args.length == 3);
- returnCode = rollbackToSavepoint(jsc, args[1], args[2]);
+ assert (args.length == 5);
+ returnCode = rollbackToSavepoint(jsc, args[3], args[4]);
break;
case IMPORT:
case UPSERT:
@@ -154,6 +156,14 @@ public class SparkMain {
}
clean(jsc, args[3], propsFilePath, configs);
break;
+ case SAVEPOINT:
+ assert (args.length == 7);
+ returnCode = createSavepoint(jsc, args[3], args[4], args[5], args[6]);
+ break;
+ case DELETE_SAVEPOINT:
+ assert (args.length == 5);
+ returnCode = deleteSavepoint(jsc, args[3], args[4]);
+ break;
default:
break;
}
@@ -163,7 +173,8 @@ public class SparkMain {
private static boolean sparkMasterContained(SparkCommand command) {
List<SparkCommand> masterContained =
Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
SparkCommand.COMPACT_UNSCHEDULE_PLAN,
SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN,
- SparkCommand.IMPORT, SparkCommand.UPSERT, SparkCommand.DEDUPLICATE);
+ SparkCommand.IMPORT, SparkCommand.UPSERT, SparkCommand.DEDUPLICATE,
SparkCommand.SAVEPOINT,
+ SparkCommand.DELETE_SAVEPOINT, SparkCommand.ROLLBACK_TO_SAVEPOINT);
return masterContained.contains(command);
}
@@ -276,7 +287,20 @@ public class SparkMain {
LOG.info(String.format("The commit \"%s\" rolled back.", instantTime));
return 0;
} else {
- LOG.info(String.format("The commit \"%s\" failed to roll back.",
instantTime));
+ LOG.warn(String.format("The commit \"%s\" failed to roll back.",
instantTime));
+ return -1;
+ }
+ }
+
+ private static int createSavepoint(JavaSparkContext jsc, String commitTime,
String user,
+ String comments, String basePath) throws Exception {
+ HoodieWriteClient client = createHoodieClient(jsc, basePath);
+ try {
+ client.savepoint(commitTime, user, comments);
+ LOG.info(String.format("The commit \"%s\" has been savepointed.",
commitTime));
+ return 0;
+ } catch (HoodieSavepointException se) {
+ LOG.warn(String.format("Failed: Could not create savepoint \"%s\".",
commitTime));
return -1;
}
}
@@ -288,7 +312,19 @@ public class SparkMain {
LOG.info(String.format("The commit \"%s\" rolled back.", savepointTime));
return 0;
} catch (Exception e) {
- LOG.info(String.format("The commit \"%s\" failed to roll back.",
savepointTime));
+ LOG.warn(String.format("The commit \"%s\" failed to roll back.",
savepointTime));
+ return -1;
+ }
+ }
+
+ private static int deleteSavepoint(JavaSparkContext jsc, String
savepointTime, String basePath) throws Exception {
+ HoodieWriteClient client = createHoodieClient(jsc, basePath);
+ try {
+ client.deleteSavepoint(savepointTime);
+ LOG.info(String.format("Savepoint \"%s\" deleted.", savepointTime));
+ return 0;
+ } catch (Exception e) {
+ LOG.warn(String.format("Failed: Could not delete savepoint \"%s\".",
savepointTime));
return -1;
}
}
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestSavepointsCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestSavepointsCommand.java
new file mode 100644
index 0000000..2c6a3f2
--- /dev/null
+++
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestSavepointsCommand.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test class for {@link org.apache.hudi.cli.commands.SavepointsCommand}.
+ */
+public class TestSavepointsCommand extends AbstractShellIntegrationTest {
+
+ private String tablePath;
+
+ @BeforeEach
+ public void init() throws IOException {
+ String tableName = "test_table";
+ tablePath = basePath + File.separator + tableName;
+
+ // Create table and connect
+ new TableCommand().createTable(
+ tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+ "", TimelineLayoutVersion.VERSION_1,
"org.apache.hudi.common.model.HoodieAvroPayload");
+ }
+
+ /**
+ * Test case of command 'savepoints show'.
+ */
+ @Test
+ public void testShowSavepoints() throws IOException {
+ // generate four savepoints
+ for (int i = 100; i < 104; i++) {
+ String instantTime = String.valueOf(i);
+ HoodieTestDataGenerator.createSavepointFile(tablePath, instantTime,
jsc.hadoopConfiguration());
+ }
+
+ CommandResult cr = getShell().executeCommand("savepoints show");
+ assertTrue(cr.isSuccess());
+
+ // generate expect result
+ String[][] rows = Arrays.asList("100", "101", "102",
"103").stream().sorted(Comparator.reverseOrder())
+ .map(instant -> new String[]{instant}).toArray(String[][]::new);
+ String expected = HoodiePrintHelper.print(new String[]
{HoodieTableHeaderFields.HEADER_SAVEPOINT_TIME}, rows);
+
+ assertEquals(expected, cr.getResult().toString());
+ }
+
+ /**
+ * Test case of command 'savepoints refresh'.
+ */
+ @Test
+ public void testRefreshMetaClient() throws IOException {
+ HoodieTimeline timeline =
+
HoodieCLI.getTableMetaClient().getActiveTimeline().getSavePointTimeline().filterCompletedInstants();
+ assertEquals(0, timeline.countInstants(), "There should have no instant at
first");
+
+ // generate four savepoints
+ for (int i = 100; i < 104; i++) {
+ String instantTime = String.valueOf(i);
+ HoodieTestDataGenerator.createSavepointFile(tablePath, instantTime,
jsc.hadoopConfiguration());
+ }
+
+ // Before refresh, no instant
+ timeline =
+
HoodieCLI.getTableMetaClient().getActiveTimeline().getSavePointTimeline().filterCompletedInstants();
+ assertEquals(0, timeline.countInstants(), "there should have no instant");
+
+ CommandResult cr = getShell().executeCommand("savepoints refresh");
+ assertTrue(cr.isSuccess());
+
+ timeline =
+
HoodieCLI.getTableMetaClient().getActiveTimeline().getSavePointTimeline().filterCompletedInstants();
+
+ // After refresh, there are 4 instants
+ assertEquals(4, timeline.countInstants(), "there should have 4 instants");
+ }
+}
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
new file mode 100644
index 0000000..ee9a18e
--- /dev/null
+++
b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestSavepointsCommand.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.hudi.cli.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.springframework.shell.core.CommandResult;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test class for {@link
org.apache.hudi.cli.commands.SavepointsCommand}.
+ * <p/>
+ * A command that uses SparkLauncher needs to load the jars under lib, which are
generated during mvn package.
+ * Use an integration test instead of a unit test.
+ */
+public class ITTestSavepointsCommand extends AbstractShellIntegrationTest {
+
+ private String tablePath;
+
+ @BeforeEach
+ public void init() throws IOException {
+ String tableName = "test_table";
+ tablePath = basePath + File.separator + tableName;
+
+ // Create table and connect
+ new TableCommand().createTable(
+ tablePath, "test_table", HoodieTableType.COPY_ON_WRITE.name(),
+ "", TimelineLayoutVersion.VERSION_1,
"org.apache.hudi.common.model.HoodieAvroPayload");
+ }
+
+ /**
+ * Test case of command 'savepoint create'.
+ */
+ @Test
+ public void testSavepoint() {
+ // generate four commits
+ for (int i = 100; i < 104; i++) {
+ String instantTime = String.valueOf(i);
+ HoodieTestDataGenerator.createCommitFile(tablePath, instantTime,
jsc.hadoopConfiguration());
+ }
+
+ String savepoint = "102";
+ CommandResult cr = getShell().executeCommand(
+ String.format("savepoint create --commit %s --sparkMaster %s",
savepoint, "local"));
+
+ assertAll("Command run failed",
+ () -> assertTrue(cr.isSuccess()),
+ () -> assertEquals(
+ String.format("The commit \"%s\" has been savepointed.",
savepoint), cr.getResult().toString()));
+
+ // there is 1 savepoint instant
+ HoodieActiveTimeline timeline =
HoodieCLI.getTableMetaClient().getActiveTimeline();
+ assertEquals(1, timeline.getSavePointTimeline().countInstants());
+ }
+
+ /**
+ * Test case of command 'savepoint rollback'.
+ */
+ @Test
+ public void testRollbackToSavepoint() throws IOException {
+ // generate four commits
+ for (int i = 100; i < 104; i++) {
+ String instantTime = String.valueOf(i);
+ HoodieTestDataGenerator.createCommitFile(tablePath, instantTime,
jsc.hadoopConfiguration());
+ }
+
+ // generate one savepoint
+ String savepoint = "102";
+ HoodieTestDataGenerator.createSavepointFile(tablePath, savepoint,
jsc.hadoopConfiguration());
+
+ CommandResult cr = getShell().executeCommand(
+ String.format("savepoint rollback --savepoint %s --sparkMaster %s",
savepoint, "local"));
+
+ assertAll("Command run failed",
+ () -> assertTrue(cr.isSuccess()),
+ () -> assertEquals(
+ String.format("Savepoint \"%s\" rolled back", savepoint),
cr.getResult().toString()));
+
+ // there is 1 restore instant
+ HoodieActiveTimeline timeline =
HoodieCLI.getTableMetaClient().getActiveTimeline();
+ assertEquals(1, timeline.getRestoreTimeline().countInstants());
+
+ // 103 instant had rollback
+ assertFalse(timeline.getCommitTimeline().containsInstant(
+ new HoodieInstant(HoodieInstant.State.COMPLETED, "commit", "103")));
+ }
+
+ /**
+ * Test case of command 'savepoint delete'.
+ */
+ @Test
+ public void testDeleteSavepoint() throws IOException {
+ // generate four commits
+ for (int i = 100; i < 104; i++) {
+ String instantTime = String.valueOf(i);
+ HoodieTestDataGenerator.createCommitFile(tablePath, instantTime,
jsc.hadoopConfiguration());
+ }
+
+ // generate two savepoints
+ String savepoint1 = "100";
+ String savepoint2 = "102";
+ HoodieTestDataGenerator.createSavepointFile(tablePath, savepoint1,
jsc.hadoopConfiguration());
+ HoodieTestDataGenerator.createSavepointFile(tablePath, savepoint2,
jsc.hadoopConfiguration());
+
+ HoodieActiveTimeline timeline =
HoodieCLI.getTableMetaClient().getActiveTimeline();
+ assertEquals(2, timeline.getSavePointTimeline().countInstants(), "There
should 2 instants.");
+
+ CommandResult cr = getShell().executeCommand(
+ String.format("savepoint delete --commit %s --sparkMaster %s",
savepoint1, "local"));
+
+ assertAll("Command run failed",
+ () -> assertTrue(cr.isSuccess()),
+ () -> assertEquals(
+ String.format("Savepoint \"%s\" deleted.", savepoint1),
cr.getResult().toString()));
+
+ // reload timeline
+ timeline = timeline.reload();
+ assertEquals(1, timeline.getSavePointTimeline().countInstants(), "There
should 1 instants.");
+
+ // after delete, 100 instant should not exist.
+ assertFalse(timeline.containsInstant(new HoodieInstant(false,
HoodieTimeline.SAVEPOINT_ACTION, savepoint1)));
+ }
+}