This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a6740d0767 [HUDI-3579] Add timeline commands in hudi-cli (#5139)
a6740d0767 is described below

commit a6740d0767f7b7296b05d5762071a3ba71abc9b8
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Aug 18 17:13:32 2022 -0700

    [HUDI-3579] Add timeline commands in hudi-cli (#5139)
---
 .../org/apache/hudi/cli/HoodiePrintHelper.java     |  73 +++-
 .../apache/hudi/cli/HoodieTableHeaderFields.java   |  15 +
 .../src/main/java/org/apache/hudi/cli/Table.java   |  53 ++-
 .../main/java/org/apache/hudi/cli/TableHeader.java |  10 +
 .../apache/hudi/cli/commands/TimelineCommand.java  | 410 +++++++++++++++++++++
 5 files changed, 536 insertions(+), 25 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
index be640376ee..0ffec2cac0 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
@@ -58,30 +58,76 @@ public class HoodiePrintHelper {
    * @param rows List of rows
    * @return Serialized form for printing
    */
-  public static String print(TableHeader rowHeader, Map<String, 
Function<Object, String>> fieldNameToConverterMap,
+  public static String print(
+      TableHeader rowHeader, Map<String, Function<Object, String>> 
fieldNameToConverterMap,
       String sortByField, boolean isDescending, Integer limit, boolean 
headerOnly, List<Comparable[]> rows) {
-    return print(rowHeader, fieldNameToConverterMap, sortByField, 
isDescending, limit, headerOnly, rows, "");
+    return print(rowHeader, fieldNameToConverterMap, false, sortByField, 
isDescending, limit, headerOnly, rows);
+  }
+
+  /**
+   * Serialize Table to printable string.
+   *
+   * @param rowHeader               Row Header
+   * @param fieldNameToConverterMap Field Specific Converters
+   * @param withRowNo               Whether to add row number
+   * @param sortByField             Sorting field
+   * @param isDescending            Order
+   * @param limit                   Limit
+   * @param headerOnly              Headers only
+   * @param rows                    List of rows
+   * @return Serialized form for printing
+   */
+  public static String print(
+      TableHeader rowHeader, Map<String, Function<Object, String>> 
fieldNameToConverterMap, boolean withRowNo,
+      String sortByField, boolean isDescending, Integer limit, boolean 
headerOnly, List<Comparable[]> rows) {
+    return print(rowHeader, fieldNameToConverterMap, withRowNo, sortByField, 
isDescending, limit, headerOnly, rows, "");
   }
 
   /**
    * Serialize Table to printable string and also export a temporary view to 
easily write sql queries.
+   * <p>
+   * Ideally, exporting view needs to be outside PrintHelper, but all commands 
use this. So this is an easy
+   * way to add support for all commands
    *
+   * @param rowHeader               Row Header
+   * @param fieldNameToConverterMap Field Specific Converters
+   * @param sortByField             Sorting field
+   * @param isDescending            Order
+   * @param limit                   Limit
+   * @param headerOnly              Headers only
+   * @param rows                    List of rows
+   * @param tempTableName           table name to export
+   * @return Serialized form for printing
+   */
+  public static String print(
+      TableHeader rowHeader, Map<String, Function<Object, String>> 
fieldNameToConverterMap,
+      String sortByField, boolean isDescending, Integer limit, boolean 
headerOnly,
+      List<Comparable[]> rows, String tempTableName) {
+    return print(rowHeader, fieldNameToConverterMap, false, sortByField, 
isDescending, limit,
+        headerOnly, rows, tempTableName);
+  }
+
+  /**
+   * Serialize Table to printable string and also export a temporary view to 
easily write sql queries.
+   * <p>
    * Ideally, exporting view needs to be outside PrintHelper, but all commands 
use this. So this is easy
    * way to add support for all commands
    *
-   * @param rowHeader Row Header
+   * @param rowHeader               Row Header
    * @param fieldNameToConverterMap Field Specific Converters
-   * @param sortByField Sorting field
-   * @param isDescending Order
-   * @param limit Limit
-   * @param headerOnly Headers only
-   * @param rows List of rows
-   * @param tempTableName table name to export
+   * @param withRowNo               Whether to add row number
+   * @param sortByField             Sorting field
+   * @param isDescending            Order
+   * @param limit                   Limit
+   * @param headerOnly              Headers only
+   * @param rows                    List of rows
+   * @param tempTableName           table name to export
    * @return Serialized form for printing
    */
-  public static String print(TableHeader rowHeader, Map<String, 
Function<Object, String>> fieldNameToConverterMap,
-      String sortByField, boolean isDescending, Integer limit, boolean 
headerOnly, List<Comparable[]> rows,
-      String tempTableName) {
+  public static String print(
+      TableHeader rowHeader, Map<String, Function<Object, String>> 
fieldNameToConverterMap,
+      boolean withRowNo, String sortByField, boolean isDescending, Integer 
limit, boolean headerOnly,
+      List<Comparable[]> rows, String tempTableName) {
 
     if (headerOnly) {
       return HoodiePrintHelper.print(rowHeader);
@@ -97,7 +143,8 @@ public class HoodiePrintHelper {
     }
 
     Table table =
-        new Table(rowHeader, fieldNameToConverterMap, 
Option.ofNullable(sortByField.isEmpty() ? null : sortByField),
+        new Table(rowHeader, fieldNameToConverterMap, withRowNo,
+            Option.ofNullable(sortByField.isEmpty() ? null : sortByField),
             Option.ofNullable(isDescending), Option.ofNullable(limit <= 0 ? 
null : limit)).addAllRows(rows).flip();
 
     return HoodiePrintHelper.print(table);
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
index a4a8e46dfd..3bea0f2293 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieTableHeaderFields.java
@@ -22,6 +22,7 @@ package org.apache.hudi.cli;
  * Fields of print table header.
  */
 public class HoodieTableHeaderFields {
+  public static final String HEADER_ROW_NO = "No.";
   public static final String HEADER_PARTITION = "Partition";
   public static final String HEADER_INSTANT = "Instant";
   public static final String HEADER_PARTITION_PATH = HEADER_PARTITION + " 
Path";
@@ -166,4 +167,18 @@ public class HoodieTableHeaderFields {
   public static final String HEADER_DESTINATION_FILE_PATH = "Destination " + 
HEADER_FILE_PATH;
   public static final String HEADER_RENAME_EXECUTED = "Rename Executed?";
   public static final String HEADER_RENAME_SUCCEEDED = "Rename Succeeded?";
+
+  /**
+   * Fields of timeline command output
+   */
+  public static final String HEADER_REQUESTED_TIME = "Requested\nTime";
+  public static final String HEADER_INFLIGHT_TIME = "Inflight\nTime";
+  public static final String HEADER_COMPLETED_TIME = "Completed\nTime";
+  public static final String HEADER_ROLLBACK_INFO = "Rollback Info";
+  public static final String HEADER_MT_PREFIX = "MT\n";
+  public static final String HEADER_MT_ACTION = HEADER_MT_PREFIX + 
HEADER_ACTION;
+  public static final String HEADER_MT_STATE = HEADER_MT_PREFIX + HEADER_STATE;
+  public static final String HEADER_MT_REQUESTED_TIME = HEADER_MT_PREFIX + 
HEADER_REQUESTED_TIME;
+  public static final String HEADER_MT_INFLIGHT_TIME = HEADER_MT_PREFIX + 
HEADER_INFLIGHT_TIME;
+  public static final String HEADER_MT_COMPLETED_TIME = HEADER_MT_PREFIX + 
HEADER_COMPLETED_TIME;
 }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
index 8158eef8d5..70e8a97403 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/Table.java
@@ -37,6 +37,8 @@ public class Table implements Iterable<List<String>> {
 
   // Header for this table
   private final TableHeader rowHeader;
+  // Whether to print row number
+  private final boolean addRowNo;
   // User-specified conversions before rendering
   private final Map<String, Function<Object, String>> fieldNameToConverterMap;
   // Option attribute to track sorting field
@@ -49,12 +51,17 @@ public class Table implements Iterable<List<String>> {
   private final List<List<Comparable>> rawRows;
   // Flag to determine if all the rows have been added
   private boolean finishedAdding = false;
-  // Rows ready for Rendering
+  // Headers ready for rendering
+  private TableHeader renderHeaders;
+  // Rows ready for rendering
   private List<List<String>> renderRows;
 
-  public Table(TableHeader rowHeader, Map<String, Function<Object, String>> 
fieldNameToConverterMap,
-      Option<String> orderingFieldNameOptional, Option<Boolean> 
isDescendingOptional, Option<Integer> limitOptional) {
+  public Table(
+      TableHeader rowHeader, Map<String, Function<Object, String>> 
fieldNameToConverterMap,
+      boolean addRowNo, Option<String> orderingFieldNameOptional,
+      Option<Boolean> isDescendingOptional, Option<Integer> limitOptional) {
     this.rowHeader = rowHeader;
+    this.addRowNo = addRowNo;
     this.fieldNameToConverterMap = fieldNameToConverterMap;
     this.orderingFieldNameOptional = orderingFieldNameOptional;
     this.isDescendingOptional = isDescendingOptional;
@@ -64,7 +71,7 @@ public class Table implements Iterable<List<String>> {
 
   /**
    * Main API to add row to the table.
-   * 
+   *
    * @param row Row
    */
   public Table add(List<Comparable> row) {
@@ -134,15 +141,34 @@ public class Table implements Iterable<List<String>> {
   private void sortAndLimit() {
     this.renderRows = new ArrayList<>();
     final int limit = this.limitOptional.orElse(rawRows.size());
-    final List<List<Comparable>> orderedRows = orderRows();
-    renderRows = orderedRows.stream().limit(limit).map(row -> 
IntStream.range(0, rowHeader.getNumFields()).mapToObj(idx -> {
-      String fieldName = rowHeader.get(idx);
-      if (fieldNameToConverterMap.containsKey(fieldName)) {
-        return fieldNameToConverterMap.get(fieldName).apply(row.get(idx));
+    // Row number is added here if enabled
+    final List<List<Comparable>> rawOrderedRows = orderRows();
+    final List<List<Comparable>> orderedRows;
+    if (addRowNo) {
+      orderedRows = new ArrayList<>();
+      int rowNo = 0;
+      for (List<Comparable> row : rawOrderedRows) {
+        List<Comparable> newRow = new ArrayList<>();
+        newRow.add(rowNo++);
+        newRow.addAll(row);
+        orderedRows.add(newRow);
       }
-      Object v = row.get(idx);
-      return v == null ? "null" : v.toString();
-    }).collect(Collectors.toList())).collect(Collectors.toList());
+    } else {
+      orderedRows = rawOrderedRows;
+    }
+    renderHeaders = addRowNo
+        ? new 
TableHeader().addTableHeaderField(HoodieTableHeaderFields.HEADER_ROW_NO)
+        .addTableHeaderFields(rowHeader)
+        : rowHeader;
+    renderRows = orderedRows.stream().limit(limit)
+        .map(row -> IntStream.range(0, 
renderHeaders.getNumFields()).mapToObj(idx -> {
+          String fieldName = renderHeaders.get(idx);
+          if (fieldNameToConverterMap.containsKey(fieldName)) {
+            return fieldNameToConverterMap.get(fieldName).apply(row.get(idx));
+          }
+          Object v = row.get(idx);
+          return v == null ? "null" : v.toString();
+        }).collect(Collectors.toList())).collect(Collectors.toList());
   }
 
   @Override
@@ -162,6 +188,9 @@ public class Table implements Iterable<List<String>> {
   }
 
   public List<String> getFieldNames() {
+    if (renderHeaders != null) {
+      return renderHeaders.getFieldNames();
+    }
     return rowHeader.getFieldNames();
   }
 
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/TableHeader.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/TableHeader.java
index 8ec392d1ab..ee17480a30 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/TableHeader.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/TableHeader.java
@@ -39,6 +39,16 @@ public class TableHeader {
     return this;
   }
 
+  /**
+   * Add fields from another {@link TableHeader} instance.
+   *
+   * @param tableHeader {@link TableHeader} instance.
+   */
+  public TableHeader addTableHeaderFields(TableHeader tableHeader) {
+    fieldNames.addAll(tableHeader.getFieldNames());
+    return this;
+  }
+
   /**
    * Get all field names.
    */
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java
new file mode 100644
index 0000000000..9af04d155b
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TimelineCommand.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.cli.HoodieCLI;
+import org.apache.hudi.cli.HoodiePrintHelper;
+import org.apache.hudi.cli.HoodieTableHeaderFields;
+import org.apache.hudi.cli.TableHeader;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.springframework.shell.core.CommandMarker;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * CLI command to display timeline options.
+ */
+@Component
+public class TimelineCommand implements CommandMarker {
+
+  private static final Logger LOG = 
LogManager.getLogger(TimelineCommand.class);
+  private static final SimpleDateFormat DATE_FORMAT_DEFAULT = new 
SimpleDateFormat("MM-dd HH:mm");
+  private static final SimpleDateFormat DATE_FORMAT_SECONDS = new 
SimpleDateFormat("MM-dd HH:mm:ss");
+
+  @CliCommand(value = "timeline show active", help = "List all instants in 
active timeline")
+  public String showActive(
+      @CliOption(key = {"limit"}, help = "Limit #rows to be displayed", 
unspecifiedDefaultValue = "10") Integer limit,
+      @CliOption(key = {"sortBy"}, help = "Sorting Field", 
unspecifiedDefaultValue = "") final String sortByField,
+      @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = 
"false") final boolean descending,
+      @CliOption(key = {"headeronly"}, help = "Print Header Only",
+          unspecifiedDefaultValue = "false") final boolean headerOnly,
+      @CliOption(key = {"with-metadata-table"}, help = "Show metadata table 
timeline together with data table",
+          unspecifiedDefaultValue = "false") final boolean withMetadataTable,
+      @CliOption(key = {"show-rollback-info"}, help = "Show instant to 
rollback for rollbacks",
+          unspecifiedDefaultValue = "false") final boolean showRollbackInfo,
+      @CliOption(key = {"show-time-seconds"}, help = "Show seconds in instant 
file modification time",
+          unspecifiedDefaultValue = "false") final boolean showTimeSeconds) {
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+    try {
+      if (withMetadataTable) {
+        HoodieTableMetaClient mtMetaClient = 
getMetadataTableMetaClient(metaClient);
+        return printTimelineInfoWithMetadataTable(
+            metaClient.getActiveTimeline(), mtMetaClient.getActiveTimeline(),
+            getInstantInfoFromTimeline(metaClient.getFs(), 
metaClient.getMetaPath()),
+            getInstantInfoFromTimeline(mtMetaClient.getFs(), 
mtMetaClient.getMetaPath()),
+            limit, sortByField, descending, headerOnly, true, showTimeSeconds, 
showRollbackInfo);
+      }
+      return printTimelineInfo(
+          metaClient.getActiveTimeline(),
+          getInstantInfoFromTimeline(metaClient.getFs(), 
metaClient.getMetaPath()),
+          limit, sortByField, descending, headerOnly, true, showTimeSeconds, 
showRollbackInfo);
+    } catch (IOException e) {
+      e.printStackTrace();
+      return e.getMessage();
+    }
+  }
+
+  @CliCommand(value = "timeline show incomplete", help = "List all incomplete 
instants in active timeline")
+  public String showIncomplete(
+      @CliOption(key = {"limit"}, help = "Limit #rows to be displayed", 
unspecifiedDefaultValue = "10") Integer limit,
+      @CliOption(key = {"sortBy"}, help = "Sorting Field", 
unspecifiedDefaultValue = "") final String sortByField,
+      @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = 
"false") final boolean descending,
+      @CliOption(key = {"headeronly"}, help = "Print Header Only",
+          unspecifiedDefaultValue = "false") final boolean headerOnly,
+      @CliOption(key = {"show-rollback-info"}, help = "Show instant to 
rollback for rollbacks",
+          unspecifiedDefaultValue = "false") final boolean showRollbackInfo,
+      @CliOption(key = {"show-time-seconds"}, help = "Show seconds in instant 
file modification time",
+          unspecifiedDefaultValue = "false") final boolean showTimeSeconds) {
+    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
+    try {
+      return printTimelineInfo(
+          metaClient.getActiveTimeline().filterInflightsAndRequested(),
+          getInstantInfoFromTimeline(metaClient.getFs(), 
metaClient.getMetaPath()),
+          limit, sortByField, descending, headerOnly, true, showTimeSeconds, 
showRollbackInfo);
+    } catch (IOException e) {
+      e.printStackTrace();
+      return e.getMessage();
+    }
+  }
+
+  @CliCommand(value = "metadata timeline show active",
+      help = "List all instants in active timeline of metadata table")
+  public String metadataShowActive(
+      @CliOption(key = {"limit"}, help = "Limit #rows to be displayed", 
unspecifiedDefaultValue = "10") Integer limit,
+      @CliOption(key = {"sortBy"}, help = "Sorting Field", 
unspecifiedDefaultValue = "") final String sortByField,
+      @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = 
"false") final boolean descending,
+      @CliOption(key = {"headeronly"}, help = "Print Header Only",
+          unspecifiedDefaultValue = "false") final boolean headerOnly,
+      @CliOption(key = {"show-time-seconds"}, help = "Show seconds in instant 
file modification time",
+          unspecifiedDefaultValue = "false") final boolean showTimeSeconds) {
+    HoodieTableMetaClient metaClient = 
getMetadataTableMetaClient(HoodieCLI.getTableMetaClient());
+    try {
+      return printTimelineInfo(
+          metaClient.getActiveTimeline(),
+          getInstantInfoFromTimeline(metaClient.getFs(), 
metaClient.getMetaPath()),
+          limit, sortByField, descending, headerOnly, true, showTimeSeconds, 
false);
+    } catch (IOException e) {
+      e.printStackTrace();
+      return e.getMessage();
+    }
+  }
+
+  @CliCommand(value = "metadata timeline show incomplete",
+      help = "List all incomplete instants in active timeline of metadata 
table")
+  public String metadataShowIncomplete(
+      @CliOption(key = {"limit"}, help = "Limit #rows to be displayed", 
unspecifiedDefaultValue = "10") Integer limit,
+      @CliOption(key = {"sortBy"}, help = "Sorting Field", 
unspecifiedDefaultValue = "") final String sortByField,
+      @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = 
"false") final boolean descending,
+      @CliOption(key = {"headeronly"}, help = "Print Header Only",
+          unspecifiedDefaultValue = "false") final boolean headerOnly,
+      @CliOption(key = {"show-time-seconds"}, help = "Show seconds in instant 
file modification time",
+          unspecifiedDefaultValue = "false") final boolean showTimeSeconds) {
+    HoodieTableMetaClient metaClient = 
getMetadataTableMetaClient(HoodieCLI.getTableMetaClient());
+    try {
+      return printTimelineInfo(
+          metaClient.getActiveTimeline().filterInflightsAndRequested(),
+          getInstantInfoFromTimeline(metaClient.getFs(), 
metaClient.getMetaPath()),
+          limit, sortByField, descending, headerOnly, true, showTimeSeconds, 
false);
+    } catch (IOException e) {
+      e.printStackTrace();
+      return e.getMessage();
+    }
+  }
+
+  private HoodieTableMetaClient 
getMetadataTableMetaClient(HoodieTableMetaClient metaClient) {
+    return HoodieTableMetaClient.builder().setConf(HoodieCLI.conf)
+        
.setBasePath(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))
+        .setLoadActiveTimelineOnLoad(false)
+        .setConsistencyGuardConfig(HoodieCLI.consistencyGuardConfig)
+        .build();
+  }
+
+  private Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> 
getInstantInfoFromTimeline(
+      FileSystem fs, String metaPath) throws IOException {
+    Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> instantMap 
= new HashMap<>();
+    Stream<HoodieInstantWithModTime> instantStream = Arrays.stream(
+        HoodieTableMetaClient.scanFiles(fs, new Path(metaPath), path -> {
+          // Include only the meta files with extensions that need to be 
included
+          String extension = 
HoodieInstant.getTimelineFileExtension(path.getName());
+          return 
HoodieActiveTimeline.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE.contains(extension);
+        })).map(HoodieInstantWithModTime::new);
+    instantStream.forEach(instant -> {
+      instantMap.computeIfAbsent(instant.getTimestamp(), t -> new HashMap<>())
+          .put(instant.getState(), instant);
+    });
+    return instantMap;
+  }
+
+  private String getFormattedDate(
+      String instantTimestamp, HoodieInstant.State state,
+      Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> 
instantInfoMap,
+      boolean showTimeSeconds) {
+    Long timeMs = null;
+    Map<HoodieInstant.State, HoodieInstantWithModTime> mapping = 
instantInfoMap.get(instantTimestamp);
+    if (mapping != null && mapping.containsKey(state)) {
+      timeMs = mapping.get(state).getModificationTime();
+    }
+    SimpleDateFormat sdf = showTimeSeconds ? DATE_FORMAT_SECONDS : 
DATE_FORMAT_DEFAULT;
+    return timeMs != null ? sdf.format(new Date(timeMs)) : "-";
+  }
+
+  private String printTimelineInfo(
+      HoodieTimeline timeline,
+      Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> 
instantInfoMap,
+      Integer limit, String sortByField, boolean descending, boolean 
headerOnly, boolean withRowNo,
+      boolean showTimeSeconds, boolean showRollbackInfo) {
+    Map<String, List<String>> rollbackInfo = 
getRolledBackInstantInfo(timeline);
+    final List<Comparable[]> rows = timeline.getInstants().map(instant -> {
+      int numColumns = showRollbackInfo ? 7 : 6;
+      Comparable[] row = new Comparable[numColumns];
+      String instantTimestamp = instant.getTimestamp();
+      row[0] = instantTimestamp;
+      row[1] = instant.getAction();
+      row[2] = instant.getState();
+      if (showRollbackInfo) {
+        if 
(HoodieTimeline.ROLLBACK_ACTION.equalsIgnoreCase(instant.getAction())) {
+          row[3] = "Rolls back\n" + getInstantToRollback(timeline, instant);
+        } else {
+          if (rollbackInfo.containsKey(instantTimestamp)) {
+            row[3] = "Rolled back by\n" + String.join(",\n", 
rollbackInfo.get(instantTimestamp));
+          } else {
+            row[3] = "-";
+          }
+        }
+      }
+      row[numColumns - 3] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.REQUESTED, instantInfoMap, 
showTimeSeconds);
+      row[numColumns - 2] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.INFLIGHT, instantInfoMap, 
showTimeSeconds);
+      row[numColumns - 1] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.COMPLETED, instantInfoMap, 
showTimeSeconds);
+      return row;
+    }).collect(Collectors.toList());
+    TableHeader header = new TableHeader()
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_STATE);
+    if (showRollbackInfo) {
+      header.addTableHeaderField(HoodieTableHeaderFields.HEADER_ROLLBACK_INFO);
+    }
+    header.addTableHeaderField(HoodieTableHeaderFields.HEADER_REQUESTED_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_INFLIGHT_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_COMPLETED_TIME);
+    return HoodiePrintHelper.print(
+        header, new HashMap<>(), withRowNo, sortByField, descending, limit, 
headerOnly, rows);
+  }
+
+  private String printTimelineInfoWithMetadataTable(
+      HoodieTimeline dtTimeline, HoodieTimeline mtTimeline,
+      Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> 
dtInstantInfoMap,
+      Map<String, Map<HoodieInstant.State, HoodieInstantWithModTime>> 
mtInstantInfoMap,
+      Integer limit, String sortByField, boolean descending, boolean 
headerOnly, boolean withRowNo,
+      boolean showTimeSeconds, boolean showRollbackInfo) {
+    Set<String> instantTimeSet = new HashSet(dtInstantInfoMap.keySet());
+    instantTimeSet.addAll(mtInstantInfoMap.keySet());
+    List<String> instantTimeList = instantTimeSet.stream()
+        .sorted(new 
HoodieInstantTimeComparator()).collect(Collectors.toList());
+    Map<String, List<String>> dtRollbackInfo = 
getRolledBackInstantInfo(dtTimeline);
+
+    final List<Comparable[]> rows = 
instantTimeList.stream().map(instantTimestamp -> {
+      int numColumns = showRollbackInfo ? 12 : 11;
+      Option<HoodieInstant> dtInstant = getInstant(dtTimeline, 
instantTimestamp);
+      Option<HoodieInstant> mtInstant = getInstant(mtTimeline, 
instantTimestamp);
+      Comparable[] row = new Comparable[numColumns];
+      row[0] = instantTimestamp;
+      row[1] = dtInstant.isPresent() ? dtInstant.get().getAction() : "-";
+      row[2] = dtInstant.isPresent() ? dtInstant.get().getState() : "-";
+      if (showRollbackInfo) {
+        if (dtInstant.isPresent()
+            && 
HoodieTimeline.ROLLBACK_ACTION.equalsIgnoreCase(dtInstant.get().getAction())) {
+          row[3] = "Rolls back\n" + getInstantToRollback(dtTimeline, 
dtInstant.get());
+        } else {
+          if (dtRollbackInfo.containsKey(instantTimestamp)) {
+            row[3] = "Rolled back by\n" + String.join(",\n", 
dtRollbackInfo.get(instantTimestamp));
+          } else {
+            row[3] = "-";
+          }
+        }
+      }
+      row[numColumns - 8] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.REQUESTED, dtInstantInfoMap, 
showTimeSeconds);
+      row[numColumns - 7] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.INFLIGHT, dtInstantInfoMap, 
showTimeSeconds);
+      row[numColumns - 6] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.COMPLETED, dtInstantInfoMap, 
showTimeSeconds);
+      row[numColumns - 5] = mtInstant.isPresent() ? 
mtInstant.get().getAction() : "-";
+      row[numColumns - 4] = mtInstant.isPresent() ? mtInstant.get().getState() 
: "-";
+      row[numColumns - 3] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.REQUESTED, mtInstantInfoMap, 
showTimeSeconds);
+      row[numColumns - 2] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.INFLIGHT, mtInstantInfoMap, 
showTimeSeconds);
+      row[numColumns - 1] = getFormattedDate(
+          instantTimestamp, HoodieInstant.State.COMPLETED, mtInstantInfoMap, 
showTimeSeconds);
+      return row;
+    }).collect(Collectors.toList());
+    TableHeader header = new TableHeader()
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_INSTANT)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_ACTION)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_STATE);
+    if (showRollbackInfo) {
+      header.addTableHeaderField(HoodieTableHeaderFields.HEADER_ROLLBACK_INFO);
+    }
+    header.addTableHeaderField(HoodieTableHeaderFields.HEADER_REQUESTED_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_INFLIGHT_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_COMPLETED_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_MT_ACTION)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_MT_STATE)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_MT_REQUESTED_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_MT_INFLIGHT_TIME)
+        .addTableHeaderField(HoodieTableHeaderFields.HEADER_MT_COMPLETED_TIME);
+    return HoodiePrintHelper.print(
+        header, new HashMap<>(), withRowNo, sortByField, descending, limit, 
headerOnly, rows);
+  }
+
+  private Option<HoodieInstant> getInstant(HoodieTimeline timeline, String 
instantTimestamp) {
+    return timeline.filter(instant -> 
instant.getTimestamp().equals(instantTimestamp)).firstInstant();
+  }
+
+  private String getInstantToRollback(HoodieTimeline timeline, HoodieInstant 
instant) {
+    try {
+      if (instant.isInflight()) {
+        HoodieInstant instantToUse = new HoodieInstant(
+            HoodieInstant.State.REQUESTED, instant.getAction(), 
instant.getTimestamp());
+        HoodieRollbackPlan metadata = TimelineMetadataUtils
+            
.deserializeAvroMetadata(timeline.getInstantDetails(instantToUse).get(), 
HoodieRollbackPlan.class);
+        return metadata.getInstantToRollback().getCommitTime();
+      } else {
+        HoodieRollbackMetadata metadata = TimelineMetadataUtils
+            
.deserializeAvroMetadata(timeline.getInstantDetails(instant).get(), 
HoodieRollbackMetadata.class);
+        return String.join(",", metadata.getCommitsRollback());
+      }
+    } catch (IOException e) {
+      LOG.error(String.format("Error reading rollback info of %s", instant));
+      e.printStackTrace();
+      return "-";
+    }
+  }
+
+  private Map<String, List<String>> getRolledBackInstantInfo(HoodieTimeline 
timeline) {
+    // Instant rolled back or to roll back -> rollback instants
+    Map<String, List<String>> rollbackInfoMap = new HashMap<>();
+    List<HoodieInstant> rollbackInstants = timeline.filter(instant ->
+            
HoodieTimeline.ROLLBACK_ACTION.equalsIgnoreCase(instant.getAction()))
+        .getInstants().collect(Collectors.toList());
+    rollbackInstants.forEach(rollbackInstant -> {
+      try {
+        if (rollbackInstant.isInflight()) {
+          HoodieInstant instantToUse = new HoodieInstant(
+              HoodieInstant.State.REQUESTED, rollbackInstant.getAction(), 
rollbackInstant.getTimestamp());
+          HoodieRollbackPlan metadata = TimelineMetadataUtils
+              
.deserializeAvroMetadata(timeline.getInstantDetails(instantToUse).get(), 
HoodieRollbackPlan.class);
+          
rollbackInfoMap.computeIfAbsent(metadata.getInstantToRollback().getCommitTime(),
 k -> new ArrayList<>())
+              .add(rollbackInstant.getTimestamp());
+        } else {
+          HoodieRollbackMetadata metadata = TimelineMetadataUtils
+              
.deserializeAvroMetadata(timeline.getInstantDetails(rollbackInstant).get(), 
HoodieRollbackMetadata.class);
+          metadata.getCommitsRollback().forEach(instant -> {
+            rollbackInfoMap.computeIfAbsent(instant, k -> new ArrayList<>())
+                .add(rollbackInstant.getTimestamp());
+          });
+        }
+      } catch (IOException e) {
+        LOG.error(String.format("Error reading rollback info of %s", 
rollbackInstant));
+        e.printStackTrace();
+      }
+    });
+    return rollbackInfoMap;
+  }
+
+  static class HoodieInstantWithModTime extends HoodieInstant {
+
+    private final long modificationTimeMs;
+
+    public HoodieInstantWithModTime(FileStatus fileStatus) {
+      super(fileStatus);
+      this.modificationTimeMs = fileStatus.getModificationTime();
+    }
+
+    public long getModificationTime() {
+      return modificationTimeMs;
+    }
+  }
+
+  static class HoodieInstantTimeComparator implements Comparator<String> {
+    @Override
+    public int compare(String o1, String o2) {
+      // For metadata table, the compaction instant time is "012345001" while 
the delta commit
+      // later is "012345", i.e., the compaction instant time has trailing 
"001".  In the
+      // actual event sequence, metadata table compaction happens before the 
corresponding
+      // delta commit.  For better visualization, we put "012345001" before 
"012345"
+      // when sorting in ascending order.
+      if (o1.length() != o2.length()) {
+        // o1 is longer than o2
+        if (o1.length() - o2.length() == 3 && o1.endsWith("001") && 
o1.startsWith(o2)) {
+          return -1;
+        }
+        // o1 is shorter than o2
+        if (o2.length() - o1.length() == 3 && o2.endsWith("001") && 
o2.startsWith(o1)) {
+          return 1;
+        }
+      }
+      return o1.compareTo(o2);
+    }
+  }
+}

Reply via email to