This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d37818  [CLI] Add export to table
3d37818 is described below

commit 3d3781810c2fd28c85407b04aad08fc2a85174dc
Author: Satish Kotha <[email protected]>
AuthorDate: Wed Feb 12 11:23:40 2020 -0800

    [CLI] Add export to table
---
 .../main/java/org/apache/hudi/cli/HoodieCLI.java   |  11 ++
 .../org/apache/hudi/cli/HoodiePrintHelper.java     |  30 +++++
 .../apache/hudi/cli/commands/CommitsCommand.java   |  42 +++++--
 .../apache/hudi/cli/commands/TempViewCommand.java  |  55 +++++++++
 .../hudi/cli/utils/SparkTempViewProvider.java      | 134 +++++++++++++++++++++
 .../apache/hudi/cli/utils/TempViewProvider.java    |  29 +++++
 6 files changed, 288 insertions(+), 13 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
index 561e499..af68035 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.cli;
 
+import org.apache.hudi.cli.utils.SparkTempViewProvider;
+import org.apache.hudi.cli.utils.TempViewProvider;
 import org.apache.hudi.common.model.TimelineLayoutVersion;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.ConsistencyGuardConfig;
@@ -43,6 +45,7 @@ public class HoodieCLI {
   protected static HoodieTableMetaClient tableMetadata;
   public static HoodieTableMetaClient syncTableMetadata;
   public static TimelineLayoutVersion layoutVersion;
+  private static TempViewProvider tempViewProvider;
 
   /**
    * Enum for CLI state.
@@ -105,4 +108,12 @@ public class HoodieCLI {
     return tableMetadata;
   }
 
+  public static synchronized TempViewProvider getTempViewProvider() {
+    if (tempViewProvider == null) {
+      tempViewProvider = new 
SparkTempViewProvider(HoodieCLI.class.getSimpleName());
+    }
+
+    return tempViewProvider;
+  }
+
 }
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
index 53114ce..be64037 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
@@ -19,12 +19,15 @@
 package org.apache.hudi.cli;
 
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 
 import com.jakewharton.fliptables.FlipTable;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 
 /**
@@ -57,11 +60,38 @@ public class HoodiePrintHelper {
    */
  public static String print(TableHeader rowHeader, Map<String, Function<Object, String>> fieldNameToConverterMap,
      String sortByField, boolean isDescending, Integer limit, boolean headerOnly, List<Comparable[]> rows) {
    // Backwards-compatible overload: an empty table name means no temp view is exported.
    return print(rowHeader, fieldNameToConverterMap, sortByField, isDescending, limit, headerOnly, rows, "");
  }
+
+  /**
+   * Serialize Table to printable string and also export a temporary view to 
easily write sql queries.
+   *
+   * Ideally, exporting view needs to be outside PrintHelper, but all commands 
use this. So this is easy
+   * way to add support for all commands
+   *
+   * @param rowHeader Row Header
+   * @param fieldNameToConverterMap Field Specific Converters
+   * @param sortByField Sorting field
+   * @param isDescending Order
+   * @param limit Limit
+   * @param headerOnly Headers only
+   * @param rows List of rows
+   * @param tempTableName table name to export
+   * @return Serialized form for printing
+   */
+  public static String print(TableHeader rowHeader, Map<String, 
Function<Object, String>> fieldNameToConverterMap,
+      String sortByField, boolean isDescending, Integer limit, boolean 
headerOnly, List<Comparable[]> rows,
+      String tempTableName) {
 
     if (headerOnly) {
       return HoodiePrintHelper.print(rowHeader);
     }
 
+    if (!StringUtils.isNullOrEmpty(tempTableName)) {
+      HoodieCLI.getTempViewProvider().createOrReplace(tempTableName, 
rowHeader.getFieldNames(),
+          rows.stream().map(columns -> 
Arrays.asList(columns)).collect(Collectors.toList()));
+    }
+
     if (!sortByField.isEmpty() && !rowHeader.containsField(sortByField)) {
       return String.format("Field[%s] is not in table, given columns[%s]", 
sortByField, rowHeader.getFieldNames());
     }
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index 804096b..3c08305 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -33,7 +33,6 @@ import 
org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.NumericUtils;
-
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.spark.launcher.SparkLauncher;
 import org.springframework.shell.core.CommandMarker;
@@ -59,7 +58,8 @@ public class CommitsCommand implements CommandMarker {
   private String printCommits(HoodieDefaultTimeline timeline,
                               final Integer limit, final String sortByField,
                               final boolean descending,
-                              final boolean headerOnly) throws IOException {
+                              final boolean headerOnly,
+                              final String tempTableName) throws IOException {
     final List<Comparable[]> rows = new ArrayList<>();
 
     final List<HoodieInstant> commits = 
timeline.getCommitsTimeline().filterCompletedInstants()
@@ -96,13 +96,16 @@ public class CommitsCommand implements CommandMarker {
             .addTableHeaderField("Total Records Written")
             .addTableHeaderField("Total Update Records Written")
             .addTableHeaderField("Total Errors");
-    return HoodiePrintHelper.print(header, fieldNameToConverterMap, 
sortByField, descending, limit, headerOnly, rows);
+
+    return HoodiePrintHelper.print(header, fieldNameToConverterMap, 
sortByField, descending,
+            limit, headerOnly, rows, tempTableName);
   }
 
   private String printCommitsWithMetadata(HoodieDefaultTimeline timeline,
                               final Integer limit, final String sortByField,
                               final boolean descending,
-                              final boolean headerOnly) throws IOException {
+                              final boolean headerOnly,
+                              final String tempTableName) throws IOException {
     final List<Comparable[]> rows = new ArrayList<>();
 
     final List<HoodieInstant> commits = 
timeline.getCommitsTimeline().filterCompletedInstants()
@@ -144,13 +147,16 @@ public class CommitsCommand implements CommandMarker {
             .addTableHeaderField("Total Rollback 
Blocks").addTableHeaderField("Total Log Records")
             .addTableHeaderField("Total Updated Records 
Compacted").addTableHeaderField("Total Write Bytes");
 
-    return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, 
descending, limit, headerOnly, rows);
+    return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, 
descending,
+            limit, headerOnly, rows, tempTableName);
   }
 
   @CliCommand(value = "commits show", help = "Show the commits")
   public String showCommits(
       @CliOption(key = {"includeExtraMetadata"}, help = "Include extra 
metadata",
           unspecifiedDefaultValue = "false") final boolean 
includeExtraMetadata,
+      @CliOption(key = {"createView"}, mandatory = false, help = "view name to 
store output table",
+          unspecifiedDefaultValue = "") final String exportTableName,
       @CliOption(key = {"limit"}, help = "Limit commits",
           unspecifiedDefaultValue = "-1") final Integer limit,
       @CliOption(key = {"sortBy"}, help = "Sorting Field", 
unspecifiedDefaultValue = "") final String sortByField,
@@ -161,9 +167,9 @@ public class CommitsCommand implements CommandMarker {
 
     HoodieActiveTimeline activeTimeline = 
HoodieCLI.getTableMetaClient().getActiveTimeline();
     if (includeExtraMetadata) {
-      return printCommitsWithMetadata(activeTimeline, limit, sortByField, 
descending, headerOnly);
+      return printCommitsWithMetadata(activeTimeline, limit, sortByField, 
descending, headerOnly, exportTableName);
     } else  {
-      return printCommits(activeTimeline, limit, sortByField, descending, 
headerOnly);
+      return printCommits(activeTimeline, limit, sortByField, descending, 
headerOnly, exportTableName);
     }
   }
 
@@ -171,6 +177,8 @@ public class CommitsCommand implements CommandMarker {
   public String showArchivedCommits(
           @CliOption(key = {"includeExtraMetadata"}, help = "Include extra 
metadata",
                   unspecifiedDefaultValue = "false") final boolean 
includeExtraMetadata,
+          @CliOption(key = {"createView"}, mandatory = false, help = "view 
name to store output table",
+                  unspecifiedDefaultValue = "") final String exportTableName,
           @CliOption(key = {"startTs"},  mandatory = false, help = "start time 
for commits, default: now - 10 days")
           String startTs,
           @CliOption(key = {"endTs"},  mandatory = false, help = "end time for 
commits, default: now - 1 day")
@@ -195,9 +203,9 @@ public class CommitsCommand implements CommandMarker {
       archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
       HoodieDefaultTimeline timelineRange = 
archivedTimeline.findInstantsInRange(startTs, endTs);
       if (includeExtraMetadata) {
-        return printCommitsWithMetadata(timelineRange, limit, sortByField, 
descending, headerOnly);
+        return printCommitsWithMetadata(timelineRange, limit, sortByField, 
descending, headerOnly, exportTableName);
       } else  {
-        return printCommits(timelineRange, limit, sortByField, descending, 
headerOnly);
+        return printCommits(timelineRange, limit, sortByField, descending, 
headerOnly, exportTableName);
       }
     } finally {
       // clear the instant details from memory after printing to reduce usage
@@ -237,7 +245,10 @@ public class CommitsCommand implements CommandMarker {
   }
 
   @CliCommand(value = "commit showpartitions", help = "Show partition level 
details of a commit")
-  public String showCommitPartitions(@CliOption(key = {"commit"}, help = 
"Commit to show") final String commitTime,
+  public String showCommitPartitions(
+      @CliOption(key = {"createView"}, mandatory = false, help = "view name to 
store output table",
+          unspecifiedDefaultValue = "") final String exportTableName,
+      @CliOption(key = {"commit"}, help = "Commit to show") final String 
commitTime,
       @CliOption(key = {"limit"}, help = "Limit commits", 
unspecifiedDefaultValue = "-1") final Integer limit,
       @CliOption(key = {"sortBy"}, help = "Sorting Field", 
unspecifiedDefaultValue = "") final String sortByField,
       @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = 
"false") final boolean descending,
@@ -287,11 +298,15 @@ public class CommitsCommand implements CommandMarker {
         .addTableHeaderField("Total Records 
Inserted").addTableHeaderField("Total Records Updated")
         .addTableHeaderField("Total Bytes Written").addTableHeaderField("Total 
Errors");
 
-    return HoodiePrintHelper.print(header, fieldNameToConverterMap, 
sortByField, descending, limit, headerOnly, rows);
+    return HoodiePrintHelper.print(header, fieldNameToConverterMap, 
sortByField, descending,
+        limit, headerOnly, rows, exportTableName);
   }
 
   @CliCommand(value = "commit showfiles", help = "Show file level details of a 
commit")
-  public String showCommitFiles(@CliOption(key = {"commit"}, help = "Commit to 
show") final String commitTime,
+  public String showCommitFiles(
+      @CliOption(key = {"createView"}, mandatory = false, help = "view name to 
store output table",
+          unspecifiedDefaultValue = "") final String exportTableName,
+      @CliOption(key = {"commit"}, help = "Commit to show") final String 
commitTime,
       @CliOption(key = {"limit"}, help = "Limit commits", 
unspecifiedDefaultValue = "-1") final Integer limit,
       @CliOption(key = {"sortBy"}, help = "Sorting Field", 
unspecifiedDefaultValue = "") final String sortByField,
       @CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue = 
"false") final boolean descending,
@@ -323,7 +338,8 @@ public class CommitsCommand implements CommandMarker {
         .addTableHeaderField("Total Records 
Written").addTableHeaderField("Total Bytes Written")
         .addTableHeaderField("Total Errors").addTableHeaderField("File Size");
 
-    return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, 
descending, limit, headerOnly, rows);
+    return HoodiePrintHelper.print(header, new HashMap<>(), sortByField, 
descending,
+        limit, headerOnly, rows, exportTableName);
   }
 
   @CliCommand(value = "commits compare", help = "Compare commits with another 
Hoodie table")
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TempViewCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TempViewCommand.java
new file mode 100644
index 0000000..39e3767
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TempViewCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+
+import org.springframework.shell.core.CommandMarker;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+
+/**
+ * CLI command to query/delete temp views.
+ */
+@Component
+public class TempViewCommand implements CommandMarker {
+
+  private static final String EMPTY_STRING = "";
+
+  @CliCommand(value = "temp_query", help = "query against created temp view")
+  public String query(
+          @CliOption(key = {"sql"}, mandatory = true, help = "select query to 
run against view") final String sql)
+          throws IOException {
+
+    HoodieCLI.getTempViewProvider().runQuery(sql);
+    return EMPTY_STRING;
+  }
+
+  @CliCommand(value = "temp_delete", help = "Delete view name")
+  public String delete(
+          @CliOption(key = {"view"}, mandatory = true, help = "view name") 
final String tableName)
+          throws IOException {
+
+    HoodieCLI.getTempViewProvider().deleteTable(tableName);
+    return EMPTY_STRING;
+  }
+}
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkTempViewProvider.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkTempViewProvider.java
new file mode 100644
index 0000000..68c18f9
--- /dev/null
+++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkTempViewProvider.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.utils;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SparkTempViewProvider implements TempViewProvider {
+  private static final Logger LOG = 
LogManager.getLogger(SparkTempViewProvider.class);
+
+  private JavaSparkContext jsc;
+  private SQLContext sqlContext;
+
+  public SparkTempViewProvider(String appName) {
+    try {
+      SparkConf sparkConf = new SparkConf().setAppName(appName)
+              .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer").setMaster("local[8]");
+      jsc = new JavaSparkContext(sparkConf);
+      jsc.setLogLevel("ERROR");
+
+      sqlContext = new SQLContext(jsc);
+    } catch (Throwable ex) {
+      // log full stack trace and rethrow. Without this its difficult to debug 
failures, if any
+      LOG.error("unable to initialize spark context ", ex);
+      throw new HoodieException(ex);
+    }
+  }
+
+  @Override
+  public void createOrReplace(String tableName, List<String> headers, 
List<List<Comparable>> rows) {
+    try {
+      if (headers.isEmpty() || rows.isEmpty()) {
+        return;
+      }
+
+      if (rows.stream().filter(row -> row.size() != headers.size()).count() > 
0) {
+        throw new HoodieException("Invalid row, does not match headers " + 
headers.size() + " " + rows.size());
+      }
+
+      // replace all whitespaces in headers to make it easy to write sql 
queries
+      List<String> headersNoSpaces = headers.stream().map(title -> 
title.replaceAll("\\s+",""))
+              .collect(Collectors.toList());
+
+      // generate schema for table
+      StructType structType = new StructType();
+      for (int i = 0; i < headersNoSpaces.size(); i++) {
+        // try guessing data type from column data.
+        DataType headerDataType = getDataType(rows.get(0).get(i));
+        structType = 
structType.add(DataTypes.createStructField(headersNoSpaces.get(i), 
headerDataType, true));
+      }
+      List<Row> records = rows.stream().map(row -> 
RowFactory.create(row.toArray(new Comparable[row.size()])))
+              .collect(Collectors.toList());
+      Dataset<Row> dataset = this.sqlContext.createDataFrame(records, 
structType);
+      dataset.createOrReplaceTempView(tableName);
+      System.out.println("Wrote table view: " + tableName);
+    } catch (Throwable ex) {
+      // log full stack trace and rethrow. Without this its difficult to debug 
failures, if any
+      LOG.error("unable to write ", ex);
+      throw new HoodieException(ex);
+    }
+  }
+
+  @Override
+  public void runQuery(String sqlText) {
+    try {
+      this.sqlContext.sql(sqlText).show(Integer.MAX_VALUE, false);
+    } catch (Throwable ex) {
+      // log full stack trace and rethrow. Without this its difficult to debug 
failures, if any
+      LOG.error("unable to read ", ex);
+      throw new HoodieException(ex);
+    }
+  }
+
+  @Override
+  public void deleteTable(String tableName) {
+    try {
+      sqlContext.sql("DROP TABLE IF EXISTS " + tableName);
+    } catch (Throwable ex) {
+      // log full stack trace and rethrow. Without this its difficult to debug 
failures, if any
+      LOG.error("unable to initialize spark context ", ex);
+      throw new HoodieException(ex);
+    }
+  }
+
+  private DataType getDataType(Comparable comparable) {
+    if (comparable instanceof Integer) {
+      return DataTypes.IntegerType;
+    }
+
+    if (comparable instanceof Double) {
+      return DataTypes.DoubleType;
+    }
+
+    if (comparable instanceof Long) {
+      return DataTypes.LongType;
+    }
+
+    if (comparable instanceof Boolean) {
+      return DataTypes.BooleanType;
+    }
+
+    // TODO add additional types when needed. default to string
+    return DataTypes.StringType;
+  }
+}
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/TempViewProvider.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/TempViewProvider.java
new file mode 100644
index 0000000..1075fdd
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/TempViewProvider.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.utils;
+
+import java.util.List;
+
/**
 * Contract for exporting tabular CLI output as named, SQL-queryable temp views.
 */
public interface TempViewProvider {
  /**
   * Creates a view named {@code tableName} over the given rows, replacing any
   * existing view with that name.
   *
   * @param tableName view name
   * @param headers   column titles; each row is expected to have one value per header
   * @param rows      table data
   */
  void createOrReplace(String tableName, List<String> headers, List<List<Comparable>> rows);

  /** Executes the given SQL text against previously created views. */
  void runQuery(String sqlText);

  /** Removes the view with the given name. */
  void deleteTable(String tableName);
}

Reply via email to