This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 14ec9630c1 [HUDI-5032] Add archive to cli (#7076)
14ec9630c1 is described below
commit 14ec9630c13ebfc6525ae7153ecb19c0caae3289
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Nov 2 20:14:16 2022 -0400
[HUDI-5032] Add archive to cli (#7076)
Adding archiving capability to cli.
Co-authored-by: Jonathan Vexler <=>
---
.../hudi/cli/commands/ArchivedCommitsCommand.java | 50 ++++++++++++--
.../org/apache/hudi/cli/commands/SparkMain.java | 29 +++++++-
.../hudi/cli/commands/TestArchiveCommand.java | 79 ++++++++++++++++++++++
3 files changed, 152 insertions(+), 6 deletions(-)
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index dcd6a2cf3c..7b967ad064 100644
---
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -18,16 +18,14 @@
package org.apache.hudi.cli.commands;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.specific.SpecificData;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieCommitMetadata;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
+import org.apache.hudi.cli.utils.InputStreamConsumer;
+import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -37,6 +35,16 @@ import
org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.util.Utils;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
import org.springframework.shell.standard.ShellOption;
@@ -52,6 +60,37 @@ import java.util.stream.Collectors;
*/
@ShellComponent
public class ArchivedCommitsCommand {
+ private static final Logger LOG =
LogManager.getLogger(ArchivedCommitsCommand.class);
+ @ShellMethod(key = "trigger archival", value = "trigger archival")
+ public String triggerArchival(
+ @ShellOption(value = {"--minCommits"},
+ help = "Minimum number of instants to retain in the active timeline.
See hoodie.keep.min.commits",
+ defaultValue = "20") int minCommits,
+ @ShellOption(value = {"--maxCommits"},
+ help = "Maximum number of instants to retain in the active timeline.
See hoodie.keep.max.commits",
+ defaultValue = "30") int maxCommits,
+ @ShellOption(value = {"--commitsRetainedByCleaner"}, help = "Number of
commits to retain, without cleaning",
+ defaultValue = "10") int retained,
+ @ShellOption(value = {"--enableMetadata"},
+ help = "Enable the internal metadata table which serves table
metadata like level file listings",
+ defaultValue = "true") boolean enableMetadata,
+ @ShellOption(value = "--sparkMemory", defaultValue = "1G",
+ help = "Spark executor memory") final String sparkMemory,
+ @ShellOption(value = "--sparkMaster", defaultValue = "local", help =
"Spark Master") String master) throws Exception {
+ String sparkPropertiesPath =
+
Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
+ SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+ String cmd = SparkCommand.ARCHIVE.toString();
+ sparkLauncher.addAppArgs(cmd, master, sparkMemory,
Integer.toString(minCommits), Integer.toString(maxCommits),
+ Integer.toString(retained), Boolean.toString(enableMetadata),
HoodieCLI.basePath);
+ Process process = sparkLauncher.launch();
+ InputStreamConsumer.captureOutput(process);
+ int exitCode = process.waitFor();
+ if (exitCode != 0) {
+ return "Failed to trigger archival";
+ }
+ return "Archival successfully triggered";
+ }
@ShellMethod(key = "show archived commit stats", value = "Read commits from
archived files and show details")
public String showArchivedCommits(
@@ -206,4 +245,5 @@ public class ArchivedCommitsCommand {
return new Comparable[] {};
}
}
+
}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 6649eaf766..51e9bccac6 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -22,11 +22,14 @@ import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.cli.DeDupeType;
import org.apache.hudi.cli.DedupeSparkJob;
import org.apache.hudi.cli.utils.SparkUtil;
+import org.apache.hudi.client.HoodieTimelineArchiver;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
@@ -37,6 +40,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieBootstrapConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -98,7 +102,7 @@ public class SparkMain {
BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT,
IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE,
COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE,
COMPACT_REPAIR, CLUSTERING_SCHEDULE,
CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_MARKER,
DELETE_SAVEPOINT, UPGRADE, DOWNGRADE,
- REPAIR_DEPRECATED_PARTITION, RENAME_PARTITION
+ REPAIR_DEPRECATED_PARTITION, RENAME_PARTITION, ARCHIVE
}
public static void main(String[] args) throws Exception {
@@ -290,6 +294,10 @@ public class SparkMain {
assert (args.length == 6);
returnCode = renamePartition(jsc, args[3], args[4], args[5]);
break;
+ case ARCHIVE:
+ assert (args.length == 8);
+ returnCode = archive(jsc, Integer.parseInt(args[3]),
Integer.parseInt(args[4]), Integer.parseInt(args[5]),
Boolean.parseBoolean(args[6]), args[7]);
+ break;
default:
break;
}
@@ -646,4 +654,23 @@ public class SparkMain {
HoodieFailedWritesCleaningPolicy.EAGER).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
}
+
+ private static int archive(JavaSparkContext jsc, int minCommits, int
maxCommits, int commitsRetained, boolean enableMetadata, String basePath) {
+ HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
+
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(minCommits,maxCommits).build())
+
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(commitsRetained).build())
+ .withEmbeddedTimelineServerEnabled(false)
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build())
+ .build();
+ HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
+ HoodieSparkTable<HoodieAvroPayload> table =
HoodieSparkTable.create(config, context);
+ try {
+ HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config,
table);
+ archiver.archiveIfRequired(context,true);
+ } catch (IOException ioe) {
+ LOG.error("Failed to archive with IOException: " + ioe);
+ return -1;
+ }
+ return 0;
+ }
}
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java
new file mode 100644
index 0000000000..1a10d41c9a
--- /dev/null
+++
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
+import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.shell.Shell;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("functional")
+@SpringBootTest(properties = {"spring.shell.interactive.enabled=false",
"spring.shell.command.script.enabled=false"})
+public class TestArchiveCommand extends CLIFunctionalTestHarness {
+
+ @Autowired
+ private Shell shell;
+
+ @Test
+ public void testArchiving() throws Exception {
+ HoodieCLI.conf = hadoopConf();
+
+ // Create table and connect
+ String tableName = tableName();
+ String tablePath = tablePath(tableName);
+
+ new TableCommand().createTable(
+ tablePath, tableName,
+ "COPY_ON_WRITE", "", 1,
"org.apache.hudi.common.model.HoodieAvroPayload");
+
+ HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+
+ // Create six commits
+ for (int i = 100; i < 106; i++) {
+ String timestamp = String.valueOf(i);
+
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath,timestamp,
hadoopConf());
+ }
+
+ Object cmdResult = shell.evaluate(() -> "trigger archival --minCommits 2
--maxCommits 3 --commitsRetainedByCleaner 1 --enableMetadata false");
+ assertTrue(ShellEvaluationResultUtil.isSuccess(cmdResult));
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    //getting instants from the active timeline returns only the latest state of
each commit,
+    //therefore we expect 2 instants because minCommits is 2
+    assertEquals(2, metaClient.getActiveTimeline().getInstants().count());
+
+    //getting instants from the archived timeline returns all instants of each
commit,
+    //therefore we expect 12 instants: 6 commits - 2 commits in the active
timeline = 4 archived commits,
+    //since each commit is completed, there are 3 instants per commit
(requested, inflight, completed),
+    //and 3 instants per commit * 4 commits = 12 instants
+ assertEquals(12, metaClient.getArchivedTimeline().getInstants().count());
+ }
+
+}
+