This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 14ec9630c1 [HUDI-5032] Add archive to cli (#7076)
14ec9630c1 is described below

commit 14ec9630c13ebfc6525ae7153ecb19c0caae3289
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Nov 2 20:14:16 2022 -0400

    [HUDI-5032] Add archive to cli (#7076)
    
    Adding archiving capability to the CLI.
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../hudi/cli/commands/ArchivedCommitsCommand.java  | 50 ++++++++++++--
 .../org/apache/hudi/cli/commands/SparkMain.java    | 29 +++++++-
 .../hudi/cli/commands/TestArchiveCommand.java      | 79 ++++++++++++++++++++++
 3 files changed, 152 insertions(+), 6 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index dcd6a2cf3c..7b967ad064 100644
--- 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -18,16 +18,14 @@
 
 package org.apache.hudi.cli.commands;
 
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.specific.SpecificData;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.avro.model.HoodieCommitMetadata;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
+import org.apache.hudi.cli.utils.InputStreamConsumer;
+import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -37,6 +35,16 @@ import 
org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ClosableIterator;
 import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.spark.launcher.SparkLauncher;
+import org.apache.spark.util.Utils;
 import org.springframework.shell.standard.ShellComponent;
 import org.springframework.shell.standard.ShellMethod;
 import org.springframework.shell.standard.ShellOption;
@@ -52,6 +60,37 @@ import java.util.stream.Collectors;
  */
 @ShellComponent
 public class ArchivedCommitsCommand {
+  private static final Logger LOG = 
LogManager.getLogger(ArchivedCommitsCommand.class);
+  @ShellMethod(key = "trigger archival", value = "trigger archival")
+  public String triggerArchival(
+      @ShellOption(value = {"--minCommits"},
+        help = "Minimum number of instants to retain in the active timeline. 
See hoodie.keep.min.commits",
+        defaultValue = "20") int minCommits,
+      @ShellOption(value = {"--maxCommits"},
+          help = "Maximum number of instants to retain in the active timeline. 
See hoodie.keep.max.commits",
+          defaultValue = "30") int maxCommits,
+      @ShellOption(value = {"--commitsRetainedByCleaner"}, help = "Number of 
commits to retain, without cleaning",
+          defaultValue = "10") int retained,
+      @ShellOption(value = {"--enableMetadata"},
+          help = "Enable the internal metadata table which serves table 
metadata like level file listings",
+          defaultValue = "true") boolean enableMetadata,
+      @ShellOption(value = "--sparkMemory", defaultValue = "1G",
+          help = "Spark executor memory") final String sparkMemory,
+      @ShellOption(value = "--sparkMaster", defaultValue = "local", help = 
"Spark Master") String master) throws Exception {
+    String sparkPropertiesPath =
+        
Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
+    SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
+    String cmd = SparkCommand.ARCHIVE.toString();
+    sparkLauncher.addAppArgs(cmd, master, sparkMemory, 
Integer.toString(minCommits), Integer.toString(maxCommits),
+        Integer.toString(retained), Boolean.toString(enableMetadata), 
HoodieCLI.basePath);
+    Process process = sparkLauncher.launch();
+    InputStreamConsumer.captureOutput(process);
+    int exitCode = process.waitFor();
+    if (exitCode != 0) {
+      return "Failed to trigger archival";
+    }
+    return "Archival successfully triggered";
+  }
 
   @ShellMethod(key = "show archived commit stats", value = "Read commits from 
archived files and show details")
   public String showArchivedCommits(
@@ -206,4 +245,5 @@ public class ArchivedCommitsCommand {
       return new Comparable[] {};
     }
   }
+
 }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 6649eaf766..51e9bccac6 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -22,11 +22,14 @@ import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.cli.DeDupeType;
 import org.apache.hudi.cli.DedupeSparkJob;
 import org.apache.hudi.cli.utils.SparkUtil;
+import org.apache.hudi.client.HoodieTimelineArchiver;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -37,6 +40,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieBootstrapConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -98,7 +102,7 @@ public class SparkMain {
     BOOTSTRAP, ROLLBACK, DEDUPLICATE, ROLLBACK_TO_SAVEPOINT, SAVEPOINT, 
IMPORT, UPSERT, COMPACT_SCHEDULE, COMPACT_RUN, COMPACT_SCHEDULE_AND_EXECUTE,
     COMPACT_UNSCHEDULE_PLAN, COMPACT_UNSCHEDULE_FILE, COMPACT_VALIDATE, 
COMPACT_REPAIR, CLUSTERING_SCHEDULE,
     CLUSTERING_RUN, CLUSTERING_SCHEDULE_AND_EXECUTE, CLEAN, DELETE_MARKER, 
DELETE_SAVEPOINT, UPGRADE, DOWNGRADE,
-    REPAIR_DEPRECATED_PARTITION, RENAME_PARTITION
+    REPAIR_DEPRECATED_PARTITION, RENAME_PARTITION, ARCHIVE
   }
 
   public static void main(String[] args) throws Exception {
@@ -290,6 +294,10 @@ public class SparkMain {
           assert (args.length == 6);
           returnCode = renamePartition(jsc, args[3], args[4], args[5]);
           break;
+        case ARCHIVE:
+          assert (args.length == 8);
+          returnCode = archive(jsc, Integer.parseInt(args[3]), 
Integer.parseInt(args[4]), Integer.parseInt(args[5]), 
Boolean.parseBoolean(args[6]), args[7]);
+          break;
         default:
           break;
       }
@@ -646,4 +654,23 @@ public class SparkMain {
             HoodieFailedWritesCleaningPolicy.EAGER).build())
         
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
   }
+
+  private static int archive(JavaSparkContext jsc, int minCommits, int 
maxCommits, int commitsRetained, boolean enableMetadata, String basePath) {
+    HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath)
+        
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(minCommits,maxCommits).build())
+        
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(commitsRetained).build())
+        .withEmbeddedTimelineServerEnabled(false)
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build())
+        .build();
+    HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
+    HoodieSparkTable<HoodieAvroPayload> table = 
HoodieSparkTable.create(config, context);
+    try {
+      HoodieTimelineArchiver archiver = new HoodieTimelineArchiver(config, 
table);
+      archiver.archiveIfRequired(context,true);
+    } catch (IOException ioe) {
+      LOG.error("Failed to archive with IOException: " + ioe);
+      return  -1;
+    }
+    return 0;
+  }
 }
diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java
new file mode 100644
index 0000000000..1a10d41c9a
--- /dev/null
+++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchiveCommand.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
+import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.cli.testutils.ShellEvaluationResultUtil;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.shell.Shell;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("functional")
+@SpringBootTest(properties = {"spring.shell.interactive.enabled=false", 
"spring.shell.command.script.enabled=false"})
+public class TestArchiveCommand extends CLIFunctionalTestHarness {
+
+  @Autowired
+  private Shell shell;
+
+  @Test
+  public void testArchiving() throws Exception {
+    HoodieCLI.conf = hadoopConf();
+
+    // Create table and connect
+    String tableName = tableName();
+    String tablePath = tablePath(tableName);
+
+    new TableCommand().createTable(
+        tablePath, tableName,
+        "COPY_ON_WRITE", "", 1, 
"org.apache.hudi.common.model.HoodieAvroPayload");
+
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+
+    // Create six commits
+    for (int i = 100; i < 106; i++) {
+      String timestamp = String.valueOf(i);
+      
HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath,timestamp,
 hadoopConf());
+    }
+
+    Object cmdResult = shell.evaluate(() -> "trigger archival --minCommits 2 
--maxCommits 3 --commitsRetainedByCleaner 1 --enableMetadata false");
+    assertTrue(ShellEvaluationResultUtil.isSuccess(cmdResult));
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    //getting instants from the active timeline only returns the latest state
+    //of each commit, therefore we expect 2 instants because minCommits is 2
+    assertEquals(2, metaClient.getActiveTimeline().getInstants().count());
+
+    //getting instants from the archived timeline returns every instant of each
+    //archived commit; 6 commits - 2 commits kept in the active timeline
+    //= 4 commits in the archived timeline
+    //since each commit is completed, there are 3 instants per commit
+    //(requested, inflight, completed)
+    //and 3 instants per commit * 4 commits = 12 instants
+    assertEquals(12, metaClient.getArchivedTimeline().getInstants().count());
+  }
+
+}
+

Reply via email to