This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ecdbd25  [HUDI-699] Fix CompactionCommand and add unit test for CompactionCommand (#2325)
ecdbd25 is described below

commit ecdbd2517fd8157d9e96f3d9abf195a589e191ae
Author: hongdd <[email protected]>
AuthorDate: Thu Apr 8 15:35:33 2021 +0800

    [HUDI-699] Fix CompactionCommand and add unit test for CompactionCommand (#2325)
---
 .../apache/hudi/cli/HoodieTableHeaderFields.java   |  21 ++
 .../hudi/cli/commands/CompactionCommand.java       | 112 ++++---
 .../org/apache/hudi/cli/commands/SparkMain.java    |  50 ++--
 .../hudi/cli/commands/TestCommitsCommand.java      |   2 +-
 .../hudi/cli/commands/TestCompactionCommand.java   | 219 ++++++++++++++
 .../hudi/cli/integ/ITTestCompactionCommand.java    | 330 +++++++++++++++++++++
 .../testutils/AbstractShellIntegrationTest.java    |   5 +
 .../apache/hudi/client/CompactionAdminClient.java  |   2 +-
 .../hudi/io/TestHoodieTimelineArchiveLog.java      |   3 +-
 .../table/timeline/HoodieArchivedTimeline.java     |  53 +++-
 .../table/timeline/TimelineMetadataUtils.java      |  12 +
 11 files changed, 725 insertions(+), 84 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
index cb36cfd..e317d5a 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -143,4 +143,25 @@ public class HoodieTableHeaderFields {
   public static final String HEADER_TOTAL_ROLLBACK_BLOCKS = "Total Rollback Blocks";
   public static final String HEADER_TOTAL_LOG_RECORDS = "Total Log Records";
   public static final String HEADER_TOTAL_UPDATED_RECORDS_COMPACTED = "Total Updated Records Compacted";
+
+  /**
+   * Fields of Compaction.
+   */
+  public static final String HEADER_INSTANT_BLANK_TIME = "Instant Time";
+  public static final String HEADER_FILE_PATH = "File Path";
+  public static final String HEADER_COMPACTION_INSTANT_TIME = "Compaction " + HEADER_INSTANT_BLANK_TIME;
+  public static final String HEADER_STATE = "State";
+  public static final String HEADER_TOTAL_FILES_TO_BE_COMPACTED = "Total FileIds to be Compacted";
+  public static final String HEADER_EXTRA_METADATA = "Extra Metadata";
+  public static final String HEADER_DATA_FILE_PATH = "Data " + HEADER_FILE_PATH;
+  public static final String HEADER_TOTAL_DELTA_FILES = "Total " + HEADER_DELTA_FILES;
+  public static final String HEADER_METRICS = "getMetrics";
+  public static final String HEADER_BASE_INSTANT_TIME = "Base " + HEADER_INSTANT_BLANK_TIME;
+  public static final String HEADER_BASE_DATA_FILE = "Base Data File";
+  public static final String HEADER_VALID = "Valid";
+  public static final String HEADER_ERROR = "Error";
+  public static final String HEADER_SOURCE_FILE_PATH = "Source " + HEADER_FILE_PATH;
+  public static final String HEADER_DESTINATION_FILE_PATH = "Destination " + HEADER_FILE_PATH;
+  public static final String HEADER_RENAME_EXECUTED = "Rename Executed?";
+  public static final String HEADER_RENAME_SUCCEEDED = "Rename Succeeded?";
 }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index 67445ea..b6a366b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
 import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
 import org.apache.hudi.cli.utils.CommitUtil;
@@ -97,8 +98,7 @@ public class CompactionCommand implements CommandMarker {
       @CliOption(key = {"sortBy"}, help = "Sorting Field", 
unspecifiedDefaultValue = "") final String sortByField,
       @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = 
"false") final boolean descending,
       @CliOption(key = {"headeronly"}, help = "Print Header Only",
-          unspecifiedDefaultValue = "false") final boolean headerOnly)
-      throws IOException {
+          unspecifiedDefaultValue = "false") final boolean headerOnly) {
     HoodieTableMetaClient client = checkAndGetMetaClient();
     HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
     return printAllCompactions(activeTimeline,
@@ -139,8 +139,7 @@ public class CompactionCommand implements CommandMarker {
           @CliOption(key = {"sortBy"}, help = "Sorting Field", 
unspecifiedDefaultValue = "") final String sortByField,
           @CliOption(key = {"desc"}, help = "Ordering", 
unspecifiedDefaultValue = "false") final boolean descending,
           @CliOption(key = {"headeronly"}, help = "Print Header Only",
-                  unspecifiedDefaultValue = "false") final boolean headerOnly)
-          throws Exception {
+                  unspecifiedDefaultValue = "false") final boolean headerOnly) {
     if (StringUtils.isNullOrEmpty(startTs)) {
       startTs = CommitUtil.getTimeDaysAgo(10);
     }
@@ -150,7 +149,7 @@ public class CompactionCommand implements CommandMarker {
 
     HoodieTableMetaClient client = checkAndGetMetaClient();
     HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
-    archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
+    archivedTimeline.loadCompactionDetailsInMemory(startTs, endTs);
     try {
      return printAllCompactions(archivedTimeline,
              compactionPlanReader(this::readCompactionPlanForArchivedTimeline, archivedTimeline),
@@ -175,25 +174,25 @@ public class CompactionCommand implements CommandMarker {
     HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
     HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED,
             HoodieTimeline.COMPACTION_ACTION, compactionInstantTime);
-    String startTs = CommitUtil.addHours(compactionInstantTime, -1);
-    String endTs = CommitUtil.addHours(compactionInstantTime, 1);
     try {
-      archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
-      HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(
-              archivedTimeline.getInstantDetails(instant).get());
+      archivedTimeline.loadCompactionDetailsInMemory(compactionInstantTime);
+      HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeAvroRecordMetadata(
+              archivedTimeline.getInstantDetails(instant).get(), HoodieCompactionPlan.getClassSchema());
      return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly);
     } finally {
-      archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs);
+      archivedTimeline.clearInstantDetailsFromMemory(compactionInstantTime);
     }
   }
 
   @CliCommand(value = "compaction schedule", help = "Schedule Compaction")
  public String scheduleCompact(@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1G",
       help = "Spark executor memory") final String sparkMemory,
-                                @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
-                                  unspecifiedDefaultValue = "") final String propsFilePath,
-                                @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
-                                  unspecifiedDefaultValue = "") final String[] configs) throws Exception {
+      @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
+          unspecifiedDefaultValue = "") final String propsFilePath,
+      @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
+          unspecifiedDefaultValue = "") final String[] configs,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help = "Spark Master") String master)
+      throws Exception {
     HoodieTableMetaClient client = checkAndGetMetaClient();
     boolean initialized = HoodieCLI.initConf();
     HoodieCLI.initFS(initialized);
@@ -204,8 +203,9 @@ public class CompactionCommand implements CommandMarker {
    String sparkPropertiesPath =
        Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE.toString(), client.getBasePath(),
-        client.getTableConfig().getTableName(), compactionInstantTime, sparkMemory, propsFilePath);
+    String cmd = SparkCommand.COMPACT_SCHEDULE.toString();
+    sparkLauncher.addAppArgs(cmd, master, sparkMemory, client.getBasePath(),
+        client.getTableConfig().getTableName(), compactionInstantTime, propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
@@ -222,6 +222,8 @@ public class CompactionCommand implements CommandMarker {
           help = "Parallelism for hoodie compaction") final String parallelism,
       @CliOption(key = "schemaFilePath", mandatory = true,
           help = "Path for Avro schema file") final String schemaFilePath,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local",
+          help = "Spark Master") String master,
       @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
           help = "Spark executor memory") final String sparkMemory,
       @CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number 
of retries") final String retry,
@@ -249,9 +251,9 @@ public class CompactionCommand implements CommandMarker {
    String sparkPropertiesPath =
        Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
     SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), client.getBasePath(),
+    sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), master, sparkMemory, client.getBasePath(),
        client.getTableConfig().getTableName(), compactionInstantTime, parallelism, schemaFilePath,
-        sparkMemory, retry, propsFilePath);
+        retry, propsFilePath);
     UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
     Process process = sparkLauncher.launch();
     InputStreamConsumer.captureOutput(process);
@@ -279,15 +281,15 @@ public class CompactionCommand implements CommandMarker {
             .filter(pair -> pair.getRight() != null)
             .collect(Collectors.toList());
 
-    Set<HoodieInstant> committedInstants = timeline.getCommitTimeline().filterCompletedInstants()
-            .getInstants().collect(Collectors.toSet());
+    Set<String> committedInstants = timeline.getCommitTimeline().filterCompletedInstants()
+            .getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
 
     List<Comparable[]> rows = new ArrayList<>();
    for (Pair<HoodieInstant, HoodieCompactionPlan> compactionPlan : compactionPlans) {
       HoodieCompactionPlan plan = compactionPlan.getRight();
       HoodieInstant instant = compactionPlan.getLeft();
       final HoodieInstant.State state;
-      if (committedInstants.contains(instant)) {
+      if (committedInstants.contains(instant.getTimestamp())) {
         state = HoodieInstant.State.COMPLETED;
       } else {
         state = instant.getState();
@@ -304,10 +306,12 @@ public class CompactionCommand implements CommandMarker {
     }
 
    Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
-    TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State")
-            .addTableHeaderField("Total FileIds to be Compacted");
+    TableHeader header = new TableHeader()
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_COMPACTION_INSTANT_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_STATE)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_TO_BE_COMPACTED);
     if (includeExtraMetadata) {
-      header = header.addTableHeaderField("Extra Metadata");
+      header = header.addTableHeaderField(HoodieTableHeaderFields.HEADER_EXTRA_METADATA);
     }
    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
   }
@@ -326,14 +330,17 @@ public class CompactionCommand implements CommandMarker {
 
  private HoodieCompactionPlan readCompactionPlanForArchivedTimeline(HoodieArchivedTimeline archivedTimeline,
                                                                     HoodieInstant instant) {
-    if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
-      return null;
-    } else {
+    // filter inflight compaction
+    if (HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())
+        && HoodieInstant.State.INFLIGHT.equals(instant.getState())) {
       try {
-        return TimelineMetadataUtils.deserializeCompactionPlan(archivedTimeline.getInstantDetails(instant).get());
-      } catch (IOException e) {
-        throw new HoodieIOException(e.getMessage(), e);
+        return TimelineMetadataUtils.deserializeAvroRecordMetadata(archivedTimeline.getInstantDetails(instant).get(),
+            HoodieCompactionPlan.getClassSchema());
+      } catch (Exception e) {
+        throw new HoodieException(e.getMessage(), e);
       }
+    } else {
+      return null;
     }
   }
 
@@ -362,7 +369,7 @@ public class CompactionCommand implements CommandMarker {
     }
   }
 
-  private String printCompaction(HoodieCompactionPlan compactionPlan,
+  protected String printCompaction(HoodieCompactionPlan compactionPlan,
                                  String sortByField,
                                  boolean descending,
                                  int limit,
@@ -376,9 +383,13 @@ public class CompactionCommand implements CommandMarker {
     }
 
    Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
-    TableHeader header = new TableHeader().addTableHeaderField("Partition Path").addTableHeaderField("File Id")
-            .addTableHeaderField("Base Instant").addTableHeaderField("Data File Path")
-            .addTableHeaderField("Total Delta Files").addTableHeaderField("getMetrics");
+    TableHeader header = new TableHeader()
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_BASE_INSTANT)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_DATA_FILE_PATH)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_DELTA_FILES)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_METRICS);
    return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
   }
 
@@ -404,7 +415,7 @@ public class CompactionCommand implements CommandMarker {
   public String validateCompaction(
       @CliOption(key = "instant", mandatory = true, help = "Compaction 
Instant") String compactionInstant,
       @CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = 
"Parallelism") String parallelism,
-      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = 
"Spark Master ") String master,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help 
= "Spark Master") String master,
       @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = 
"executor memory") String sparkMemory,
       @CliOption(key = {"limit"}, help = "Limit commits", 
unspecifiedDefaultValue = "-1") Integer limit,
       @CliOption(key = {"sortBy"}, help = "Sorting Field", 
unspecifiedDefaultValue = "") String sortByField,
@@ -444,9 +455,13 @@ public class CompactionCommand implements CommandMarker {
       });
 
      Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
-      TableHeader header = new TableHeader().addTableHeaderField("File Id").addTableHeaderField("Base Instant Time")
-          .addTableHeaderField("Base Data File").addTableHeaderField("Num Delta Files").addTableHeaderField("Valid")
-          .addTableHeaderField("Error");
+      TableHeader header = new TableHeader()
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_BASE_INSTANT_TIME)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_BASE_DATA_FILE)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_DELTA_FILES)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_VALID)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_ERROR);
 
      output = message + HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit,
           headerOnly, rows);
@@ -463,7 +478,7 @@ public class CompactionCommand implements CommandMarker {
   public String unscheduleCompaction(
       @CliOption(key = "instant", mandatory = true, help = "Compaction 
Instant") String compactionInstant,
       @CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = 
"Parallelism") String parallelism,
-      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = 
"Spark Master ") String master,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help 
= "Spark Master") String master,
       @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = 
"executor memory") String sparkMemory,
       @CliOption(key = {"skipValidation"}, help = "skip validation", 
unspecifiedDefaultValue = "false") boolean skipV,
       @CliOption(key = {"dryRun"}, help = "Dry Run Mode", 
unspecifiedDefaultValue = "false") boolean dryRun,
@@ -508,7 +523,8 @@ public class CompactionCommand implements CommandMarker {
   @CliCommand(value = "compaction unscheduleFileId", help = "UnSchedule 
Compaction for a fileId")
   public String unscheduleCompactFile(
       @CliOption(key = "fileId", mandatory = true, help = "File Id") final 
String fileId,
-      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = 
"Spark Master ") String master,
+      @CliOption(key = "partitionPath", mandatory = true, help = "partition 
path") final String partitionPath,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help 
= "Spark Master") String master,
       @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = 
"executor memory") String sparkMemory,
       @CliOption(key = {"skipValidation"}, help = "skip validation", 
unspecifiedDefaultValue = "false") boolean skipV,
       @CliOption(key = {"dryRun"}, help = "Dry Run Mode", 
unspecifiedDefaultValue = "false") boolean dryRun,
@@ -529,7 +545,7 @@ public class CompactionCommand implements CommandMarker {
           
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
       SparkLauncher sparkLauncher = 
SparkUtil.initLauncher(sparkPropertiesPath);
       
sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), 
master, sparkMemory, client.getBasePath(),
-          fileId, outputPathStr, "1", Boolean.valueOf(skipV).toString(),
+          fileId, partitionPath, outputPathStr, "1", 
Boolean.valueOf(skipV).toString(),
           Boolean.valueOf(dryRun).toString());
       Process process = sparkLauncher.launch();
       InputStreamConsumer.captureOutput(process);
@@ -554,7 +570,7 @@ public class CompactionCommand implements CommandMarker {
   public String repairCompaction(
       @CliOption(key = "instant", mandatory = true, help = "Compaction 
Instant") String compactionInstant,
       @CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help = 
"Parallelism") String parallelism,
-      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help = 
"Spark Master ") String master,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help 
= "Spark Master") String master,
       @CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help = 
"executor memory") String sparkMemory,
       @CliOption(key = {"dryRun"}, help = "Dry Run Mode", 
unspecifiedDefaultValue = "false") boolean dryRun,
       @CliOption(key = {"limit"}, help = "Limit commits", 
unspecifiedDefaultValue = "-1") Integer limit,
@@ -616,9 +632,13 @@ public class CompactionCommand implements CommandMarker {
       });
 
      Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
-      TableHeader header = new TableHeader().addTableHeaderField("File Id").addTableHeaderField("Source File Path")
-          .addTableHeaderField("Destination File Path").addTableHeaderField("Rename Executed?")
-          .addTableHeaderField("Rename Succeeded?").addTableHeaderField("Error");
+      TableHeader header = new TableHeader()
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_SOURCE_FILE_PATH)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_DESTINATION_FILE_PATH)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_RENAME_EXECUTED)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_RENAME_SUCCEEDED)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_ERROR);
 
      return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
     } else {
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 7833ee7..afb22fd 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -79,9 +79,7 @@ public class SparkMain {
 
     SparkCommand cmd = SparkCommand.valueOf(command);
 
-    JavaSparkContext jsc = sparkMasterContained(cmd)
-        ? SparkUtil.initJavaSparkConf("hoodie-cli-" + command, Option.of(args[1]), Option.of(args[2]))
-        : SparkUtil.initJavaSparkConf("hoodie-cli-" + command);
+    JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-" + command, Option.of(args[1]), Option.of(args[2]));
     int returnCode = 0;
     try {
       switch (cmd) {
@@ -112,29 +110,29 @@ public class SparkMain {
                  Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs);
           break;
         case COMPACT_RUN:
-          assert (args.length >= 9);
+          assert (args.length >= 10);
           propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[8])) {
-            propsFilePath = args[8];
+          if (!StringUtils.isNullOrEmpty(args[9])) {
+            propsFilePath = args[9];
           }
           configs = new ArrayList<>();
-          if (args.length > 9) {
+          if (args.length > 10) {
             configs.addAll(Arrays.asList(args).subList(9, args.length));
           }
-          returnCode = compact(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
-                  Integer.parseInt(args[7]), false, propsFilePath, configs);
+          returnCode = compact(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[7],
+              Integer.parseInt(args[8]), false, propsFilePath, configs);
           break;
         case COMPACT_SCHEDULE:
-          assert (args.length >= 6);
+          assert (args.length >= 7);
           propsFilePath = null;
-          if (!StringUtils.isNullOrEmpty(args[5])) {
-            propsFilePath = args[5];
+          if (!StringUtils.isNullOrEmpty(args[6])) {
+            propsFilePath = args[6];
           }
           configs = new ArrayList<>();
-          if (args.length > 6) {
-            configs.addAll(Arrays.asList(args).subList(6, args.length));
+          if (args.length > 7) {
+            configs.addAll(Arrays.asList(args).subList(7, args.length));
           }
-          returnCode = compact(jsc, args[1], args[2], args[3], 1, "", args[4], 0, true, propsFilePath, configs);
+          returnCode = compact(jsc, args[3], args[4], args[5], 1, "", 0, true, propsFilePath, configs);
           break;
         case COMPACT_VALIDATE:
           assert (args.length == 7);
@@ -148,9 +146,9 @@ public class SparkMain {
           returnCode = 0;
           break;
         case COMPACT_UNSCHEDULE_FILE:
-          assert (args.length == 9);
-          doCompactUnscheduleFile(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
-                  Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
+          assert (args.length == 10);
+          doCompactUnscheduleFile(jsc, args[3], args[4], args[5], args[6], Integer.parseInt(args[7]),
+              Boolean.parseBoolean(args[8]), Boolean.parseBoolean(args[9]));
           returnCode = 0;
           break;
         case COMPACT_UNSCHEDULE_PLAN:
@@ -209,14 +207,6 @@ public class SparkMain {
     System.exit(returnCode);
   }
 
-  private static boolean sparkMasterContained(SparkCommand command) {
-    List<SparkCommand> masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
-        SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN,
-        SparkCommand.IMPORT, SparkCommand.UPSERT, SparkCommand.DEDUPLICATE, SparkCommand.SAVEPOINT,
-        SparkCommand.DELETE_SAVEPOINT, SparkCommand.ROLLBACK_TO_SAVEPOINT, SparkCommand.ROLLBACK, SparkCommand.BOOTSTRAP);
-    return masterContained.contains(command);
-  }
-
  protected static void clean(JavaSparkContext jsc, String basePath, String propsFilePath,
       List<String> configs) {
     HoodieCleaner.Config cfg = new HoodieCleaner.Config();
@@ -280,13 +270,14 @@ public class SparkMain {
     new HoodieCompactionAdminTool(cfg).run(jsc);
   }
 
-  private static void doCompactUnscheduleFile(JavaSparkContext jsc, String basePath, String fileId, String outputPath,
-      int parallelism, boolean skipValidation, boolean dryRun)
+  private static void doCompactUnscheduleFile(JavaSparkContext jsc, String basePath, String fileId, String partitionPath,
+      String outputPath, int parallelism, boolean skipValidation, boolean dryRun)
      throws Exception {
    HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config();
     cfg.basePath = basePath;
     cfg.operation = Operation.UNSCHEDULE_FILE;
     cfg.outputPath = outputPath;
+    cfg.partitionPath = partitionPath;
     cfg.fileId = fileId;
     cfg.parallelism = parallelism;
     cfg.dryRun = dryRun;
@@ -295,7 +286,7 @@ public class SparkMain {
   }
 
  private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant,
-      int parallelism, String schemaFile, String sparkMemory, int retry, boolean schedule, String propsFilePath,
+      int parallelism, String schemaFile, int retry, boolean schedule, String propsFilePath,
       List<String> configs) {
     HoodieCompactor.Config cfg = new HoodieCompactor.Config();
     cfg.basePath = basePath;
@@ -308,7 +299,6 @@ public class SparkMain {
     cfg.runSchedule = schedule;
     cfg.propsFilePath = propsFilePath;
     cfg.configs = configs;
-    jsc.getConf().set("spark.executor.memory", sparkMemory);
     return new HoodieCompactor(jsc, cfg).compact(retry);
   }
 
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
index 84b3576..5ad4c4c 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
@@ -194,7 +194,7 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest {
     // archived 101 and 102 instants, remove 103 and 104 instant
     data.remove("103");
     data.remove("104");
-    String expected = generateExpectData(3, data);
+    String expected = generateExpectData(1, data);
     expected = removeNonWordAndStripSpace(expected);
     String got = removeNonWordAndStripSpace(cr.getResult().toString());
     assertEquals(expected, got);
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
new file mode 100644
index 0000000..4dd69dc
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.CompactionTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTimelineArchiveLog;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test Cases for {@link CompactionCommand}.
+ */
+public class TestCompactionCommand extends AbstractShellIntegrationTest {
+
+  private String tableName;
+  private String tablePath;
+
+  @BeforeEach
+  public void init() {
+    tableName = "test_table";
+    tablePath = basePath + tableName;
+  }
+
+  @Test
+  public void testVerifyTableType() throws IOException {
+    // create COW table.
+    new TableCommand().createTable(
+        tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
+        "", TimelineLayoutVersion.VERSION_1, HoodieAvroPayload.class.getName());
+
+    // expect HoodieException for COPY_ON_WRITE table.
+    assertThrows(HoodieException.class,
+        () -> new CompactionCommand().compactionsAll(false, -1, "", false, false));
+  }
+
+  /**
+   * Test case for command 'compactions show all'.
+   */
+  @Test
+  public void testCompactionsAll() throws IOException {
+    // create MOR table.
+    new TableCommand().createTable(
+        tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
+        "", TimelineLayoutVersion.VERSION_1, HoodieAvroPayload.class.getName());
+
+    CompactionTestUtils.setupAndValidateCompactionOperations(HoodieCLI.getTableMetaClient(), false, 3, 4, 3, 3);
+
+    HoodieCLI.getTableMetaClient().reloadActiveTimeline();
+
+    CommandResult cr = getShell().executeCommand("compactions show all");
+    System.out.println(cr.getResult().toString());
+
+    TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State")
+        .addTableHeaderField("Total FileIds to be Compacted");
+    Map<String, Integer> fileIds = new HashMap<>();
+    fileIds.put("001", 3);
+    fileIds.put("003", 4);
+    fileIds.put("005", 3);
+    fileIds.put("007", 3);
+    List<Comparable[]> rows = new ArrayList<>();
+    Arrays.asList("001", "003", "005", "007").stream().sorted(Comparator.reverseOrder()).forEach(instant -> {
+      rows.add(new Comparable[] {instant, "REQUESTED", fileIds.get(instant)});
+    });
+    String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
+    assertEquals(expected, cr.getResult().toString());
+  }
+
+  /**
+   * Test case for command 'compaction show'.
+   */
+  @Test
+  public void testCompactionShow() throws IOException {
+    // create MOR table.
+    new TableCommand().createTable(
+        tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
+        "", TimelineLayoutVersion.VERSION_1, HoodieAvroPayload.class.getName());
+
+    CompactionTestUtils.setupAndValidateCompactionOperations(HoodieCLI.getTableMetaClient(), false, 3, 4, 3, 3);
+
+    HoodieCLI.getTableMetaClient().reloadActiveTimeline();
+
+    CommandResult cr = getShell().executeCommand("compaction show --instant 001");
+    System.out.println(cr.getResult().toString());
+  }
+
+  private void generateCompactionInstances() throws IOException {
+    // create MOR table.
+    new TableCommand().createTable(
+        tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
+        "", TimelineLayoutVersion.VERSION_1, HoodieAvroPayload.class.getName());
+
+    CompactionTestUtils.setupAndValidateCompactionOperations(HoodieCLI.getTableMetaClient(), true, 1, 2, 3, 4);
+
+    HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().reloadActiveTimeline();
+    // Complete the four scheduled compaction instants
+    Arrays.asList("001", "003", "005", "007").forEach(timestamp -> {
+      activeTimeline.transitionCompactionInflightToComplete(
+          new HoodieInstant(HoodieInstant.State.INFLIGHT, COMPACTION_ACTION, timestamp), Option.empty());
+    });
+
+    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+  }
+
+  private void generateArchive() throws IOException {
+    // Generate archive
+    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
+        .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+        .forTable("test-trip-table").build();
+    // archive
+    metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+    HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
+    archiveLog.archiveIfRequired(context);
+  }
+
+  /**
+   * Test case for command 'compactions showarchived'.
+   */
+  @Test
+  public void testCompactionsShowArchived() throws IOException {
+    generateCompactionInstances();
+
+    generateArchive();
+
+    CommandResult cr = getShell().executeCommand("compactions showarchived --startTs 001 --endTs 005");
+
+    // generate result
+    Map<String, Integer> fileMap = new HashMap<>();
+    fileMap.put("001", 1);
+    fileMap.put("003", 2);
+    fileMap.put("005", 3);
+    List<Comparable[]> rows = Arrays.asList("005", "003", "001").stream().map(i ->
+        new Comparable[] {i, HoodieInstant.State.COMPLETED, fileMap.get(i)}).collect(Collectors.toList());
+    Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
+    TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State")
+        .addTableHeaderField("Total FileIds to be Compacted");
+    String expected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, rows);
+
+    expected = removeNonWordAndStripSpace(expected);
+    String got = removeNonWordAndStripSpace(cr.getResult().toString());
+    assertEquals(expected, got);
+  }
+
+  /**
+   * Test case for command 'compaction showarchived'.
+   */
+  @Test
+  public void testCompactionShowArchived() throws IOException {
+    generateCompactionInstances();
+
+    String instance = "001";
+    // get compaction plan before compaction
+    HoodieCompactionPlan plan = TimelineMetadataUtils.deserializeCompactionPlan(
+        HoodieCLI.getTableMetaClient().reloadActiveTimeline().readCompactionPlanAsBytes(
+            HoodieTimeline.getCompactionRequestedInstant(instance)).get());
+
+    generateArchive();
+
+    CommandResult cr = getShell().executeCommand("compaction showarchived --instant " + instance);
+
+    // generate expected
+    String expected = new CompactionCommand().printCompaction(plan, "", false, -1, false);
+
+    expected = removeNonWordAndStripSpace(expected);
+    String got = removeNonWordAndStripSpace(cr.getResult().toString());
+    assertEquals(expected, got);
+  }
+}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java
new file mode 100644
index 0000000..37a2098
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
+import org.apache.hudi.client.CompactionAdminClient;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.TestCompactionAdminClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.CompactionTestUtils;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test class for {@link org.apache.hudi.cli.commands.CompactionCommand}.
+ * <p/>
+ * Commands that use SparkLauncher need the jars under lib, which are generated during mvn package,
+ * so these tests run as integration tests instead of unit tests.
+ */
+public class ITTestCompactionCommand extends AbstractShellIntegrationTest {
+
+  private String tablePath;
+  private String tableName;
+
+  @BeforeEach
+  public void init() throws IOException {
+    tableName = "test_table_" + ITTestCompactionCommand.class.getName();
+    tablePath = Paths.get(basePath, tableName).toString();
+
+    HoodieCLI.conf = jsc.hadoopConfiguration();
+    // Create table and connect
+    new TableCommand().createTable(
+        tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
+        "", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
+    metaClient.setBasePath(tablePath);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+  }
+
+  /**
+   * Test case for command 'compaction schedule'.
+   */
+  @Test
+  public void testScheduleCompact() throws IOException {
+    // generate commits
+    generateCommits();
+
+    CommandResult cr = getShell().executeCommand(
+        String.format("compaction schedule --hoodieConfigs hoodie.compact.inline.max.delta.commits=1 --sparkMaster %s",
+            "local"));
+    assertAll("Command run failed",
+        () -> assertTrue(cr.isSuccess()),
+        () -> assertTrue(
+            cr.getResult().toString().startsWith("Attempted to schedule compaction for")));
+
+    // there is 1 requested compaction
+    HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
+    assertEquals(1, timeline.filterPendingCompactionTimeline().countInstants());
+  }
+
+  /**
+   * Test case for command 'compaction run'.
+   */
+  @Test
+  public void testCompact() throws IOException {
+    // generate commits
+    generateCommits();
+
+    String instance = prepareScheduleCompaction();
+
+    String schemaPath = Paths.get(basePath, "compaction.schema").toString();
+    writeSchemaToTmpFile(schemaPath);
+
+    CommandResult cr2 = getShell().executeCommand(
+        String.format("compaction run --parallelism %s --schemaFilePath %s --sparkMaster %s",
+            2, schemaPath, "local"));
+
+    assertAll("Command run failed",
+        () -> assertTrue(cr2.isSuccess()),
+        () -> assertTrue(
+            cr2.getResult().toString().startsWith("Compaction successfully completed for")));
+
+    // assert compaction complete
+    assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
+        .filterCompletedInstants().getInstants()
+        .map(HoodieInstant::getTimestamp).collect(Collectors.toList()).contains(instance),
+        "Pending compaction must be completed");
+  }
+
+  /**
+   * Test case for command 'compaction validate'.
+   */
+  @Test
+  public void testValidateCompaction() throws IOException {
+    // generate commits
+    generateCommits();
+
+    String instance = prepareScheduleCompaction();
+
+    CommandResult cr = getShell().executeCommand(
+        String.format("compaction validate --instant %s --sparkMaster %s", instance, "local"));
+
+    assertAll("Command run failed",
+        () -> assertTrue(cr.isSuccess()),
+        () -> assertTrue(
+            // compaction requested should be valid
+            cr.getResult().toString().contains("COMPACTION PLAN VALID")));
+  }
+
+  /**
+   * This function mainly tests the workflow of the 'compaction unschedule' command.
+   * The real test of {@link org.apache.hudi.client.CompactionAdminClient#unscheduleCompactionPlan}
+   * is {@link TestCompactionAdminClient#testUnscheduleCompactionPlan()}.
+   */
+  @Test
+  public void testUnscheduleCompaction() throws Exception {
+    // generate commits
+    generateCommits();
+
+    String instance = prepareScheduleCompaction();
+
+    CommandResult cr = getShell().executeCommand(
+        String.format("compaction unschedule --instant %s --sparkMaster %s", instance, "local"));
+
+    // There are never any files to rename here
+    assertAll("Command run failed",
+        () -> assertTrue(cr.isSuccess()),
+        () -> assertEquals("No File renames needed to unschedule pending compaction. Operation successful.",
+            cr.getResult().toString()));
+  }
+
+  /**
+   * This function mainly tests the workflow of the 'compaction unscheduleFileId' command.
+   * The real test of {@link org.apache.hudi.client.CompactionAdminClient#unscheduleCompactionFileId}
+   * is {@link TestCompactionAdminClient#testUnscheduleCompactionFileId}.
+   */
+  @Test
+  public void testUnscheduleCompactFile() throws IOException {
+    int numEntriesPerInstant = 10;
+    CompactionTestUtils.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant,
+        numEntriesPerInstant, numEntriesPerInstant, numEntriesPerInstant);
+
+    CompactionOperation op = CompactionOperation.convertFromAvroRecordInstance(
+        CompactionUtils.getCompactionPlan(metaClient, "001").getOperations().stream().findFirst().get());
+
+    CommandResult cr = getShell().executeCommand(
+        String.format("compaction unscheduleFileId --fileId %s --partitionPath %s --sparkMaster %s",
+            op.getFileGroupId().getFileId(), op.getFileGroupId().getPartitionPath(), "local"));
+
+    assertAll("Command run failed",
+        () -> assertTrue(cr.isSuccess()),
+        () -> assertTrue(removeNonWordAndStripSpace(cr.getResult().toString()).contains("true")),
+        () -> assertFalse(removeNonWordAndStripSpace(cr.getResult().toString()).contains("false")));
+  }
+
+  /**
+   * This function mainly tests the workflow of the 'compaction repair' command.
+   * The real test of {@link org.apache.hudi.client.CompactionAdminClient#repairCompaction}
+   * is {@link TestCompactionAdminClient#testRepairCompactionPlan}.
+   */
+  @Test
+  public void testRepairCompaction() throws Exception {
+    int numEntriesPerInstant = 10;
+    String compactionInstant = "001";
+    CompactionTestUtils.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant,
+        numEntriesPerInstant, numEntriesPerInstant, numEntriesPerInstant);
+
+    metaClient.reloadActiveTimeline();
+    CompactionAdminClient client = new CompactionAdminClient(new HoodieSparkEngineContext(jsc), metaClient.getBasePath());
+    List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
+        client.getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, 1, Option.empty(), false);
+
+    renameFiles.forEach(lfPair -> {
+      try {
+        metaClient.getFs().rename(lfPair.getLeft().getPath(), lfPair.getRight().getPath());
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+    });
+
+    client.unscheduleCompactionPlan(compactionInstant, false, 1, false);
+
+    CommandResult cr = getShell().executeCommand(
+        String.format("compaction repair --instant %s --sparkMaster %s", compactionInstant, "local"));
+
+    // All renames succeeded: the result contains "true" and no "false"
+    // Expected:
+    // ║ File Id │ Source File Path │ Destination File Path │ Rename Executed? │ Rename Succeeded? │ Error ║
+    // ║ *       │     *            │        *              │    true          │     true          │       ║
+    assertAll("Command run failed",
+        () -> assertTrue(cr.isSuccess()),
+        () -> assertTrue(removeNonWordAndStripSpace(cr.getResult().toString()).contains("true")),
+        () -> assertFalse(removeNonWordAndStripSpace(cr.getResult().toString()).contains("false")));
+  }
+
+  private String prepareScheduleCompaction() {
+    // generate requested compaction
+    CommandResult cr = getShell().executeCommand(
+        String.format("compaction schedule --hoodieConfigs hoodie.compact.inline.max.delta.commits=1 --sparkMaster %s",
+            "local"));
+    assertTrue(cr.isSuccess());
+
+    // get compaction instance
+    HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
+    Option<String> instance =
+        timeline.filterPendingCompactionTimeline().firstInstant().map(HoodieInstant::getTimestamp);
+    assertTrue(instance.isPresent(), "Must have pending compaction.");
+    return instance.get();
+  }
+
+  private void writeSchemaToTmpFile(String schemaPath) throws IOException {
+    try (BufferedWriter out = new BufferedWriter(new FileWriter(schemaPath))) {
+      out.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+    }
+  }
+
+  private void generateCommits() throws IOException {
+    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+
+    // Create the write client to write some records in
+    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
+        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
+        .withDeleteParallelism(2).forTable(tableName)
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
+
+    SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg);
+
+    List<HoodieRecord> records = insert(jsc, client, dataGen);
+    upsert(jsc, client, dataGen, records);
+    delete(jsc, client, records);
+  }
+
+  private List<HoodieRecord> insert(JavaSparkContext jsc, SparkRDDWriteClient<HoodieAvroPayload> client,
+      HoodieTestDataGenerator dataGen) throws IOException {
+    // inserts
+    String newCommitTime = "001";
+    client.startCommitWithTime(newCommitTime);
+
+    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    operateFunc(SparkRDDWriteClient::insert, client, writeRecords, newCommitTime);
+    return records;
+  }
+
+  private void upsert(JavaSparkContext jsc, SparkRDDWriteClient<HoodieAvroPayload> client,
+      HoodieTestDataGenerator dataGen, List<HoodieRecord> records)
+      throws IOException {
+    // updates
+    String newCommitTime = "002";
+    client.startCommitWithTime(newCommitTime);
+
+    List<HoodieRecord> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2);
+    records.addAll(toBeUpdated);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    operateFunc(SparkRDDWriteClient::upsert, client, writeRecords, newCommitTime);
+  }
+
+  private void delete(JavaSparkContext jsc, SparkRDDWriteClient<HoodieAvroPayload> client,
+       List<HoodieRecord> records) {
+    // Delete
+    String newCommitTime = "003";
+    client.startCommitWithTime(newCommitTime);
+
+    // just delete half of the records
+    int numToDelete = records.size() / 2;
+    List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
+    JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1);
+    client.delete(deleteRecords, newCommitTime);
+  }
+
+  private JavaRDD<WriteStatus> operateFunc(
+      HoodieClientTestBase.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+      SparkRDDWriteClient<HoodieAvroPayload> client, JavaRDD<HoodieRecord> writeRecords, String commitTime)
+      throws IOException {
+    return writeFn.apply(client, writeRecords, commitTime);
+  }
+}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/AbstractShellIntegrationTest.java b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/AbstractShellIntegrationTest.java
index a7cf85c..67449dc 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/AbstractShellIntegrationTest.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/AbstractShellIntegrationTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.cli.testutils;
 
+import org.apache.hudi.common.model.HoodieTableType;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 
@@ -37,4 +38,8 @@ public abstract class AbstractShellIntegrationTest extends AbstractShellBaseInte
   public void teardown() throws Exception {
     cleanupResources();
   }
+
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.MERGE_ON_READ;
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
index 9ace03a..1c869e4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
@@ -383,7 +383,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
   * @return list of pairs of log-files (old, new) and for each pair, rename must be done to successfully unschedule
   *         compaction.
   */
-  protected List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulingCompactionPlan(
+  public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulingCompactionPlan(
      HoodieTableMetaClient metaClient, String compactionInstant, int parallelism,
      Option<HoodieTableFileSystemView> fsViewOpt, boolean skipValidation) throws IOException {
     HoodieTableFileSystemView fsView = fsViewOpt.isPresent() ? fsViewOpt.get()
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
index 5b18881..f0f1392 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
@@ -464,7 +464,8 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
     assertTrue(result);
     HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
    List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3);
-    assertEquals(new HashSet<>(archivedInstants), archivedTimeline.getInstants().collect(Collectors.toSet()));
+    assertEquals(new HashSet<>(archivedInstants),
+        archivedTimeline.filterCompletedInstants().getInstants().collect(Collectors.toSet()));
     assertFalse(wrapperFs.exists(markerPath));
   }
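
The test change reflects that archived instants now carry their state, so getInstants() can also surface inflight compaction instants. Readers that only want completed instants must now filter explicitly; a small sketch under that assumption:

    // Sketch: archived instants now carry State, so filter for completed ones.
    HoodieArchivedTimeline archived = metaClient.getArchivedTimeline();
    Set<HoodieInstant> completed =
        archived.filterCompletedInstants().getInstants().collect(Collectors.toSet());
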
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 9f8c439..6b05edd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -18,12 +18,14 @@
 
 package org.apache.hudi.common.table.timeline;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -43,6 +45,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -66,6 +69,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
 
  private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX = "commits";
   private static final String ACTION_TYPE_KEY = "actionType";
+  private static final String ACTION_STATE = "actionState";
   private HoodieTableMetaClient metaClient;
   private Map<String, byte[]> readCommits = new HashMap<>();
 
@@ -108,6 +112,22 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
     loadInstants(startTs, endTs);
   }
 
+  public void loadCompactionDetailsInMemory(String compactionInstantTime) {
+    loadCompactionDetailsInMemory(compactionInstantTime, compactionInstantTime);
+  }
+
+  public void loadCompactionDetailsInMemory(String startTs, String endTs) {
+    // load the compaction plans for inflight compaction instants within [startTs, endTs]
+    loadInstants(new TimeRangeFilter(startTs, endTs), true, record ->
+        record.get(ACTION_TYPE_KEY).toString().equals(HoodieTimeline.COMPACTION_ACTION)
+            && HoodieInstant.State.INFLIGHT.toString().equals(record.get(ACTION_STATE).toString())
+    );
+  }
+
+  public void clearInstantDetailsFromMemory(String instantTime) {
+    this.readCommits.remove(instantTime);
+  }
+
   public void clearInstantDetailsFromMemory(String startTs, String endTs) {
     this.findInstantsInRange(startTs, endTs).getInstants().forEach(instant ->
             this.readCommits.remove(instant.getTimestamp()));
@@ -126,11 +146,16 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
    final String instantTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString();
     final String action = record.get(ACTION_TYPE_KEY).toString();
     if (loadDetails) {
-      Option.ofNullable(record.get(getMetadataKey(action))).map(actionData ->
-              this.readCommits.put(instantTime, actionData.toString().getBytes(StandardCharsets.UTF_8))
-      );
+      Option.ofNullable(record.get(getMetadataKey(action))).map(actionData -> {
+        if (action.equals(HoodieTimeline.COMPACTION_ACTION)) {
+          this.readCommits.put(instantTime, HoodieAvroUtils.indexedRecordToBytes((IndexedRecord) actionData));
+        } else {
+          this.readCommits.put(instantTime, actionData.toString().getBytes(StandardCharsets.UTF_8));
+        }
+        return null;
+      });
    }
-    return new HoodieInstant(false, action, instantTime);
+    return new HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), action, instantTime);
   }
 
   private String getMetadataKey(String action) {
@@ -145,6 +170,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
         return "hoodieRollbackMetadata";
       case HoodieTimeline.SAVEPOINT_ACTION:
         return "hoodieSavePointMetadata";
+      case HoodieTimeline.COMPACTION_ACTION:
+        return "hoodieCompactionPlan";
       default:
         throw new HoodieIOException("Unknown action in metadata " + action);
     }
@@ -158,12 +185,18 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
     return loadInstants(new TimeRangeFilter(startTs, endTs), true);
   }
 
+  private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails) {
+    return loadInstants(filter, loadInstantDetails, record -> true);
+  }
+
   /**
   * This is a method to read selected instants. Do NOT use this directly; use one of the helper methods above.
   * If loadInstantDetails is set to true, this would also update the 'readCommits' map with commit details.
   * If filter is specified, only the filtered instants are loaded.
+   * If commitsFilter is specified, only the filtered records are loaded.
   */
-  private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails) {
+  private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails,
+       Function<GenericRecord, Boolean> commitsFilter) {
     try {
       // list all files
       FileStatus[] fsStatuses = metaClient.getFs().globStatus(
@@ -187,6 +220,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
             List<IndexedRecord> records = blk.getRecords();
             // filter blocks in desired time window
             Stream<HoodieInstant> instantsInBlkStream = records.stream()
+                    .filter(r -> commitsFilter.apply((GenericRecord) r))
                    .map(r -> readCommit((GenericRecord) r, loadInstantDetails));
 
             if (filter != null) {
@@ -254,4 +288,13 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
       return 0;
     }
   }
+
+  @Override
+  public HoodieDefaultTimeline getWriteTimeline() {
+    // filter in-memory instants
+    Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
+    return new HoodieDefaultTimeline(getInstants().filter(i ->
+        readCommits.keySet().contains(i.getTimestamp()))
+        .filter(s -> validActions.contains(s.getAction())), details);
+  }
 }
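
Taken together, the new methods let a caller pull an archived compaction plan into memory on demand, read it, and release it afterwards. A hedged usage sketch under that assumed flow:

    // Sketch: load an archived compaction plan, consume it, then free the memory.
    HoodieArchivedTimeline archived = metaClient.getArchivedTimeline();
    archived.loadCompactionDetailsInMemory(compactionInstantTime);
    HoodieTimeline writeTimeline = archived.getWriteTimeline();
    // ... consume writeTimeline.getInstantDetails(instant) for each instant ...
    archived.clearInstantDetailsFromMemory(compactionInstantTime);
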
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index 9b419ca..a50c299 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.table.timeline;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -33,12 +34,14 @@ import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 
+import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.file.FileReader;
 import org.apache.avro.file.SeekableByteArrayInput;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecordBase;
@@ -176,4 +179,13 @@ public class TimelineMetadataUtils {
    ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz);
     return fileReader.next();
   }
+
+  public static <T extends SpecificRecordBase> T deserializeAvroRecordMetadata(byte[] bytes, Schema schema)
+      throws IOException {
+    return deserializeAvroRecordMetadata(HoodieAvroUtils.bytesToAvro(bytes, schema), schema);
+  }
+
+  public static <T extends SpecificRecordBase> T deserializeAvroRecordMetadata(Object object, Schema schema) {
+    return (T) SpecificData.get().deepCopy(schema, object);
+  }
 }
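
A short usage sketch for the new deserialization helpers, assuming planBytes holds a compaction-plan record written by the archiving path above (the variable name is a placeholder):

    // Sketch: rebuild the typed Avro record from archived bytes, using the
    // schema generated on the HoodieCompactionPlan class. May throw IOException.
    HoodieCompactionPlan plan = TimelineMetadataUtils.deserializeAvroRecordMetadata(
        planBytes, HoodieCompactionPlan.getClassSchema());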
