This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 20ed251 [HUDI-571] Add show archived compaction(s) to CLI
20ed251 is described below
commit 20ed2516d38b9ce4b3e185bd89b62264b8bd3f25
Author: Satish Kotha <[email protected]>
AuthorDate: Wed Jan 22 13:50:34 2020 -0800
[HUDI-571] Add show archived compaction(s) to CLI
---
.../apache/hudi/cli/commands/CommitsCommand.java | 13 +-
.../hudi/cli/commands/CompactionCommand.java | 247 ++++++++++++++++-----
.../java/org/apache/hudi/cli/utils/CommitUtil.java | 23 ++
3 files changed, 216 insertions(+), 67 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index 1e17c4c..804096b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -21,6 +21,7 @@ package org.apache.hudi.cli.commands;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.cli.utils.CommitUtil;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -41,10 +42,8 @@ import org.springframework.shell.core.annotation.CliOption;
import org.springframework.stereotype.Component;
import java.io.IOException;
-import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -186,10 +185,10 @@ public class CommitsCommand implements CommandMarker {
final boolean headerOnly)
throws IOException {
if (StringUtils.isNullOrEmpty(startTs)) {
- startTs = getTimeDaysAgo(10);
+ startTs = CommitUtil.getTimeDaysAgo(10);
}
if (StringUtils.isNullOrEmpty(endTs)) {
- endTs = getTimeDaysAgo(1);
+ endTs = CommitUtil.getTimeDaysAgo(1);
}
HoodieArchivedTimeline archivedTimeline = HoodieCLI.getTableMetaClient().getArchivedTimeline();
try {
@@ -362,10 +361,4 @@ public class CommitsCommand implements CommandMarker {
return "Load sync state between " +
HoodieCLI.getTableMetaClient().getTableConfig().getTableName() + " and "
+ HoodieCLI.syncTableMetadata.getTableConfig().getTableName();
}
-
- private String getTimeDaysAgo(int numberOfDays) {
- Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
- return HoodieActiveTimeline.COMMIT_FORMATTER.format(date);
- }
-
}
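The CommitsCommand change above is a pure refactor: getTimeDaysAgo moves into CommitUtil so CompactionCommand can reuse it. A minimal sketch of how the default window is computed (assuming COMMIT_FORMATTER's yyyyMMddHHmmss layout; the example values are illustrative):

    // Default window when the caller passes no timestamps: [now - 10 days, now - 1 day]
    String startTs = CommitUtil.getTimeDaysAgo(10); // e.g. "20200112135034"
    String endTs = CommitUtil.getTimeDaysAgo(1);    // e.g. "20200121135034"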
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
index 0b57947..2564931 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CompactionCommand.java
@@ -26,16 +26,20 @@ import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.commands.SparkMain.SparkCommand;
+import org.apache.hudi.cli.utils.CommitUtil;
import org.apache.hudi.cli.utils.InputStreamConsumer;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.AvroUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.func.OperationResult;
@@ -61,8 +65,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* CLI command to display compaction related options.
@@ -95,51 +101,9 @@ public class CompactionCommand implements CommandMarker {
throws IOException {
HoodieTableMetaClient client = checkAndGetMetaClient();
HoodieActiveTimeline activeTimeline = client.getActiveTimeline();
- HoodieTimeline timeline = activeTimeline.getCommitsAndCompactionTimeline();
- HoodieTimeline commitTimeline = activeTimeline.getCommitTimeline().filterCompletedInstants();
- Set<String> committed =
commitTimeline.getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
-
- List<HoodieInstant> instants =
timeline.getReverseOrderedInstants().collect(Collectors.toList());
- List<Comparable[]> rows = new ArrayList<>();
- for (HoodieInstant instant : instants) {
- HoodieCompactionPlan compactionPlan = null;
- if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
- try {
- // This could be a completed compaction. Assume a compaction request file is present but skip if fails
- compactionPlan = AvroUtils.deserializeCompactionPlan(
- activeTimeline.readCompactionPlanAsBytes(
- HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
- } catch (HoodieIOException ioe) {
- // SKIP
- }
- } else {
- compactionPlan = AvroUtils.deserializeCompactionPlan(activeTimeline.readCompactionPlanAsBytes(
- HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
- }
-
- if (null != compactionPlan) {
- State state = instant.getState();
- if (committed.contains(instant.getTimestamp())) {
- state = State.COMPLETED;
- }
- if (includeExtraMetadata) {
- rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
- compactionPlan.getOperations() == null ? 0 : compactionPlan.getOperations().size(),
- compactionPlan.getExtraMetadata().toString()});
- } else {
- rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
- compactionPlan.getOperations() == null ? 0 : compactionPlan.getOperations().size()});
- }
- }
- }
-
- Map<String, Function<Object, String>> fieldNameToConverterMap = new
HashMap<>();
- TableHeader header = new TableHeader().addTableHeaderField("Compaction Instant Time").addTableHeaderField("State")
- .addTableHeaderField("Total FileIds to be Compacted");
- if (includeExtraMetadata) {
- header = header.addTableHeaderField("Extra Metadata");
- }
- return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
+ return printAllCompactions(activeTimeline,
+ compactionPlanReader(this::readCompactionPlanForActiveTimeline, activeTimeline),
+ includeExtraMetadata, sortByField, descending, limit, headerOnly);
}
@CliCommand(value = "compaction show", help = "Shows compaction details for
a specific compaction instant")
@@ -159,19 +123,68 @@ public class CompactionCommand implements CommandMarker {
activeTimeline.readCompactionPlanAsBytes(
HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime)).get());
- List<Comparable[]> rows = new ArrayList<>();
- if ((null != compactionPlan) && (null != compactionPlan.getOperations())) {
- for (HoodieCompactionOperation op : compactionPlan.getOperations()) {
- rows.add(new Comparable[] {op.getPartitionPath(), op.getFileId(), op.getBaseInstantTime(), op.getDataFilePath(),
- op.getDeltaFilePaths().size(), op.getMetrics() == null ? "" : op.getMetrics().toString()});
- }
+ return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly);
+ }
+
+ @CliCommand(value = "compactions show archived", help = "Shows compaction
details for specified time window")
+ public String compactionShowArchived(
+ @CliOption(key = {"includeExtraMetadata"}, help = "Include extra
metadata",
+ unspecifiedDefaultValue = "false") final boolean
includeExtraMetadata,
+ @CliOption(key = {"startTs"}, mandatory = false, help = "start time
for compactions, default: now - 10 days")
+ String startTs,
+ @CliOption(key = {"endTs"}, mandatory = false, help = "end time for
compactions, default: now - 1 day")
+ String endTs,
+ @CliOption(key = {"limit"}, help = "Limit compactions",
+ unspecifiedDefaultValue = "-1") final Integer limit,
+ @CliOption(key = {"sortBy"}, help = "Sorting Field",
unspecifiedDefaultValue = "") final String sortByField,
+ @CliOption(key = {"desc"}, help = "Ordering",
unspecifiedDefaultValue = "false") final boolean descending,
+ @CliOption(key = {"headeronly"}, help = "Print Header Only",
+ unspecifiedDefaultValue = "false") final boolean headerOnly)
+ throws Exception {
+ if (StringUtils.isNullOrEmpty(startTs)) {
+ startTs = CommitUtil.getTimeDaysAgo(10);
+ }
+ if (StringUtils.isNullOrEmpty(endTs)) {
+ endTs = CommitUtil.getTimeDaysAgo(1);
}
- Map<String, Function<Object, String>> fieldNameToConverterMap = new
HashMap<>();
- TableHeader header = new TableHeader().addTableHeaderField("Partition
Path").addTableHeaderField("File Id")
- .addTableHeaderField("Base Instant").addTableHeaderField("Data File
Path")
- .addTableHeaderField("Total Delta
Files").addTableHeaderField("getMetrics");
- return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
+ HoodieTableMetaClient client = checkAndGetMetaClient();
+ HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
+ archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
+ try {
+ return printAllCompactions(archivedTimeline,
+ compactionPlanReader(this::readCompactionPlanForArchivedTimeline, archivedTimeline),
+ includeExtraMetadata, sortByField, descending, limit, headerOnly);
+ } finally {
+ archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs);
+ }
+ }
+
+ @CliCommand(value = "compaction show archived", help = "Shows compaction
details for a specific compaction instant")
+ public String compactionShowArchived(
+ @CliOption(key = "instant", mandatory = true,
+ help = "instant time") final String compactionInstantTime,
+ @CliOption(key = {"limit"}, help = "Limit commits",
+ unspecifiedDefaultValue = "-1") final Integer limit,
+ @CliOption(key = {"sortBy"}, help = "Sorting Field",
unspecifiedDefaultValue = "") final String sortByField,
+ @CliOption(key = {"desc"}, help = "Ordering",
unspecifiedDefaultValue = "false") final boolean descending,
+ @CliOption(key = {"headeronly"}, help = "Print Header Only",
+ unspecifiedDefaultValue = "false") final boolean headerOnly)
+ throws Exception {
+ HoodieTableMetaClient client = checkAndGetMetaClient();
+ HoodieArchivedTimeline archivedTimeline = client.getArchivedTimeline();
+ HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED,
+ HoodieTimeline.COMPACTION_ACTION, compactionInstantTime);
+ String startTs = CommitUtil.addHours(compactionInstantTime, -1);
+ String endTs = CommitUtil.addHours(compactionInstantTime, 1);
+ try {
+ archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
+ HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
+ archivedTimeline.getInstantDetails(instant).get());
+ return printCompaction(compactionPlan, sortByField, descending, limit, headerOnly);
+ } finally {
+ archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs);
+ }
}
@CliCommand(value = "compaction schedule", help = "Schedule Compaction")
@@ -249,6 +262,126 @@ public class CompactionCommand implements CommandMarker {
return "Compaction successfully completed for " + compactionInstantTime;
}
+ /**
+ * Prints all compaction details.
+ */
+ private String printAllCompactions(HoodieDefaultTimeline timeline,
+ Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader,
+ boolean includeExtraMetadata,
+ String sortByField,
+ boolean descending,
+ int limit,
+ boolean headerOnly) {
+
+ Stream<HoodieInstant> instantsStream =
timeline.getCommitsAndCompactionTimeline().getReverseOrderedInstants();
+ List<Pair<HoodieInstant, HoodieCompactionPlan>> compactionPlans =
instantsStream
+ .map(instant -> Pair.of(instant,
compactionPlanReader.apply(instant)))
+ .filter(pair -> pair.getRight() != null)
+ .collect(Collectors.toList());
+
+ Set<HoodieInstant> committedInstants =
timeline.getCommitTimeline().filterCompletedInstants()
+ .getInstants().collect(Collectors.toSet());
+
+ List<Comparable[]> rows = new ArrayList<>();
+ for (Pair<HoodieInstant, HoodieCompactionPlan> compactionPlan :
compactionPlans) {
+ HoodieCompactionPlan plan = compactionPlan.getRight();
+ HoodieInstant instant = compactionPlan.getLeft();
+ final HoodieInstant.State state;
+ if (committedInstants.contains(instant)) {
+ state = HoodieInstant.State.COMPLETED;
+ } else {
+ state = instant.getState();
+ }
+
+ if (includeExtraMetadata) {
+ rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
+ plan.getOperations() == null ? 0 : plan.getOperations().size(),
+ plan.getExtraMetadata().toString()});
+ } else {
+ rows.add(new Comparable[] {instant.getTimestamp(), state.toString(),
+ plan.getOperations() == null ? 0 : plan.getOperations().size()});
+ }
+ }
+
+ Map<String, Function<Object, String>> fieldNameToConverterMap = new
HashMap<>();
+ TableHeader header = new TableHeader().addTableHeaderField("Compaction
Instant Time").addTableHeaderField("State")
+ .addTableHeaderField("Total FileIds to be Compacted");
+ if (includeExtraMetadata) {
+ header = header.addTableHeaderField("Extra Metadata");
+ }
+ return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
+ }
+
+ /**
+ * Compaction plan reading differs between timelines, so create a partial function to supply the timeline-specific logic.
+ * We could make these read methods part of HoodieDefaultTimeline and override where necessary, but the
+ * BiFunction below has 'hacky' exception blocks, so it is restricted to the CLI.
+ */
+ private <T extends HoodieDefaultTimeline, U extends HoodieInstant, V extends
HoodieCompactionPlan>
+ Function<HoodieInstant, HoodieCompactionPlan> compactionPlanReader(
+ BiFunction<T, HoodieInstant, HoodieCompactionPlan> f, T timeline) {
+
+ return (y) -> f.apply(timeline, y);
+ }
+
+ private HoodieCompactionPlan readCompactionPlanForArchivedTimeline(HoodieArchivedTimeline archivedTimeline,
+ HoodieInstant instant) {
+ if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
+ return null;
+ } else {
+ try {
+ return AvroUtils.deserializeCompactionPlan(archivedTimeline.getInstantDetails(instant).get());
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
+ * TODO: consider making this part of HoodieActiveTimeline or a utility class.
+ */
+ private HoodieCompactionPlan readCompactionPlanForActiveTimeline(HoodieActiveTimeline activeTimeline,
+ HoodieInstant instant) {
+ try {
+ if (!HoodieTimeline.COMPACTION_ACTION.equals(instant.getAction())) {
+ try {
+ // This could be a completed compaction. Assume a compaction request file is present, but skip if reading it fails
+ return AvroUtils.deserializeCompactionPlan(
+ activeTimeline.readCompactionPlanAsBytes(
+ HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
+ } catch (HoodieIOException ioe) {
+ // SKIP
+ return null;
+ }
+ } else {
+ return AvroUtils.deserializeCompactionPlan(activeTimeline.readCompactionPlanAsBytes(
+ HoodieTimeline.getCompactionRequestedInstant(instant.getTimestamp())).get());
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ }
+
+ private String printCompaction(HoodieCompactionPlan compactionPlan,
+ String sortByField,
+ boolean descending,
+ int limit,
+ boolean headerOnly) {
+ List<Comparable[]> rows = new ArrayList<>();
+ if ((null != compactionPlan) && (null != compactionPlan.getOperations())) {
+ for (HoodieCompactionOperation op : compactionPlan.getOperations()) {
+ rows.add(new Comparable[]{op.getPartitionPath(), op.getFileId(), op.getBaseInstantTime(), op.getDataFilePath(),
+ op.getDeltaFilePaths().size(), op.getMetrics() == null ? "" : op.getMetrics().toString()});
+ }
+ }
+
+ Map<String, Function<Object, String>> fieldNameToConverterMap = new
HashMap<>();
+ TableHeader header = new TableHeader().addTableHeaderField("Partition
Path").addTableHeaderField("File Id")
+ .addTableHeaderField("Base Instant").addTableHeaderField("Data
File Path")
+ .addTableHeaderField("Total Delta
Files").addTableHeaderField("getMetrics");
+ return HoodiePrintHelper.print(header, fieldNameToConverterMap, sortByField, descending, limit, headerOnly, rows);
+ }
+
private static String getTmpSerializerFile() {
return TMP_DIR + UUID.randomUUID().toString() + ".ser";
}
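To make the compactionPlanReader indirection above concrete: it binds the timeline argument of a BiFunction, yielding a Function<HoodieInstant, HoodieCompactionPlan> that printAllCompactions can apply uniformly to either timeline. A rough sketch under that reading (local variable names here are illustrative, not from the patch):

    // Bind the archived timeline once; non-compaction instants map to null and are filtered out later.
    Function<HoodieInstant, HoodieCompactionPlan> reader =
        compactionPlanReader(this::readCompactionPlanForArchivedTimeline, archivedTimeline);
    HoodieCompactionPlan plan = reader.apply(someInstant);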
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
index aaaeb35..60d3e3e 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
@@ -21,9 +21,15 @@ package org.apache.hudi.cli.utils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import java.io.IOException;
+import java.text.ParseException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Date;
import java.util.List;
/**
@@ -42,4 +48,21 @@ public class CommitUtil {
}
return totalNew;
}
+
+ public static String getTimeDaysAgo(int numberOfDays) {
+ Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
+ return HoodieActiveTimeline.COMMIT_FORMATTER.format(date);
+ }
+
+ /**
+ * Adds hours to the specified time. If hours < 0, hours are subtracted.
+ * Example, with compactionCommitTime "20200202020000":
+ * a) hours: +1, returns 20200202030000
+ * b) hours: -1, returns 20200202010000
+ */
+ public static String addHours(String compactionCommitTime, int hours) throws ParseException {
+ Instant instant = HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).toInstant();
+ ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
+ return HoodieActiveTimeline.COMMIT_FORMATTER.format(Date.from(commitDateTime.plusHours(hours).toInstant()));
+ }
}
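Finally, addHours is what lets `compaction show archived` load only a two-hour window around the requested instant before deserializing its plan. Its behavior, restating the javadoc's own example:

    String ts = "20200202020000";               // COMMIT_FORMATTER-style commit time
    String start = CommitUtil.addHours(ts, -1); // "20200202010000"
    String end = CommitUtil.addHours(ts, 1);    // "20200202030000"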