This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a6740d0767 [HUDI-3579] Add timeline commands in hudi-cli (#5139)
a6740d0767 is described below
commit a6740d0767f7b7296b05d5762071a3ba71abc9b8
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Aug 18 17:13:32 2022 -0700
[HUDI-3579] Add timeline commands in hudi-cli (#5139)
---
.../org/apache/hudi/cli/HoodiePrintHelper.java | 73 +++-
.../apache/hudi/cli/HoodieTableHeaderFields.java | 15 +
.../src/main/java/org/apache/hudi/cli/Table.java | 53 ++-
.../main/java/org/apache/hudi/cli/TableHeader.java | 10 +
.../apache/hudi/cli/commands/TimelineCommand.java | 410 +++++++++++++++++++++
5 files changed, 536 insertions(+), 25 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
index be640376ee..0ffec2cac0 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
@@ -58,30 +58,76 @@ public class HoodiePrintHelper {
* @param rows List of rows
* @return Serialized form for printing
*/
- public static String print(TableHeader rowHeader, Map<String,
Function<Object, String>> fieldNameToConverterMap,
+ public static String print(
+ TableHeader rowHeader, Map<String, Function<Object, String>>
fieldNameToConverterMap,
String sortByField, boolean isDescending, Integer limit, boolean
headerOnly, List<Comparable[]> rows) {
- return print(rowHeader, fieldNameToConverterMap, sortByField,
isDescending, limit, headerOnly, rows, "");
+ return print(rowHeader, fieldNameToConverterMap, false, sortByField,
isDescending, limit, headerOnly, rows);
+ }
+
+ /**
+ * Serialize Table to printable string.
+ *
+ * @param rowHeader Row Header
+ * @param fieldNameToConverterMap Field Specific Converters
+ * @param withRowNo Whether to add row number
+ * @param sortByField Sorting field
+ * @param isDescending Order
+ * @param limit Limit
+ * @param headerOnly Headers only
+ * @param rows List of rows
+ * @return Serialized form for printing
+ */
+ public static String print(
+ TableHeader rowHeader, Map<String, Function<Object, String>>
fieldNameToConverterMap, boolean withRowNo,
+ String sortByField, boolean isDescending, Integer limit, boolean
headerOnly, List<Comparable[]> rows) {
+ return print(rowHeader, fieldNameToConverterMap, withRowNo, sortByField,
isDescending, limit, headerOnly, rows, "");
}
/**
* Serialize Table to printable string and also export a temporary view to
easily write sql queries.
+ * <p>
+ * Ideally, exporting view needs to be outside PrintHelper, but all commands
use this. So this is easy
+ * way to add support for all commands
*
+ * @param rowHeader Row Header
+ * @param fieldNameToConverterMap Field Specific Converters
+ * @param sortByField Sorting field
+ * @param isDescending Order
+ * @param limit Limit
+ * @param headerOnly Headers only
+ * @param rows List of rows
+ * @param tempTableName table name to export
+ * @return Serialized form for printing
+ */
+ public static String print(
+ TableHeader rowHeader, Map<String, Function<Object, String>>
fieldNameToConverterMap,
+ String sortByField, boolean isDescending, Integer limit, boolean
headerOnly,
+ List<Comparable[]> rows, String tempTableName) {
+ return print(rowHeader, fieldNameToConverterMap, false, sortByField,
isDescending, limit,
+ headerOnly, rows, tempTableName);
+ }
+
+ /**
+ * Serialize Table to printable string and also export a temporary view to
easily write sql queries.
+ * <p>
* Ideally, exporting view needs to be outside PrintHelper, but all commands
use this. So this is easy
* way to add support for all commands
*
- * @param rowHeader Row Header
+ * @param rowHeader Row Header
* @param fieldNameToConverterMap Field Specific Converters
- * @param sortByField Sorting field
- * @param isDescending Order
- * @param limit Limit
- * @param headerOnly Headers only
- * @param rows List of rows
- * @param tempTableName table name to export
+ * @param withRowNo Whether to add row number
+ * @param sortByField Sorting field
+ * @param isDescending Order
+ * @param limit Limit
+ * @param headerOnly Headers only
+ * @param rows List of rows
+ * @param tempTableName table name to export
* @return Serialized form for printing
*/
- public static String print(TableHeader rowHeader, Map<String,
Function<Object, String>> fieldNameToConverterMap,
- String sortByField, boolean isDescending, Integer limit, boolean
headerOnly, List<Comparable[]> rows,
- String tempTableName) {
+ public static String print(
+ TableHeader rowHeader, Map<String, Function<Object, String>>
fieldNameToConverterMap,
+ boolean withRowNo, String sortByField, boolean isDescending, Integer
limit, boolean headerOnly,
+ List<Comparable[]> rows, String tempTableName) {
if (headerOnly) {
return HoodiePrintHelper.print(rowHeader);
@@ -97,7 +143,8 @@ public class HoodiePrintHelper {
}
Table table =
- new Table(rowHeader, fieldNameToConverterMap,
Option.ofNullable(sortByField.isEmpty() ? null : sortByField),
+ new Table(rowHeader, fieldNameToConverterMap, withRowNo,
+ Option.ofNullable(sortByField.isEmpty() ? null : sortByField),
Option.ofNullable(isDescending), Option.ofNullable(limit <= 0 ?
null : limit)).addAllRows(rows).flip();
return HoodiePrintHelper.print(table);
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
index a4a8e46dfd..3bea0f2293 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -22,6 +22,7 @@ package org.apache.hudi.cli;
* Fields of print table header.
*/
public class HoodieTableHeaderFields {
+ public static final String HEADER_ROW_NO = "No.";
public static final String HEADER_PARTITION = "Partition";
public static final String HEADER_INSTANT = "Instant";
public static final String HEADER_PARTITION_PATH = HEADER_PARTITION + "
Path";
@@ -166,4 +167,18 @@ public class HoodieTableHeaderFields {
public static final String HEADER_DESTINATION_FILE_PATH = "Destination " +
HEADER_FILE_PATH;
public static final String HEADER_RENAME_EXECUTED = "Rename Executed?";
public static final String HEADER_RENAME_SUCCEEDED = "Rename Succeeded?";
+
+ /**
+ * Fields of timeline command output
+ */
+ public static final String HEADER_REQUESTED_TIME = "Requested\nTime";
+ public static final String HEADER_INFLIGHT_TIME = "Inflight\nTime";
+ public static final String HEADER_COMPLETED_TIME = "Completed\nTime";
+ public static final String HEADER_ROLLBACK_INFO = "Rollback Info";
+ public static final String HEADER_MT_PREFIX = "MT\n";
+ public static final String HEADER_MT_ACTION = HEADER_MT_PREFIX +
HEADER_ACTION;
+ public static final String HEADER_MT_STATE = HEADER_MT_PREFIX + HEADER_STATE;
+ public static final String HEADER_MT_REQUESTED_TIME = HEADER_MT_PREFIX +
HEADER_REQUESTED_TIME;
+ public static final String HEADER_MT_INFLIGHT_TIME = HEADER_MT_PREFIX +
HEADER_INFLIGHT_TIME;
+ public static final String HEADER_MT_COMPLETED_TIME = HEADER_MT_PREFIX +
HEADER_COMPLETED_TIME;
}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
index 8158eef8d5..70e8a97403 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
@@ -37,6 +37,8 @@ public class Table implements Iterable<List<String>> {
// Header for this table
private final TableHeader rowHeader;
+ // Whether to print row number
+ private final boolean addRowNo;
// User-specified conversions before rendering
private final Map<String, Function<Object, String>> fieldNameToConverterMap;
// Option attribute to track sorting field
@@ -49,12 +51,17 @@ public class Table implements Iterable<List<String>> {
private final List<List<Comparable>> rawRows;
// Flag to determine if all the rows have been added
private boolean finishedAdding = false;
- // Rows ready for Rendering
+ // Headers ready for rendering
+ private TableHeader renderHeaders;
+ // Rows ready for rendering
private List<List<String>> renderRows;
- public Table(TableHeader rowHeader, Map<String, Function<Object, String>>
fieldNameToConverterMap,
- Option<String> orderingFieldNameOptional, Option<Boolean>
isDescendingOptional, Option<Integer> limitOptional) {
+ public Table(
+ TableHeader rowHeader, Map<String, Function<Object, String>>
fieldNameToConverterMap,
+ boolean addRowNo, Option<String> orderingFieldNameOptional,
+ Option<Boolean> isDescendingOptional, Option<Integer> limitOptional) {
this.rowHeader = rowHeader;
+ this.addRowNo = addRowNo;
this.fieldNameToConverterMap = fieldNameToConverterMap;
this.orderingFieldNameOptional = orderingFieldNameOptional;
this.isDescendingOptional = isDescendingOptional;
@@ -64,7 +71,7 @@ public class Table implements Iterable<List<String>> {
/**
* Main API to add row to the table.
- *
+ *
* @param row Row
*/
public Table add(List<Comparable> row) {
@@ -134,15 +141,34 @@ public class Table implements Iterable<List<String>> {
private void sortAndLimit() {
this.renderRows = new ArrayList<>();
final int limit = this.limitOptional.orElse(rawRows.size());
- final List<List<Comparable>> orderedRows = orderRows();
- renderRows = orderedRows.stream().limit(limit).map(row ->
IntStream.range(0, rowHeader.getNumFields()).mapToObj(idx -> {
- String fieldName = rowHeader.get(idx);
- if (fieldNameToConverterMap.containsKey(fieldName)) {
- return fieldNameToConverterMap.get(fieldName).apply(row.get(idx));
+ // Row number is added here if enabled
+ final List<List<Comparable>> rawOrderedRows = orderRows();
+ final List<List<Comparable>> orderedRows;
+ if (addRowNo) {
+ orderedRows = new ArrayList<>();
+ int rowNo = 0;
+ for (List<Comparable> row : rawOrderedRows) {
+ List<Comparable> newRow = new ArrayList<>();
+ newRow.add(rowNo++);
+ newRow.addAll(row);
+ orderedRows.add(newRow);
}
- Object v = row.get(idx);
- return v == null ? "null" : v.toString();
- }).collect(Collectors.toList())).collect(Collectors.toList());
+ } else {
+ orderedRows = rawOrderedRows;
+ }
+ renderHeaders = addRowNo
+ ? new
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ROW_NO)
+ .addTableHeaderFields(rowHeader)
+ : rowHeader;
+ renderRows = orderedRows.stream().limit(limit)
+ .map(row -> IntStream.range(0,
renderHeaders.getNumFields()).mapToObj(idx -> {
+ String fieldName = renderHeaders.get(idx);
+ if (fieldNameToConverterMap.containsKey(fieldName)) {
+ return fieldNameToConverterMap.get(fieldName).apply(row.get(idx));
+ }
+ Object v = row.get(idx);
+ return v == null ? "null" : v.toString();
+ }).collect(Collectors.toList())).collect(Collectors.toList());
}
@Override
@@ -162,6 +188,9 @@ public class Table implements Iterable<List<String>> {
}
public List<String> getFieldNames() {
+ if (renderHeaders != null) {
+ return renderHeaders.getFieldNames();
+ }
return rowHeader.getFieldNames();
}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/TableHeader.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/TableHeader.java
index 8ec392d1ab..ee17480a30 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/TableHeader.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/TableHeader.java
@@ -39,6 +39,16 @@ public class TableHeader {
return this;
}
+ /**
+ * Add fields from another {@link TableHeader} instance.
+ *
+ * @param tableHeader {@link TableHeader} instance.
+ */
+ public TableHeader addTableHeaderFields(TableHeader tableHeader) {
+ fieldNames.addAll(tableHeader.getFieldNames());
+ return this;
+ }
+
/**
* Get all field names.
*/
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java
new file mode 100644
index 0000000000..9af04d155b
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
+import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.springframework.shell.core.CommandMarker;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * CLI command to display timeline options.
+ */
+@Component
+public class TimelineCommand implements CommandMarker {
+
+ private static final Logger LOG =
LogManager.getLogger(TimelineCommand.class);
+ private static final SimpleDateFormat DATE_FORMAT_DEFAULT = new
SimpleDateFormat("MM-dd HH:mm");
+ private static final SimpleDateFormat DATE_FORMAT_SECONDS = new
SimpleDateFormat("MM-dd HH:mm:ss");
+
+ @CliCommand(value = "timeline show active", help = "List all instants in
active timeline")
+ public String showActive(
+ @CliOption(key = {"limit"}, help = "Limit #rows to be displayed",
unspecifiedDefaultValue = "10") Integer limit,
+ @CliOption(key = {"sortBy"}, help = "Sorting Field",
unspecifiedDefaultValue = "") final String sortByField,
+ @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue =
"false") final boolean descending,
+ @CliOption(key = {"headeronly"}, help = "Print Header Only",
+ unspecifiedDefaultValue = "false") final boolean headerOnly,
+ @CliOption(key = {"with-metadata-table"}, help = "Show metadata table
timeline together with data table",
+ unspecifiedDefaultValue = "false") final boolean withMetadataTable,
+ @CliOption(key = {"show-rollback-info"}, help = "Show instant to
rollback for rollbacks",
+ unspecifiedDefaultValue = "false") final boolean showRollbackInfo,
+ @CliOption(key = {"show-time-seconds"}, help = "Show seconds in instant
file modification time",
+ unspecifiedDefaultValue = "false") final boolean showTimeSeconds) {
+ HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+ try {
+ if (withMetadataTable) {
+ HoodieTableMetaClient mtMetaClient =
getMetadataTableMetaClient(metaClient);
+ return printTimelineInfoWithMetadataTable(
+ metaClient.getActiveTimeline(), mtMetaClient.getActiveTimeline(),
+ getInstantInfoFromTimeline(metaClient.getFs(),
metaClient.getMetaPath()),
+ getInstantInfoFromTimeline(mtMetaClient.getFs(),
mtMetaClient.getMetaPath()),
+ limit, sortByField, descending, headerOnly, true, showTimeSeconds,
showRollbackInfo);
+ }
+ return printTimelineInfo(
+ metaClient.getActiveTimeline(),
+ getInstantInfoFromTimeline(metaClient.getFs(),
metaClient.getMetaPath()),
+ limit, sortByField, descending, headerOnly, true, showTimeSeconds,
showRollbackInfo);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return e.getMessage();
+ }
+ }
+
+ @CliCommand(value = "timeline show incomplete", help = "List all incomplete
instants in active timeline")
+ public String showIncomplete(
+ @CliOption(key = {"limit"}, help = "Limit #rows to be displayed",
unspecifiedDefaultValue = "10") Integer limit,
+ @CliOption(key = {"sortBy"}, help = "Sorting Field",
unspecifiedDefaultValue = "") final String sortByField,
+ @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue =
"false") final boolean descending,
+ @CliOption(key = {"headeronly"}, help = "Print Header Only",
+ unspecifiedDefaultValue = "false") final boolean headerOnly,
+ @CliOption(key = {"show-rollback-info"}, help = "Show instant to
rollback for rollbacks",
+ unspecifiedDefaultValue = "false") final boolean showRollbackInfo,
+ @CliOption(key = {"show-time-seconds"}, help = "Show seconds in instant
file modification time",
+ unspecifiedDefaultValue = "false") final boolean showTimeSeconds) {
+ HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+ try {
+ return printTimelineInfo(
+ metaClient.getActiveTimeline().filterInflightsAndRequested(),
+ getInstantInfoFromTimeline(metaClient.getFs(),
metaClient.getMetaPath()),
+ limit, sortByField, descending, headerOnly, true, showTimeSeconds,
showRollbackInfo);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return e.getMessage();
+ }
+ }
+
+ @CliCommand(value = "metadata timeline show active",
+ help = "List all instants in active timeline of metadata table")
+ public String metadataShowActive(
+ @CliOption(key = {"limit"}, help = "Limit #rows to be displayed",
unspecifiedDefaultValue = "10") Integer limit,
+ @CliOption(key = {"sortBy"}, help = "Sorting Field",
unspecifiedDefaultValue = "") final String sortByField,
+ @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue =
"false") final boolean descending,
+ @CliOption(key = {"headeronly"}, help = "Print Header Only",
+ unspecifiedDefaultValue = "false") final boolean headerOnly,
+ @CliOption(key = {"show-time-seconds"}, help = "Show seconds in instant
file modification time",
+ unspecifiedDefaultValue = "false") final boolean showTimeSeconds) {
+ HoodieTableMetaClient metaClient =
getMetadataTableMetaClient(HoodieCLI.getTableMetaClient());
+ try {
+ return printTimelineInfo(
+ metaClient.getActiveTimeline(),
+ getInstantInfoFromTimeline(metaClient.getFs(),
metaClient.getMetaPath()),
+ limit, sortByField, descending, headerOnly, true, showTimeSeconds,
false);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return e.getMessage();
+ }
+ }
+
+ @CliCommand(value = "metadata timeline show incomplete",
+ help = "List all incomplete instants in active timeline of metadata
table")
+ public String metadataShowIncomplete(
+ @CliOption(key = {"limit"}, help = "Limit #rows to be displayed",
unspecifiedDefaultValue = "10") Integer limit,
+ @CliOption(key = {"sortBy"}, help = "Sorting Field",
unspecifiedDefaultValue = "") final String sortByField,
+ @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue =
"false") final boolean descending,
+ @CliOption(key = {"headeronly"}, help = "Print Header Only",
+ unspecifiedDefaultValue = "false") final boolean headerOnly,
+ @CliOption(key = {"show-time-seconds"}, help = "Show seconds in instant
file modification time",
+ unspecifiedDefaultValue = "false") final boolean showTimeSeconds) {
+ HoodieTableMetaClient metaClient =
getMetadataTableMetaClient(HoodieCLI.getTableMetaClient());
+ try {
+ return printTimelineInfo(
+ metaClient.getActiveTimeline().filterInflightsAndRequested(),
+ getInstantInfoFromTimeline(metaClient.getFs(),
metaClient.getMetaPath()),
+ limit, sortByField, descending, headerOnly, true, showTimeSeconds,
false);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return e.getMessage();
+ }
+ }
+
+ private HoodieTableMetaClient
getMetadataTableMetaClient(HoodieTableMetaClient metaClient) {
+ return HoodieTableMetaClient.builder().setConf(HoodieCLI.conf)
+
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))
+ .setLoadActiveTimelineOnLoad(false)
+ .setConsistencyGuardConfig(HoodieCLI.consistencyGuardConfig)
+ .build();
+ }
+
+ private Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>>
getInstantInfoFromTimeline(
+ FileSystem fs, String metaPath) throws IOException {
+ Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> instantMap
= new HashMap<>();
+ Stream<HoodieInstantWithModTime> instantStream = Arrays.stream(
+ HoodieTableMetaClient.scanFiles(fs, new Path(metaPath), path -> {
+ // Include only the meta files with extensions that needs to be
included
+ String extension =
HoodieInstant.getTimelineFileExtension(path.getName());
+ return
HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE.contains(extension);
+ })).map(HoodieInstantWithModTime::new);
+ instantStream.forEach(instant -> {
+ instantMap.computeIfAbsent(instant.getTimestamp(), t -> new HashMap<>())
+ .put(instant.getState(), instant);
+ });
+ return instantMap;
+ }
+
+ private String getFormattedDate(
+ String instantTimestamp, HoodieInstant.State state,
+ Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>>
instantInfoMap,
+ boolean showTimeSeconds) {
+ Long timeMs = null;
+ Map<HoodieInstant.State, HoodieInstantWithModTime> mapping =
instantInfoMap.get(instantTimestamp);
+ if (mapping != null && mapping.containsKey(state)) {
+ timeMs = mapping.get(state).getModificationTime();
+ }
+ SimpleDateFormat sdf = showTimeSeconds ? DATE_FORMAT_SECONDS :
DATE_FORMAT_DEFAULT;
+ return timeMs != null ? sdf.format(new Date(timeMs)) : "-";
+ }
+
+ private String printTimelineInfo(
+ HoodieTimeline timeline,
+ Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>>
instantInfoMap,
+ Integer limit, String sortByField, boolean descending, boolean
headerOnly, boolean withRowNo,
+ boolean showTimeSeconds, boolean showRollbackInfo) {
+ Map<String, List<String>> rollbackInfo =
getRolledBackInstantInfo(timeline);
+ final List<Comparable[]> rows = timeline.getInstants().map(instant -> {
+ int numColumns = showRollbackInfo ? 7 : 6;
+ Comparable[] row = new Comparable[numColumns];
+ String instantTimestamp = instant.getTimestamp();
+ row[0] = instantTimestamp;
+ row[1] = instant.getAction();
+ row[2] = instant.getState();
+ if (showRollbackInfo) {
+ if
(HoodieTimeline.ROLLBACK_ACTION.equalsIgnoreCase(instant.getAction())) {
+ row[3] = "Rolls back\n" + getInstantToRollback(timeline, instant);
+ } else {
+ if (rollbackInfo.containsKey(instantTimestamp)) {
+ row[3] = "Rolled back by\n" + String.join(",\n",
rollbackInfo.get(instantTimestamp));
+ } else {
+ row[3] = "-";
+ }
+ }
+ }
+ row[numColumns - 3] = getFormattedDate(
+ instantTimestamp, HoodieInstant.State.REQUESTED, instantInfoMap,
showTimeSeconds);
+ row[numColumns - 2] = getFormattedDate(
+ instantTimestamp, HoodieInstant.State.INFLIGHT, instantInfoMap,
showTimeSeconds);
+ row[numColumns - 1] = getFormattedDate(
+ instantTimestamp, HoodieInstant.State.COMPLETED, instantInfoMap,
showTimeSeconds);
+ return row;
+ }).collect(Collectors.toList());
+ TableHeader header = new TableHeader()
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_STATE);
+ if (showRollbackInfo) {
+ header.addTableHeaderField(HoodieTableHeaderFields.HEADER_ROLLBACK_INFO);
+ }
+ header.addTableHeaderField(HoodieTableHeaderFields.HEADER_REQUESTED_TIME)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_INFLIGHT_TIME)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_COMPLETED_TIME);
+ return HoodiePrintHelper.print(
+ header, new HashMap<>(), withRowNo, sortByField, descending, limit,
headerOnly, rows);
+ }
+
+ private String printTimelineInfoWithMetadataTable(
+ HoodieTimeline dtTimeline, HoodieTimeline mtTimeline,
+ Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>>
dtInstantInfoMap,
+ Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>>
mtInstantInfoMap,
+ Integer limit, String sortByField, boolean descending, boolean
headerOnly, boolean withRowNo,
+ boolean showTimeSeconds, boolean showRollbackInfo) {
+ Set<String> instantTimeSet = new HashSet(dtInstantInfoMap.keySet());
+ instantTimeSet.addAll(mtInstantInfoMap.keySet());
+ List<String> instantTimeList = instantTimeSet.stream()
+ .sorted(new
HoodieInstantTimeComparator()).collect(Collectors.toList());
+ Map<String, List<String>> dtRollbackInfo =
getRolledBackInstantInfo(dtTimeline);
+
+ final List<Comparable[]> rows =
instantTimeList.stream().map(instantTimestamp -> {
+ int numColumns = showRollbackInfo ? 12 : 11;
+ Option<HoodieInstant> dtInstant = getInstant(dtTimeline,
instantTimestamp);
+ Option<HoodieInstant> mtInstant = getInstant(mtTimeline,
instantTimestamp);
+ Comparable[] row = new Comparable[numColumns];
+ row[0] = instantTimestamp;
+ row[1] = dtInstant.isPresent() ? dtInstant.get().getAction() : "-";
+ row[2] = dtInstant.isPresent() ? dtInstant.get().getState() : "-";
+ if (showRollbackInfo) {
+ if (dtInstant.isPresent()
+ &&
HoodieTimeline.ROLLBACK_ACTION.equalsIgnoreCase(dtInstant.get().getAction())) {
+ row[3] = "Rolls back\n" + getInstantToRollback(dtTimeline,
dtInstant.get());
+ } else {
+ if (dtRollbackInfo.containsKey(instantTimestamp)) {
+ row[3] = "Rolled back by\n" + String.join(",\n",
dtRollbackInfo.get(instantTimestamp));
+ } else {
+ row[3] = "-";
+ }
+ }
+ }
+ row[numColumns - 8] = getFormattedDate(
+ instantTimestamp, HoodieInstant.State.REQUESTED, dtInstantInfoMap,
showTimeSeconds);
+ row[numColumns - 7] = getFormattedDate(
+ instantTimestamp, HoodieInstant.State.INFLIGHT, dtInstantInfoMap,
showTimeSeconds);
+ row[numColumns - 6] = getFormattedDate(
+ instantTimestamp, HoodieInstant.State.COMPLETED, dtInstantInfoMap,
showTimeSeconds);
+ row[numColumns - 5] = mtInstant.isPresent() ?
mtInstant.get().getAction() : "-";
+ row[numColumns - 4] = mtInstant.isPresent() ? mtInstant.get().getState()
: "-";
+ row[numColumns - 3] = getFormattedDate(
+ instantTimestamp, HoodieInstant.State.REQUESTED, mtInstantInfoMap,
showTimeSeconds);
+ row[numColumns - 2] = getFormattedDate(
+ instantTimestamp, HoodieInstant.State.INFLIGHT, mtInstantInfoMap,
showTimeSeconds);
+ row[numColumns - 1] = getFormattedDate(
+ instantTimestamp, HoodieInstant.State.COMPLETED, mtInstantInfoMap,
showTimeSeconds);
+ return row;
+ }).collect(Collectors.toList());
+ TableHeader header = new TableHeader()
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_STATE);
+ if (showRollbackInfo) {
+ header.addTableHeaderField(HoodieTableHeaderFields.HEADER_ROLLBACK_INFO);
+ }
+ header.addTableHeaderField(HoodieTableHeaderFields.HEADER_REQUESTED_TIME)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_INFLIGHT_TIME)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_COMPLETED_TIME)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_MT_ACTION)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_MT_STATE)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_MT_REQUESTED_TIME)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_MT_INFLIGHT_TIME)
+ .addTableHeaderField(HoodieTableHeaderFields.HEADER_MT_COMPLETED_TIME);
+ return HoodiePrintHelper.print(
+ header, new HashMap<>(), withRowNo, sortByField, descending, limit,
headerOnly, rows);
+ }
+
+ private Option<HoodieInstant> getInstant(HoodieTimeline timeline, String
instantTimestamp) {
+ return timeline.filter(instant ->
instant.getTimestamp().equals(instantTimestamp)).firstInstant();
+ }
+
+ private String getInstantToRollback(HoodieTimeline timeline, HoodieInstant
instant) {
+ try {
+ if (instant.isInflight()) {
+ HoodieInstant instantToUse = new HoodieInstant(
+ HoodieInstant.State.REQUESTED, instant.getAction(),
instant.getTimestamp());
+ HoodieRollbackPlan metadata = TimelineMetadataUtils
+
.deserializeAvroMetadata(timeline.getInstantDetails(instantToUse).get(),
HoodieRollbackPlan.class);
+ return metadata.getInstantToRollback().getCommitTime();
+ } else {
+ HoodieRollbackMetadata metadata = TimelineMetadataUtils
+
.deserializeAvroMetadata(timeline.getInstantDetails(instant).get(),
HoodieRollbackMetadata.class);
+ return String.join(",", metadata.getCommitsRollback());
+ }
+ } catch (IOException e) {
+ LOG.error(String.format("Error reading rollback info of %s", instant));
+ e.printStackTrace();
+ return "-";
+ }
+ }
+
+ private Map<String, List<String>> getRolledBackInstantInfo(HoodieTimeline
timeline) {
+ // Instant rolled back or to roll back -> rollback instants
+ Map<String, List<String>> rollbackInfoMap = new HashMap<>();
+ List<HoodieInstant> rollbackInstants = timeline.filter(instant ->
+
HoodieTimeline.ROLLBACK_ACTION.equalsIgnoreCase(instant.getAction()))
+ .getInstants().collect(Collectors.toList());
+ rollbackInstants.forEach(rollbackInstant -> {
+ try {
+ if (rollbackInstant.isInflight()) {
+ HoodieInstant instantToUse = new HoodieInstant(
+ HoodieInstant.State.REQUESTED, rollbackInstant.getAction(),
rollbackInstant.getTimestamp());
+ HoodieRollbackPlan metadata = TimelineMetadataUtils
+
.deserializeAvroMetadata(timeline.getInstantDetails(instantToUse).get(),
HoodieRollbackPlan.class);
+
rollbackInfoMap.computeIfAbsent(metadata.getInstantToRollback().getCommitTime(),
k -> new ArrayList<>())
+ .add(rollbackInstant.getTimestamp());
+ } else {
+ HoodieRollbackMetadata metadata = TimelineMetadataUtils
+
.deserializeAvroMetadata(timeline.getInstantDetails(rollbackInstant).get(),
HoodieRollbackMetadata.class);
+ metadata.getCommitsRollback().forEach(instant -> {
+ rollbackInfoMap.computeIfAbsent(instant, k -> new ArrayList<>())
+ .add(rollbackInstant.getTimestamp());
+ });
+ }
+ } catch (IOException e) {
+ LOG.error(String.format("Error reading rollback info of %s",
rollbackInstant));
+ e.printStackTrace();
+ }
+ });
+ return rollbackInfoMap;
+ }
+
+ static class HoodieInstantWithModTime extends HoodieInstant {
+
+ private final long modificationTimeMs;
+
+ public HoodieInstantWithModTime(FileStatus fileStatus) {
+ super(fileStatus);
+ this.modificationTimeMs = fileStatus.getModificationTime();
+ }
+
+ public long getModificationTime() {
+ return modificationTimeMs;
+ }
+ }
+
+ static class HoodieInstantTimeComparator implements Comparator<String> {
+ @Override
+ public int compare(String o1, String o2) {
+ // For metadata table, the compaction instant time is "012345001" while
the delta commit
+ // later is "012345", i.e., the compaction instant time has trailing
"001". In the
+ // actual event sequence, metadata table compaction happens before the
corresponding
+ // delta commit. For better visualization, we put "012345001" before
"012345"
+ // when sorting in ascending order.
+ if (o1.length() != o2.length()) {
+ // o1 is longer than o2
+ if (o1.length() - o2.length() == 3 && o1.endsWith("001") &&
o1.startsWith(o2)) {
+ return -1;
+ }
+ // o1 is shorter than o2
+ if (o2.length() - o1.length() == 3 && o2.endsWith("001") &&
o2.startsWith(o1)) {
+ return 1;
+ }
+ }
+ return o1.compareTo(o2);
+ }
+ }
+}