This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new ecdbd25  [HUDI-699] Fix CompactionCommand and add unit test for CompactionCommand (#2325)
ecdbd25 is described below
commit ecdbd2517fd8157d9e96f3d9abf195a589e191ae
Author: hongdd <[email protected]>
AuthorDate: Thu Apr 8 15:35:33 2021 +0800
    [HUDI-699] Fix CompactionCommand and add unit test for CompactionCommand (#2325)
---
.../apache/hudi/cli/HoodieTableHeaderFields.java | 21 ++
.../hudi/cli/commands/CompactionCommand.java | 112 ++++---
.../org/apache/hudi/cli/commands/SparkMain.java | 50 ++--
.../hudi/cli/commands/TestCommitsCommand.java | 2 +-
.../hudi/cli/commands/TestCompactionCommand.java | 219 ++++++++++++++
.../hudi/cli/integ/ITTestCompactionCommand.java | 330 +++++++++++++++++++++
.../testutils/AbstractShellIntegrationTest.java | 5 +
.../apache/hudi/client/CompactionAdminClient.java | 2 +-
.../hudi/io/TestHoodieTimelineArchiveLog.java | 3 +-
.../table/timeline/HoodieArchivedTimeline.java | 53 +++-
.../table/timeline/TimelineMetadataUtils.java | 12 +
11 files changed, 725 insertions(+), 84 deletions(-)
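
The substance of the fix, before the per-file diffs: archived compaction plans are stored as raw Avro records on the inflight compaction instant, so the CLI now loads and decodes them through the loadCompactionDetailsInMemory and deserializeAvroRecordMetadata helpers introduced below. A minimal sketch of the repaired read path, assuming an existing HoodieTableMetaClient named `client` and an archived compaction instant "001" (both illustrative, not part of the commit):

    HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
    // load only this instant's compaction plan bytes into memory
    archivedTimeline.loadCompactionDetailsInMemory("001");
    try {
      HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED,
          HoodieTimeline.COMPACTION_ACTION, "001");
      // decode the raw Avro record into a HoodieCompactionPlan
      HoodieCompactionPlan plan = TimelineMetadataUtils.deserializeAvroRecordMetadata(
          archivedTimeline.getInstantDetails(instant).get(), HoodieCompactionPlan.getClassSchema());
      System.out.println(plan.getOperations());
    } finally {
      // release the cached details once done
      archivedTimeline.clearInstantDetailsFromMemory("001");
    }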
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
index cb36cfd..e317d5a 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -143,4 +143,25 @@ public class HoodieTableHeaderFields {
   public static final String HEADER_TOTAL_ROLLBACK_BLOCKS = "Total Rollback Blocks";
   public static final String HEADER_TOTAL_LOG_RECORDS = "Total Log Records";
   public static final String HEADER_TOTAL_UPDATED_RECORDS_COMPACTED = "Total Updated Records Compacted";
+
+  /**
+   * Fields of Compaction.
+   */
+  public static final String HEADER_INSTANT_BLANK_TIME = "Instant Time";
+  public static final String HEADER_FILE_PATH = "File Path";
+  public static final String HEADER_COMPACTION_INSTANT_TIME = "Compaction " + HEADER_INSTANT_BLANK_TIME;
+  public static final String HEADER_STATE = "State";
+  public static final String HEADER_TOTAL_FILES_TO_BE_COMPACTED = "Total FileIds to be Compacted";
+  public static final String HEADER_EXTRA_METADATA = "Extra Metadata";
+  public static final String HEADER_DATA_FILE_PATH = "Data " + HEADER_FILE_PATH;
+  public static final String HEADER_TOTAL_DELTA_FILES = "Total " + HEADER_DELTA_FILES;
+  public static final String HEADER_METRICS = "getMetrics";
+  public static final String HEADER_BASE_INSTANT_TIME = "Base " + HEADER_INSTANT_BLANK_TIME;
+  public static final String HEADER_BASE_DATA_FILE = "Base Data File";
+  public static final String HEADER_VALID = "Valid";
+  public static final String HEADER_ERROR = "Error";
+  public static final String HEADER_SOURCE_FILE_PATH = "Source " + HEADER_FILE_PATH;
+  public static final String HEADER_DESTINATION_FILE_PATH = "Destination " + HEADER_FILE_PATH;
+  public static final String HEADER_RENAME_EXECUTED = "Rename Executed?";
+  public static final String HEADER_RENAME_SUCCEEDED = "Rename Succeeded?";
}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index 67445ea..b6a366b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieCompactionOperation;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
import org.apache.hudi.cli.utils.CommitUtil;
@@ -97,8 +98,7 @@ public class CompactionCommand implements CommandMarker {
@CliOption(key = {"sortBy"}, help = "Sorting Field",
unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue =
"false") final boolean descending,
@CliOption(key = {"headeronly"}, help = "Print Header Only",
- unspecifiedDefaultValue = "false") final boolean headerOnly)
- throws IOException {
+ unspecifiedDefaultValue = "false") final boolean headerOnly) {
HoodieTableMetaClient client = checkAndGetMetaClient();
HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
return printAllCompactions(activeTimeline,
@@ -139,8 +139,7 @@ public class CompactionCommand implements CommandMarker {
@CliOption(key = {"sortBy"}, help = "Sorting Field",
unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering",
unspecifiedDefaultValue = "false") final boolean descending,
@CliOption(key = {"headeronly"}, help = "Print Header Only",
- unspecifiedDefaultValue = "false") final boolean headerOnly)
- throws Exception {
+ unspecifiedDefaultValue = "false") final boolean headerOnly)
{
if (StringUtils.isNullOrEmpty(startTs)) {
startTs = CommitUtil.getTimeDaysAgo(10);
}
@@ -150,7 +149,7 @@ public class CompactionCommand implements CommandMarker {
HoodieTableMetaClient client = checkAndGetMetaClient();
HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
- archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
+ archivedTimeline.loadCompactionDetailsInMemory(startTs, endTs);
try {
       return printAllCompactions(archivedTimeline,
           compactionPlanReader(this::readCompactionPlanForArchivedTimeline, archivedTimeline),
@@ -175,25 +174,25 @@ public class CompactionCommand implements CommandMarker {
HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
     HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED,
         HoodieTimeline.COMPACTION_ACTION, compactionInstantTime);
-    String startTs = CommitUtil.addHours(compactionInstantTime, -1);
-    String endTs = CommitUtil.addHours(compactionInstantTime, 1);
     try {
-      archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
-      HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeCompactionPlan(
-          archivedTimeline.getInstantDetails(instant).get());
+      archivedTimeline.loadCompactionDetailsInMemory(compactionInstantTime);
+      HoodieCompactionPlan compactionPlan = TimelineMetadataUtils.deserializeAvroRecordMetadata(
+          archivedTimeline.getInstantDetails(instant).get(), HoodieCompactionPlan.getClassSchema());
       return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly);
} finally {
- archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs);
+ archivedTimeline.clearInstantDetailsFromMemory(compactionInstantTime);
}
}
@CliCommand(value = "compaction schedule", help = "Schedule Compaction")
   public String scheduleCompact(@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "1G",
       help = "Spark executor memory") final String sparkMemory,
-      @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
-          unspecifiedDefaultValue = "") final String propsFilePath,
-      @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
-          unspecifiedDefaultValue = "") final String[] configs) throws Exception {
+      @CliOption(key = "propsFilePath", help = "path to properties file on localfs or dfs with configurations for hoodie client for compacting",
+          unspecifiedDefaultValue = "") final String propsFilePath,
+      @CliOption(key = "hoodieConfigs", help = "Any configuration that can be set in the properties file can be passed here in the form of an array",
+          unspecifiedDefaultValue = "") final String[] configs,
+      @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help = "Spark Master") String master)
+      throws Exception {
HoodieTableMetaClient client = checkAndGetMetaClient();
boolean initialized = HoodieCLI.initConf();
HoodieCLI.initFS(initialized);
@@ -204,8 +203,9 @@ public class CompactionCommand implements CommandMarker {
String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkCommand.COMPACT_SCHEDULE.toString(), client.getBasePath(),
-        client.getTableConfig().getTableName(), compactionInstantTime, sparkMemory, propsFilePath);
+    String cmd = SparkCommand.COMPACT_SCHEDULE.toString();
+    sparkLauncher.addAppArgs(cmd, master, sparkMemory, client.getBasePath(),
+        client.getTableConfig().getTableName(), compactionInstantTime, propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
@@ -222,6 +222,8 @@ public class CompactionCommand implements CommandMarker {
help = "Parallelism for hoodie compaction") final String parallelism,
@CliOption(key = "schemaFilePath", mandatory = true,
help = "Path for Avro schema file") final String schemaFilePath,
+ @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local",
+ help = "Spark Master") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "4G",
help = "Spark executor memory") final String sparkMemory,
@CliOption(key = "retry", unspecifiedDefaultValue = "1", help = "Number
of retries") final String retry,
@@ -249,9 +251,9 @@ public class CompactionCommand implements CommandMarker {
String sparkPropertiesPath =
Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
-    sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), client.getBasePath(),
+    sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), master, sparkMemory, client.getBasePath(),
         client.getTableConfig().getTableName(), compactionInstantTime, parallelism, schemaFilePath,
-        sparkMemory, retry, propsFilePath);
+        retry, propsFilePath);
UtilHelpers.validateAndAddProperties(configs, sparkLauncher);
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
@@ -279,15 +281,15 @@ public class CompactionCommand implements CommandMarker {
.filter(pair -> pair.getRight() != null)
.collect(Collectors.toList());
-    Set<HoodieInstant> committedInstants = timeline.getCommitTimeline().filterCompletedInstants()
-        .getInstants().collect(Collectors.toSet());
+    Set<String> committedInstants = timeline.getCommitTimeline().filterCompletedInstants()
+        .getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
     List<Comparable[]> rows = new ArrayList<>();
     for (Pair<HoodieInstant, HoodieCompactionPlan> compactionPlan : compactionPlans) {
HoodieCompactionPlan plan = compactionPlan.getRight();
HoodieInstant instant = compactionPlan.getLeft();
final HoodieInstant.State state;
- if (committedInstants.contains(instant)) {
+ if (committedInstants.contains(instant.getTimestamp())) {
state = HoodieInstant.State.COMPLETED;
} else {
state = instant.getState();
@@ -304,10 +306,12 @@ public class CompactionCommand implements CommandMarker {
}
     Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
-    TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State")
-        .addTableHeaderField("Total FileIds to be Compacted");
+    TableHeader header = new TableHeader()
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_COMPACTION_INSTANT_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_STATE)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_FILES_TO_BE_COMPACTED);
     if (includeExtraMetadata) {
-      header = header.addTableHeaderField("Extra Metadata");
+      header = header.addTableHeaderField(HoodieTableHeaderFields.HEADER_EXTRA_METADATA);
     }
     return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
}
@@ -326,14 +330,17 @@ public class CompactionCommand implements CommandMarker {
   private HoodieCompactionPlan readCompactionPlanForArchivedTimeline(HoodieArchivedTimeline archivedTimeline,
                                                                      HoodieInstant instant) {
-    if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
-      return null;
-    } else {
+    // filter inflight compaction
+    if (HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())
+        && HoodieInstant.State.INFLIGHT.equals(instant.getState())) {
       try {
-        return TimelineMetadataUtils.deserializeCompactionPlan(archivedTimeline.getInstantDetails(instant).get());
-      } catch (IOException e) {
-        throw new HoodieIOException(e.getMessage(), e);
+        return TimelineMetadataUtils.deserializeAvroRecordMetadata(archivedTimeline.getInstantDetails(instant).get(),
+            HoodieCompactionPlan.getClassSchema());
+      } catch (Exception e) {
+        throw new HoodieException(e.getMessage(), e);
}
+ } else {
+ return null;
}
}
@@ -362,7 +369,7 @@ public class CompactionCommand implements CommandMarker {
}
}
- private String printCompaction(HoodieCompactionPlan compactionPlan,
+ protected String printCompaction(HoodieCompactionPlan compactionPlan,
String sortByField,
boolean descending,
int limit,
@@ -376,9 +383,13 @@ public class CompactionCommand implements CommandMarker {
}
     Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
-    TableHeader header = new TableHeader().addTableHeaderField("Partition Path").addTableHeaderField("File Id")
-        .addTableHeaderField("Base Instant").addTableHeaderField("Data File Path")
-        .addTableHeaderField("Total Delta Files").addTableHeaderField("getMetrics");
+    TableHeader header = new TableHeader()
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_PARTITION_PATH)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_BASE_INSTANT)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_DATA_FILE_PATH)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_TOTAL_DELTA_FILES)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_METRICS);
     return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
}
@@ -404,7 +415,7 @@ public class CompactionCommand implements CommandMarker {
public String validateCompaction(
@CliOption(key = "instant", mandatory = true, help = "Compaction
Instant") String compactionInstant,
@CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help =
"Parallelism") String parallelism,
- @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help =
"Spark Master ") String master,
+ @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help
= "Spark Master") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help =
"executor memory") String sparkMemory,
@CliOption(key = {"limit"}, help = "Limit commits",
unspecifiedDefaultValue = "-1") Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field",
unspecifiedDefaultValue = "") String sortByField,
@@ -444,9 +455,13 @@ public class CompactionCommand implements CommandMarker {
});
      Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
-      TableHeader header = new TableHeader().addTableHeaderField("File Id").addTableHeaderField("Base Instant Time")
-          .addTableHeaderField("Base Data File").addTableHeaderField("Num Delta Files").addTableHeaderField("Valid")
-          .addTableHeaderField("Error");
+      TableHeader header = new TableHeader()
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_BASE_INSTANT_TIME)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_BASE_DATA_FILE)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_NUM_DELTA_FILES)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_VALID)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_ERROR);
      output = message + HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit,
          headerOnly, rows);
@@ -463,7 +478,7 @@ public class CompactionCommand implements CommandMarker {
public String unscheduleCompaction(
@CliOption(key = "instant", mandatory = true, help = "Compaction
Instant") String compactionInstant,
@CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help =
"Parallelism") String parallelism,
- @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help =
"Spark Master ") String master,
+ @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help
= "Spark Master") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help =
"executor memory") String sparkMemory,
@CliOption(key = {"skipValidation"}, help = "skip validation",
unspecifiedDefaultValue = "false") boolean skipV,
@CliOption(key = {"dryRun"}, help = "Dry Run Mode",
unspecifiedDefaultValue = "false") boolean dryRun,
@@ -508,7 +523,8 @@ public class CompactionCommand implements CommandMarker {
@CliCommand(value = "compaction unscheduleFileId", help = "UnSchedule
Compaction for a fileId")
public String unscheduleCompactFile(
@CliOption(key = "fileId", mandatory = true, help = "File Id") final
String fileId,
- @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help =
"Spark Master ") String master,
+ @CliOption(key = "partitionPath", mandatory = true, help = "partition
path") final String partitionPath,
+ @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help
= "Spark Master") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help =
"executor memory") String sparkMemory,
@CliOption(key = {"skipValidation"}, help = "skip validation",
unspecifiedDefaultValue = "false") boolean skipV,
@CliOption(key = {"dryRun"}, help = "Dry Run Mode",
unspecifiedDefaultValue = "false") boolean dryRun,
@@ -529,7 +545,7 @@ public class CompactionCommand implements CommandMarker {
.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties()));
       SparkLauncher sparkLauncher = SparkUtil.initLauncher(sparkPropertiesPath);
       sparkLauncher.addAppArgs(SparkCommand.COMPACT_UNSCHEDULE_FILE.toString(), master, sparkMemory, client.getBasePath(),
-          fileId, outputPathStr, "1", Boolean.valueOf(skipV).toString(),
+          fileId, partitionPath, outputPathStr, "1", Boolean.valueOf(skipV).toString(),
Boolean.valueOf(dryRun).toString());
Process process = sparkLauncher.launch();
InputStreamConsumer.captureOutput(process);
@@ -554,7 +570,7 @@ public class CompactionCommand implements CommandMarker {
public String repairCompaction(
@CliOption(key = "instant", mandatory = true, help = "Compaction
Instant") String compactionInstant,
@CliOption(key = {"parallelism"}, unspecifiedDefaultValue = "3", help =
"Parallelism") String parallelism,
- @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "", help =
"Spark Master ") String master,
+ @CliOption(key = "sparkMaster", unspecifiedDefaultValue = "local", help
= "Spark Master") String master,
@CliOption(key = "sparkMemory", unspecifiedDefaultValue = "2G", help =
"executor memory") String sparkMemory,
@CliOption(key = {"dryRun"}, help = "Dry Run Mode",
unspecifiedDefaultValue = "false") boolean dryRun,
@CliOption(key = {"limit"}, help = "Limit commits",
unspecifiedDefaultValue = "-1") Integer limit,
@@ -616,9 +632,13 @@ public class CompactionCommand implements CommandMarker {
});
       Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
-      TableHeader header = new TableHeader().addTableHeaderField("File Id").addTableHeaderField("Source File Path")
-          .addTableHeaderField("Destination File Path").addTableHeaderField("Rename Executed?")
-          .addTableHeaderField("Rename Succeeded?").addTableHeaderField("Error");
+      TableHeader header = new TableHeader()
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_FILE_ID)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_SOURCE_FILE_PATH)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_DESTINATION_FILE_PATH)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_RENAME_EXECUTED)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_RENAME_SUCCEEDED)
+          .addTableHeaderField(HoodieTableHeaderFields.HEADER_ERROR);
       return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
} else {
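
For readers tracing the reordered SparkLauncher arguments above: every compaction subcommand now passes the Spark master and executor memory first, so SparkMain (next file) can read them uniformly from args[1] and args[2]. A sketch of the COMPACT_SCHEDULE layout implied by scheduleCompact; the comments are annotations on the diff, not code from the commit:

    // args[0] = "COMPACT_SCHEDULE"     SparkCommand name
    // args[1] = master                 read by SparkUtil.initJavaSparkConf
    // args[2] = sparkMemory            read by SparkUtil.initJavaSparkConf
    // args[3] = basePath
    // args[4] = tableName
    // args[5] = compactionInstantTime
    // args[6] = propsFilePath
    // args[7..] = extra hoodie configs appended via UtilHelpers.validateAndAddProperties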
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 7833ee7..afb22fd 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -79,9 +79,7 @@ public class SparkMain {
SparkCommand cmd = SparkCommand.valueOf(command);
-    JavaSparkContext jsc = sparkMasterContained(cmd)
-        ? SparkUtil.initJavaSparkConf("hoodie-cli-" + command, Option.of(args[1]), Option.of(args[2]))
-        : SparkUtil.initJavaSparkConf("hoodie-cli-" + command);
+    JavaSparkContext jsc = SparkUtil.initJavaSparkConf("hoodie-cli-" + command, Option.of(args[1]), Option.of(args[2]));
int returnCode = 0;
try {
switch (cmd) {
@@ -112,29 +110,29 @@ public class SparkMain {
             Integer.parseInt(args[9]), args[10], Integer.parseInt(args[11]), propsFilePath, configs);
break;
case COMPACT_RUN:
- assert (args.length >= 9);
+ assert (args.length >= 10);
propsFilePath = null;
- if (!StringUtils.isNullOrEmpty(args[8])) {
- propsFilePath = args[8];
+ if (!StringUtils.isNullOrEmpty(args[9])) {
+ propsFilePath = args[9];
}
configs = new ArrayList<>();
- if (args.length > 9) {
+ if (args.length > 10) {
configs.addAll(Arrays.asList(args).subList(9, args.length));
}
-        returnCode = compact(jsc, args[1], args[2], args[3], Integer.parseInt(args[4]), args[5], args[6],
-            Integer.parseInt(args[7]), false, propsFilePath, configs);
+        returnCode = compact(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]), args[7],
+            Integer.parseInt(args[8]), false, propsFilePath, configs);
break;
case COMPACT_SCHEDULE:
- assert (args.length >= 6);
+ assert (args.length >= 7);
propsFilePath = null;
- if (!StringUtils.isNullOrEmpty(args[5])) {
- propsFilePath = args[5];
+ if (!StringUtils.isNullOrEmpty(args[6])) {
+ propsFilePath = args[6];
}
configs = new ArrayList<>();
- if (args.length > 6) {
- configs.addAll(Arrays.asList(args).subList(6, args.length));
+ if (args.length > 7) {
+ configs.addAll(Arrays.asList(args).subList(7, args.length));
}
-        returnCode = compact(jsc, args[1], args[2], args[3], 1, "", args[4], 0, true, propsFilePath, configs);
+        returnCode = compact(jsc, args[3], args[4], args[5], 1, "", 0, true, propsFilePath, configs);
break;
case COMPACT_VALIDATE:
assert (args.length == 7);
@@ -148,9 +146,9 @@ public class SparkMain {
returnCode = 0;
break;
case COMPACT_UNSCHEDULE_FILE:
-        assert (args.length == 9);
-        doCompactUnscheduleFile(jsc, args[3], args[4], args[5], Integer.parseInt(args[6]),
-            Boolean.parseBoolean(args[7]), Boolean.parseBoolean(args[8]));
+        assert (args.length == 10);
+        doCompactUnscheduleFile(jsc, args[3], args[4], args[5], args[6], Integer.parseInt(args[7]),
+            Boolean.parseBoolean(args[8]), Boolean.parseBoolean(args[9]));
returnCode = 0;
break;
case COMPACT_UNSCHEDULE_PLAN:
@@ -209,14 +207,6 @@ public class SparkMain {
System.exit(returnCode);
}
-  private static boolean sparkMasterContained(SparkCommand command) {
-    List<SparkCommand> masterContained = Arrays.asList(SparkCommand.COMPACT_VALIDATE, SparkCommand.COMPACT_REPAIR,
-        SparkCommand.COMPACT_UNSCHEDULE_PLAN, SparkCommand.COMPACT_UNSCHEDULE_FILE, SparkCommand.CLEAN,
-        SparkCommand.IMPORT, SparkCommand.UPSERT, SparkCommand.DEDUPLICATE, SparkCommand.SAVEPOINT,
-        SparkCommand.DELETE_SAVEPOINT, SparkCommand.ROLLBACK_TO_SAVEPOINT, SparkCommand.ROLLBACK, SparkCommand.BOOTSTRAP);
-    return masterContained.contains(command);
-  }
-
   protected static void clean(JavaSparkContext jsc, String basePath, String propsFilePath,
       List<String> configs) {
HoodieCleaner.Config cfg = new HoodieCleaner.Config();
@@ -280,13 +270,14 @@ public class SparkMain {
new HoodieCompactionAdminTool(cfg).run(jsc);
}
-  private static void doCompactUnscheduleFile(JavaSparkContext jsc, String basePath, String fileId, String outputPath,
-      int parallelism, boolean skipValidation, boolean dryRun)
+  private static void doCompactUnscheduleFile(JavaSparkContext jsc, String basePath, String fileId, String partitionPath,
+      String outputPath, int parallelism, boolean skipValidation, boolean dryRun)
       throws Exception {
     HoodieCompactionAdminTool.Config cfg = new HoodieCompactionAdminTool.Config();
cfg.basePath = basePath;
cfg.operation = Operation.UNSCHEDULE_FILE;
cfg.outputPath = outputPath;
+ cfg.partitionPath = partitionPath;
cfg.fileId = fileId;
cfg.parallelism = parallelism;
cfg.dryRun = dryRun;
@@ -295,7 +286,7 @@ public class SparkMain {
}
   private static int compact(JavaSparkContext jsc, String basePath, String tableName, String compactionInstant,
-      int parallelism, String schemaFile, String sparkMemory, int retry, boolean schedule, String propsFilePath,
+      int parallelism, String schemaFile, int retry, boolean schedule, String propsFilePath,
       List<String> configs) {
HoodieCompactor.Config cfg = new HoodieCompactor.Config();
cfg.basePath = basePath;
@@ -308,7 +299,6 @@ public class SparkMain {
cfg.runSchedule = schedule;
cfg.propsFilePath = propsFilePath;
cfg.configs = configs;
- jsc.getConf().set("spark.executor.memory", sparkMemory);
return new HoodieCompactor(jsc, cfg).compact(retry);
}
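
The matching COMPACT_RUN layout, combining the launcher call in CompactionCommand with the parsing above (again an annotation, not code from the commit):

    // args[0] = "COMPACT_RUN"          SparkCommand name
    // args[1] = master                 read by SparkUtil.initJavaSparkConf
    // args[2] = sparkMemory            read by SparkUtil.initJavaSparkConf
    // args[3] = basePath               forwarded to compact(...)
    // args[4] = tableName              forwarded to compact(...)
    // args[5] = compactionInstantTime  forwarded to compact(...)
    // args[6] = parallelism            forwarded to compact(...)
    // args[7] = schemaFilePath         forwarded to compact(...)
    // args[8] = retry                  forwarded to compact(...)
    // args[9] = propsFilePath
    // args[10..] = extra hoodie configs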
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
index 84b3576..5ad4c4c 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java
@@ -194,7 +194,7 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest {
// archived 101 and 102 instants, remove 103 and 104 instant
data.remove("103");
data.remove("104");
- String expected = generateExpectData(3, data);
+ String expected = generateExpectData(1, data);
expected = removeNonWordAndStripSpace(expected);
String got = removeNonWordAndStripSpace(cr.getResult().toString());
assertEquals(expected, got);
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
new file mode 100644
index 0000000..4dd69dc
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCompactionCommand.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
+import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.CompactionTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTimelineArchiveLog;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test Cases for {@link CompactionCommand}.
+ */
+public class TestCompactionCommand extends AbstractShellIntegrationTest {
+
+ private String tableName;
+ private String tablePath;
+
+ @BeforeEach
+ public void init() {
+ tableName = "test_table";
+ tablePath = basePath + tableName;
+ }
+
+ @Test
+ public void testVerifyTableType() throws IOException {
+ // create COW table.
+ new TableCommand().createTable(
+ tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
+ "", TimelineLayoutVersion.VERSION_1,
HoodieAvroPayload.class.getName());
+
+ // expect HoodieException for COPY_ON_WRITE table.
+ assertThrows(HoodieException.class,
+        () -> new CompactionCommand().compactionsAll(false, -1, "", false, false));
+ }
+
+ /**
+ * Test case for command 'compactions show all'.
+ */
+ @Test
+ public void testCompactionsAll() throws IOException {
+ // create MOR table.
+ new TableCommand().createTable(
+ tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
+ "", TimelineLayoutVersion.VERSION_1,
HoodieAvroPayload.class.getName());
+
+    CompactionTestUtils.setupAndValidateCompactionOperations(HoodieCLI.getTableMetaClient(), false, 3, 4, 3, 3);
+
+ HoodieCLI.getTableMetaClient().reloadActiveTimeline();
+
+ CommandResult cr = getShell().executeCommand("compactions show all");
+ System.out.println(cr.getResult().toString());
+
+    TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State")
+        .addTableHeaderField("Total FileIds to be Compacted");
+ Map<String, Integer> fileIds = new HashMap();
+ fileIds.put("001", 3);
+ fileIds.put("003", 4);
+ fileIds.put("005", 3);
+ fileIds.put("007", 3);
+ List<Comparable[]> rows = new ArrayList<>();
+ Arrays.asList("001", "003", "005",
"007").stream().sorted(Comparator.reverseOrder()).forEach(instant -> {
+ rows.add(new Comparable[] {instant, "REQUESTED", fileIds.get(instant)});
+ });
+    String expected = HoodiePrintHelper.print(header, new HashMap<>(), "", false, -1, false, rows);
+ assertEquals(expected, cr.getResult().toString());
+ }
+
+ /**
+ * Test case for command 'compaction show'.
+ */
+ @Test
+ public void testCompactionShow() throws IOException {
+ // create MOR table.
+ new TableCommand().createTable(
+ tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
+ "", TimelineLayoutVersion.VERSION_1,
HoodieAvroPayload.class.getName());
+
+    CompactionTestUtils.setupAndValidateCompactionOperations(HoodieCLI.getTableMetaClient(), false, 3, 4, 3, 3);
+
+ HoodieCLI.getTableMetaClient().reloadActiveTimeline();
+
+ CommandResult cr = getShell().executeCommand("compaction show --instant
001");
+ System.out.println(cr.getResult().toString());
+ }
+
+ private void generateCompactionInstances() throws IOException {
+ // create MOR table.
+ new TableCommand().createTable(
+ tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
+ "", TimelineLayoutVersion.VERSION_1,
HoodieAvroPayload.class.getName());
+
+    CompactionTestUtils.setupAndValidateCompactionOperations(HoodieCLI.getTableMetaClient(), true, 1, 2, 3, 4);
+
+    HoodieActiveTimeline activeTimeline = HoodieCLI.getTableMetaClient().reloadActiveTimeline();
+    // Complete the four requested compactions
+ Arrays.asList("001", "003", "005", "007").forEach(timestamp -> {
+ activeTimeline.transitionCompactionInflightToComplete(
+          new HoodieInstant(HoodieInstant.State.INFLIGHT, COMPACTION_ACTION, timestamp), Option.empty());
+ });
+
+ metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+ }
+
+ private void generateArchive() throws IOException {
+ // Generate archive
+ HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
+        .withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
+ .forTable("test-trip-table").build();
+ // archive
+ metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
+ HoodieSparkTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, table);
+ archiveLog.archiveIfRequired(context);
+ }
+
+ /**
+ * Test case for command 'compactions showarchived'.
+ */
+ @Test
+ public void testCompactionsShowArchived() throws IOException {
+ generateCompactionInstances();
+
+ generateArchive();
+
+ CommandResult cr = getShell().executeCommand("compactions showarchived
--startTs 001 --endTs 005");
+
+ // generate result
+ Map<String, Integer> fileMap = new HashMap<>();
+ fileMap.put("001", 1);
+ fileMap.put("003", 2);
+ fileMap.put("005", 3);
+    List<Comparable[]> rows = Arrays.asList("005", "003", "001").stream().map(i ->
+        new Comparable[] {i, HoodieInstant.State.COMPLETED, fileMap.get(i)}).collect(Collectors.toList());
+    Map<String, Function<Object, String>> fieldNameToConverterMap = new HashMap<>();
+    TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State")
+        .addTableHeaderField("Total FileIds to be Compacted");
+    String expected = HoodiePrintHelper.print(header, fieldNameToConverterMap, "", false, -1, false, rows);
+
+ expected = removeNonWordAndStripSpace(expected);
+ String got = removeNonWordAndStripSpace(cr.getResult().toString());
+ assertEquals(expected, got);
+ }
+
+ /**
+ * Test case for command 'compaction showarchived'.
+ */
+ @Test
+ public void testCompactionShowArchived() throws IOException {
+ generateCompactionInstances();
+
+ String instance = "001";
+ // get compaction plan before compaction
+    HoodieCompactionPlan plan = TimelineMetadataUtils.deserializeCompactionPlan(
+        HoodieCLI.getTableMetaClient().reloadActiveTimeline().readCompactionPlanAsBytes(
+            HoodieTimeline.getCompactionRequestedInstant(instance)).get());
+
+ generateArchive();
+
+ CommandResult cr = getShell().executeCommand("compaction showarchived
--instant " + instance);
+
+ // generate expected
+    String expected = new CompactionCommand().printCompaction(plan, "", false, -1, false);
+
+ expected = removeNonWordAndStripSpace(expected);
+ String got = removeNonWordAndStripSpace(cr.getResult().toString());
+ assertEquals(expected, got);
+ }
+}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java
new file mode 100644
index 0000000..37a2098
--- /dev/null
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/integ/ITTestCompactionCommand.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.integ;
+
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.commands.TableCommand;
+import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
+import org.apache.hudi.client.CompactionAdminClient;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.TestCompactionAdminClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.CompactionOperation;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.CompactionTestUtils;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.CompactionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.shell.core.CommandResult;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration test class for {@link org.apache.hudi.cli.commands.CompactionCommand}.
+ * <p/>
+ * Commands that use SparkLauncher need the jars under lib, which are generated during mvn package,
+ * so these cases run as integration tests instead of unit tests.
+ */
+public class ITTestCompactionCommand extends AbstractShellIntegrationTest {
+
+ private String tablePath;
+ private String tableName;
+
+ @BeforeEach
+ public void init() throws IOException {
+ tableName = "test_table_" + ITTestCompactionCommand.class.getName();
+ tablePath = Paths.get(basePath, tableName).toString();
+
+ HoodieCLI.conf = jsc.hadoopConfiguration();
+ // Create table and connect
+ new TableCommand().createTable(
+ tablePath, tableName, HoodieTableType.MERGE_ON_READ.name(),
+ "", TimelineLayoutVersion.VERSION_1,
"org.apache.hudi.common.model.HoodieAvroPayload");
+ metaClient.setBasePath(tablePath);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ }
+
+ /**
+ * Test case for command 'compaction schedule'.
+ */
+ @Test
+ public void testScheduleCompact() throws IOException {
+ // generate commits
+ generateCommits();
+
+ CommandResult cr = getShell().executeCommand(
+ String.format("compaction schedule --hoodieConfigs
hoodie.compact.inline.max.delta.commits=1 --sparkMaster %s",
+ "local"));
+ assertAll("Command run failed",
+ () -> assertTrue(cr.isSuccess()),
+ () -> assertTrue(
+            cr.getResult().toString().startsWith("Attempted to schedule compaction for")));
+
+ // there is 1 requested compaction
+    HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
+    assertEquals(1, timeline.filterPendingCompactionTimeline().countInstants());
+ }
+
+ /**
+ * Test case for command 'compaction run'.
+ */
+ @Test
+ public void testCompact() throws IOException {
+ // generate commits
+ generateCommits();
+
+ String instance = prepareScheduleCompaction();
+
+ String schemaPath = Paths.get(basePath, "compaction.schema").toString();
+ writeSchemaToTmpFile(schemaPath);
+
+ CommandResult cr2 = getShell().executeCommand(
+ String.format("compaction run --parallelism %s --schemaFilePath %s
--sparkMaster %s",
+ 2, schemaPath, "local"));
+
+ assertAll("Command run failed",
+ () -> assertTrue(cr2.isSuccess()),
+ () -> assertTrue(
+            cr2.getResult().toString().startsWith("Compaction successfully completed for")));
+
+ // assert compaction complete
+ assertTrue(HoodieCLI.getTableMetaClient().getActiveTimeline().reload()
+ .filterCompletedInstants().getInstants()
+        .map(HoodieInstant::getTimestamp).collect(Collectors.toList()).contains(instance),
+ "Pending compaction must be completed");
+ }
+
+ /**
+ * Test case for command 'compaction validate'.
+ */
+ @Test
+ public void testValidateCompaction() throws IOException {
+ // generate commits
+ generateCommits();
+
+ String instance = prepareScheduleCompaction();
+
+ CommandResult cr = getShell().executeCommand(
+ String.format("compaction validate --instant %s --sparkMaster %s",
instance, "local"));
+
+ assertAll("Command run failed",
+ () -> assertTrue(cr.isSuccess()),
+ () -> assertTrue(
+ // compaction requested should be valid
+ cr.getResult().toString().contains("COMPACTION PLAN VALID")));
+ }
+
+ /**
+ * This function mainly tests the workflow of the 'compaction unschedule' command.
+ * The real test of {@link org.apache.hudi.client.CompactionAdminClient#unscheduleCompactionPlan}
+ * is {@link TestCompactionAdminClient#testUnscheduleCompactionPlan()}.
+ */
+ @Test
+ public void testUnscheduleCompaction() throws Exception {
+ // generate commits
+ generateCommits();
+
+ String instance = prepareScheduleCompaction();
+
+ CommandResult cr = getShell().executeCommand(
+ String.format("compaction unschedule --instant %s --sparkMaster %s",
instance, "local"));
+
+ // Always has no file
+ assertAll("Command run failed",
+ () -> assertTrue(cr.isSuccess()),
+ () -> assertEquals("No File renames needed to unschedule pending
compaction. Operation successful.",
+ cr.getResult().toString()));
+ }
+
+ /**
+ * This function mainly tests the workflow of the 'compaction unscheduleFileId' command.
+ * The real test of {@link org.apache.hudi.client.CompactionAdminClient#unscheduleCompactionFileId}
+ * is {@link TestCompactionAdminClient#testUnscheduleCompactionFileId}.
+ */
+ @Test
+ public void testUnscheduleCompactFile() throws IOException {
+ int numEntriesPerInstant = 10;
+    CompactionTestUtils.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant,
+ numEntriesPerInstant, numEntriesPerInstant, numEntriesPerInstant);
+
+ CompactionOperation op = CompactionOperation.convertFromAvroRecordInstance(
+        CompactionUtils.getCompactionPlan(metaClient, "001").getOperations().stream().findFirst().get());
+
+ CommandResult cr = getShell().executeCommand(
+ String.format("compaction unscheduleFileId --fileId %s --partitionPath
%s --sparkMaster %s",
+ op.getFileGroupId().getFileId(),
op.getFileGroupId().getPartitionPath(), "local"));
+
+ assertAll("Command run failed",
+ () -> assertTrue(cr.isSuccess()),
+        () -> assertTrue(removeNonWordAndStripSpace(cr.getResult().toString()).contains("true")),
+        () -> assertFalse(removeNonWordAndStripSpace(cr.getResult().toString()).contains("false")));
+ }
+
+ /**
+ * This function mainly tests the workflow of the 'compaction repair' command.
+ * The real test of {@link org.apache.hudi.client.CompactionAdminClient#repairCompaction}
+ * is {@link TestCompactionAdminClient#testRepairCompactionPlan}.
+ */
+ @Test
+ public void testRepairCompaction() throws Exception {
+ int numEntriesPerInstant = 10;
+ String compactionInstant = "001";
+    CompactionTestUtils.setupAndValidateCompactionOperations(metaClient, false, numEntriesPerInstant,
+ numEntriesPerInstant, numEntriesPerInstant, numEntriesPerInstant);
+
+ metaClient.reloadActiveTimeline();
+    CompactionAdminClient client = new CompactionAdminClient(new HoodieSparkEngineContext(jsc), metaClient.getBasePath());
+ List<Pair<HoodieLogFile, HoodieLogFile>> renameFiles =
+        client.getRenamingActionsForUnschedulingCompactionPlan(metaClient, compactionInstant, 1, Option.empty(), false);
+
+ renameFiles.forEach(lfPair -> {
+ try {
+        metaClient.getFs().rename(lfPair.getLeft().getPath(), lfPair.getRight().getPath());
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ });
+
+ client.unscheduleCompactionPlan(compactionInstant, false, 1, false);
+
+ CommandResult cr = getShell().executeCommand(
+ String.format("compaction repair --instant %s --sparkMaster %s",
compactionInstant, "local"));
+
+    // All executions succeeded, so the result contains "true" and no "false"
+    // Expected:
+    // ║ File Id │ Source File Path │ Destination File Path │ Rename Executed? │ Rename Succeeded? │ Error ║
+    // ║ *       │ *                │ *                     │ true             │ true              │       ║
+ assertAll("Command run failed",
+ () -> assertTrue(cr.isSuccess()),
+        () -> assertTrue(removeNonWordAndStripSpace(cr.getResult().toString()).contains("true")),
+        () -> assertFalse(removeNonWordAndStripSpace(cr.getResult().toString()).contains("false")));
+ }
+
+ private String prepareScheduleCompaction() {
+ // generate requested compaction
+ CommandResult cr = getShell().executeCommand(
+ String.format("compaction schedule --hoodieConfigs
hoodie.compact.inline.max.delta.commits=1 --sparkMaster %s",
+ "local"));
+ assertTrue(cr.isSuccess());
+
+ // get compaction instance
+    HoodieActiveTimeline timeline = HoodieCLI.getTableMetaClient().getActiveTimeline();
+ Option<String> instance =
+        timeline.filterPendingCompactionTimeline().firstInstant().map(HoodieInstant::getTimestamp);
+ assertTrue(instance.isPresent(), "Must have pending compaction.");
+ return instance.get();
+ }
+
+ private void writeSchemaToTmpFile(String schemaPath) throws IOException {
+ try (BufferedWriter out = new BufferedWriter(new FileWriter(schemaPath))) {
+ out.write(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+ }
+ }
+
+ private void generateCommits() throws IOException {
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+
+ // Create the write client to write some records in
+ HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
+        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
+        .withDeleteParallelism(2).forTable(tableName)
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
+
+    SparkRDDWriteClient<HoodieAvroPayload> client = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jsc), cfg);
+
+ List<HoodieRecord> records = insert(jsc, client, dataGen);
+ upsert(jsc, client, dataGen, records);
+ delete(jsc, client, records);
+ }
+
+  private List<HoodieRecord> insert(JavaSparkContext jsc, SparkRDDWriteClient<HoodieAvroPayload> client,
+ HoodieTestDataGenerator dataGen) throws IOException {
+ // inserts
+ String newCommitTime = "001";
+ client.startCommitWithTime(newCommitTime);
+
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    operateFunc(SparkRDDWriteClient::insert, client, writeRecords, newCommitTime);
+ return records;
+ }
+
+  private void upsert(JavaSparkContext jsc, SparkRDDWriteClient<HoodieAvroPayload> client,
+ HoodieTestDataGenerator dataGen, List<HoodieRecord> records)
+ throws IOException {
+ // updates
+ String newCommitTime = "002";
+ client.startCommitWithTime(newCommitTime);
+
+ List<HoodieRecord> toBeUpdated = dataGen.generateUpdates(newCommitTime, 2);
+ records.addAll(toBeUpdated);
+ JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
+    operateFunc(SparkRDDWriteClient::upsert, client, writeRecords, newCommitTime);
+ }
+
+  private void delete(JavaSparkContext jsc, SparkRDDWriteClient<HoodieAvroPayload> client,
+ List<HoodieRecord> records) {
+ // Delete
+ String newCommitTime = "003";
+ client.startCommitWithTime(newCommitTime);
+
+ // just delete half of the records
+ int numToDelete = records.size() / 2;
+    List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(numToDelete).collect(Collectors.toList());
+ JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(toBeDeleted, 1);
+ client.delete(deleteRecords, newCommitTime);
+ }
+
+ private JavaRDD<WriteStatus> operateFunc(
+      HoodieClientTestBase.Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn,
+      SparkRDDWriteClient<HoodieAvroPayload> client, JavaRDD<HoodieRecord> writeRecords, String commitTime)
+ throws IOException {
+ return writeFn.apply(client, writeRecords, commitTime);
+ }
+}
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/AbstractShellIntegrationTest.java b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/AbstractShellIntegrationTest.java
index a7cf85c..67449dc 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/AbstractShellIntegrationTest.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/testutils/AbstractShellIntegrationTest.java
@@ -18,6 +18,7 @@
package org.apache.hudi.cli.testutils;
+import org.apache.hudi.common.model.HoodieTableType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -37,4 +38,8 @@ public abstract class AbstractShellIntegrationTest extends AbstractShellBaseInte
public void teardown() throws Exception {
cleanupResources();
}
+
+ protected HoodieTableType getTableType() {
+ return HoodieTableType.MERGE_ON_READ;
+ }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
index 9ace03a..1c869e4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java
@@ -383,7 +383,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
   * @return list of pairs of log-files (old, new) and for each pair, rename must be done to successfully unschedule
* compaction.
*/
-  protected List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulingCompactionPlan(
+  public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulingCompactionPlan(
       HoodieTableMetaClient metaClient, String compactionInstant, int parallelism,
       Option<HoodieTableFileSystemView> fsViewOpt, boolean skipValidation) throws IOException {
HoodieTableFileSystemView fsView = fsViewOpt.isPresent() ? fsViewOpt.get()
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
index 5b18881..f0f1392 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiveLog.java
@@ -464,7 +464,8 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
assertTrue(result);
HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
     List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3);
-    assertEquals(new HashSet<>(archivedInstants), archivedTimeline.getInstants().collect(Collectors.toSet()));
+    assertEquals(new HashSet<>(archivedInstants),
+        archivedTimeline.filterCompletedInstants().getInstants().collect(Collectors.toSet()));
assertFalse(wrapperFs.exists(markerPath));
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 9f8c439..6b05edd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -18,12 +18,14 @@
package org.apache.hudi.common.table.timeline;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
@@ -43,6 +45,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -66,6 +69,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
   private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX = "commits";
private static final String ACTION_TYPE_KEY = "actionType";
+ private static final String ACTION_STATE = "actionState";
private HoodieTableMetaClient metaClient;
private Map<String, byte[]> readCommits = new HashMap<>();
@@ -108,6 +112,22 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
loadInstants(startTs, endTs);
}
+ public void loadCompactionDetailsInMemory(String compactionInstantTime) {
+    loadCompactionDetailsInMemory(compactionInstantTime, compactionInstantTime);
+ }
+
+ public void loadCompactionDetailsInMemory(String startTs, String endTs) {
+ // load compactionPlan
+ loadInstants(new TimeRangeFilter(startTs, endTs), true, record ->
+        record.get(ACTION_TYPE_KEY).toString().equals(HoodieTimeline.COMPACTION_ACTION)
+            && HoodieInstant.State.INFLIGHT.toString().equals(record.get(ACTION_STATE).toString())
+ );
+ }
+
+ public void clearInstantDetailsFromMemory(String instantTime) {
+ this.readCommits.remove(instantTime);
+ }
+
public void clearInstantDetailsFromMemory(String startTs, String endTs) {
this.findInstantsInRange(startTs, endTs).getInstants().forEach(instant ->
this.readCommits.remove(instant.getTimestamp()));
@@ -126,11 +146,16 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
     final String instantTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString();
final String action = record.get(ACTION_TYPE_KEY).toString();
if (loadDetails) {
- Option.ofNullable(record.get(getMetadataKey(action))).map(actionData ->
-          this.readCommits.put(instantTime, actionData.toString().getBytes(StandardCharsets.UTF_8))
- );
+ Option.ofNullable(record.get(getMetadataKey(action))).map(actionData -> {
+ if (action.equals(HoodieTimeline.COMPACTION_ACTION)) {
+          this.readCommits.put(instantTime, HoodieAvroUtils.indexedRecordToBytes((IndexedRecord)actionData));
+ } else {
+          this.readCommits.put(instantTime, actionData.toString().getBytes(StandardCharsets.UTF_8));
+ }
+ return null;
+ });
}
- return new HoodieInstant(false, action, instantTime);
+    return new HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), action, instantTime);
}
private String getMetadataKey(String action) {
@@ -145,6 +170,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
return "hoodieRollbackMetadata";
case HoodieTimeline.SAVEPOINT_ACTION:
return "hoodieSavePointMetadata";
+ case HoodieTimeline.COMPACTION_ACTION:
+ return "hoodieCompactionPlan";
default:
throw new HoodieIOException("Unknown action in metadata " + action);
}
@@ -158,12 +185,18 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
return loadInstants(new TimeRangeFilter(startTs, endTs), true);
}
+  private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails) {
+ return loadInstants(filter, loadInstantDetails, record -> true);
+ }
+
/**
    * This is a method to read selected instants. Do NOT use this directly; use one of the helper methods above.
    * If loadInstantDetails is set to true, this would also update 'readCommits' map with commit details
    * If filter is specified, only the filtered instants are loaded
+   * If commitsFilter is specified, only the filtered records are loaded
*/
-  private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails) {
+  private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails,
+      Function<GenericRecord, Boolean> commitsFilter) {
try {
// list all files
FileStatus[] fsStatuses = metaClient.getFs().globStatus(
@@ -187,6 +220,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
List<IndexedRecord> records = blk.getRecords();
// filter blocks in desired time window
Stream<HoodieInstant> instantsInBlkStream = records.stream()
+ .filter(r -> commitsFilter.apply((GenericRecord) r))
                   .map(r -> readCommit((GenericRecord) r, loadInstantDetails));
if (filter != null) {
@@ -254,4 +288,13 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
return 0;
}
}
+
+ @Override
+ public HoodieDefaultTimeline getWriteTimeline() {
+ // filter in-memory instants
+    Set<String> validActions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, COMPACTION_ACTION, REPLACE_COMMIT_ACTION);
+    return new HoodieDefaultTimeline(getInstants().filter(i -> readCommits.keySet().contains(i.getTimestamp()))
+ .filter(s -> validActions.contains(s.getAction())), details);
+ }
}
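
A hedged usage sketch of the new archived-timeline hooks, assuming a HoodieTableMetaClient named `metaClient` whose table already has archived compactions (the instant range is illustrative). The commits filter registered by loadCompactionDetailsInMemory keeps only records whose actionType is compaction and whose actionState is INFLIGHT, matching how archived compaction plans are recorded:

    HoodieArchivedTimeline archived = metaClient.getArchivedTimeline();
    archived.loadCompactionDetailsInMemory("001", "010");   // plans for instants in [001, 010]
    archived.getInstants()
        .filter(i -> HoodieTimeline.COMPACTION_ACTION.equals(i.getAction()))
        .forEach(i -> System.out.println(i.getTimestamp() + " details loaded: "
            + archived.getInstantDetails(i).isPresent()));
    archived.clearInstantDetailsFromMemory("001", "010");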
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index 9b419ca..a50c299 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.timeline;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
@@ -33,12 +34,14 @@ import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
@@ -176,4 +179,13 @@ public class TimelineMetadataUtils {
     ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz);
return fileReader.next();
}
+
+  public static <T extends SpecificRecordBase> T deserializeAvroRecordMetadata(byte[] bytes, Schema schema)
+      throws IOException {
+    return deserializeAvroRecordMetadata(HoodieAvroUtils.bytesToAvro(bytes, schema), schema);
+ }
+
+  public static <T extends SpecificRecordBase> T deserializeAvroRecordMetadata(Object object, Schema schema) {
+ return (T) SpecificData.get().deepCopy(schema, object);
+ }
}
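
The new deserializeAvroRecordMetadata overloads pair with the byte format HoodieArchivedTimeline now stores for compaction actions (HoodieAvroUtils.indexedRecordToBytes): the byte[] variant decodes with bytesToAvro, and the Object variant deep-copies an in-memory IndexedRecord into its SpecificRecord class via SpecificData.deepCopy. A minimal sketch, assuming `bytes` were produced by indexedRecordToBytes from a compaction plan:

    HoodieCompactionPlan plan = TimelineMetadataUtils.deserializeAvroRecordMetadata(
        bytes, HoodieCompactionPlan.getClassSchema());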