This is an automated email from the ASF dual-hosted git repository.
nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3d37818 [CLI] Add export to table
3d37818 is described below
commit 3d3781810c2fd28c85407b04aad08fc2a85174dc
Author: Satish Kotha <[email protected]>
AuthorDate: Wed Feb 12 11:23:40 2020 -0800
[CLI] Add export to table
---
.../main/java/org/apache/hudi/cli/HoodieCLI.java | 11 ++
.../org/apache/hudi/cli/HoodiePrintHelper.java | 30 +++++
.../apache/hudi/cli/commands/CommitsCommand.java | 42 +++++--
.../apache/hudi/cli/commands/TempViewCommand.java | 55 +++++++++
.../hudi/cli/utils/SparkTempViewProvider.java | 134 +++++++++++++++++++++
.../apache/hudi/cli/utils/TempViewProvider.java | 29 +++++
6 files changed, 288 insertions(+), 13 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
index 561e499..af68035 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodieCLI.java
@@ -18,6 +18,8 @@
package org.apache.hudi.cli;
+import org.apache.hudi.cli.utils.SparkTempViewProvider;
+import org.apache.hudi.cli.utils.TempViewProvider;
import org.apache.hudi.common.model.TimelineLayoutVersion;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ConsistencyGuardConfig;
@@ -43,6 +45,7 @@ public class HoodieCLI {
protected static HoodieTableMetaClient tableMetadata;
public static HoodieTableMetaClient syncTableMetadata;
public static TimelineLayoutVersion layoutVersion;
+ private static TempViewProvider tempViewProvider;
/**
* Enum for CLI state.
@@ -105,4 +108,12 @@ public class HoodieCLI {
return tableMetadata;
}
  /**
   * Returns the process-wide {@link TempViewProvider}, creating it lazily on first use.
   * Synchronized so concurrent CLI commands cannot race and construct two providers
   * (and therefore two Spark contexts).
   */
  public static synchronized TempViewProvider getTempViewProvider() {
    if (tempViewProvider == null) {
      tempViewProvider = new SparkTempViewProvider(HoodieCLI.class.getSimpleName());
    }

    return tempViewProvider;
  }
+
}
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
index 53114ce..be64037 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/HoodiePrintHelper.java
@@ -19,12 +19,15 @@
package org.apache.hudi.cli;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import com.jakewharton.fliptables.FlipTable;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
+import java.util.stream.Collectors;
/**
@@ -57,11 +60,38 @@ public class HoodiePrintHelper {
*/
  public static String print(TableHeader rowHeader, Map<String, Function<Object, String>> fieldNameToConverterMap,
      String sortByField, boolean isDescending, Integer limit, boolean headerOnly, List<Comparable[]> rows) {
    // Delegates to the extended overload with an empty temp-view name, i.e. no temp view is exported.
    return print(rowHeader, fieldNameToConverterMap, sortByField, isDescending, limit, headerOnly, rows, "");
  }
+
+ /**
+ * Serialize Table to printable string and also export a temporary view to
easily write sql queries.
+ *
+ * Ideally, exporting view needs to be outside PrintHelper, but all commands
use this. So this is easy
+ * way to add support for all commands
+ *
+ * @param rowHeader Row Header
+ * @param fieldNameToConverterMap Field Specific Converters
+ * @param sortByField Sorting field
+ * @param isDescending Order
+ * @param limit Limit
+ * @param headerOnly Headers only
+ * @param rows List of rows
+ * @param tempTableName table name to export
+ * @return Serialized form for printing
+ */
+ public static String print(TableHeader rowHeader, Map<String,
Function<Object, String>> fieldNameToConverterMap,
+ String sortByField, boolean isDescending, Integer limit, boolean
headerOnly, List<Comparable[]> rows,
+ String tempTableName) {
if (headerOnly) {
return HoodiePrintHelper.print(rowHeader);
}
+ if (!StringUtils.isNullOrEmpty(tempTableName)) {
+ HoodieCLI.getTempViewProvider().createOrReplace(tempTableName,
rowHeader.getFieldNames(),
+ rows.stream().map(columns ->
Arrays.asList(columns)).collect(Collectors.toList()));
+ }
+
if (!sortByField.isEmpty() && !rowHeader.containsField(sortByField)) {
return String.format("Field[%s] is not in table, given columns[%s]",
sortByField, rowHeader.getFieldNames());
}
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
index 804096b..3c08305 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/CommitsCommand.java
@@ -33,7 +33,6 @@ import
org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.NumericUtils;
-
import org.apache.hudi.common.util.StringUtils;
import org.apache.spark.launcher.SparkLauncher;
import org.springframework.shell.core.CommandMarker;
@@ -59,7 +58,8 @@ public class CommitsCommand implements CommandMarker {
private String printCommits(HoodieDefaultTimeline timeline,
final Integer limit, final String sortByField,
final boolean descending,
- final boolean headerOnly) throws IOException {
+ final boolean headerOnly,
+ final String tempTableName) throws IOException {
final List<Comparable[]> rows = new ArrayList<>();
final List<HoodieInstant> commits =
timeline.getCommitsTimeline().filterCompletedInstants()
@@ -96,13 +96,16 @@ public class CommitsCommand implements CommandMarker {
.addTableHeaderField("Total Records Written")
.addTableHeaderField("Total Update Records Written")
.addTableHeaderField("Total Errors");
- return HoodiePrintHelper.print(header, fieldNameToConverterMap,
sortByField, descending, limit, headerOnly, rows);
+
+ return HoodiePrintHelper.print(header, fieldNameToConverterMap,
sortByField, descending,
+ limit, headerOnly, rows, tempTableName);
}
private String printCommitsWithMetadata(HoodieDefaultTimeline timeline,
final Integer limit, final String sortByField,
final boolean descending,
- final boolean headerOnly) throws IOException {
+ final boolean headerOnly,
+ final String tempTableName) throws IOException {
final List<Comparable[]> rows = new ArrayList<>();
final List<HoodieInstant> commits =
timeline.getCommitsTimeline().filterCompletedInstants()
@@ -144,13 +147,16 @@ public class CommitsCommand implements CommandMarker {
.addTableHeaderField("Total Rollback
Blocks").addTableHeaderField("Total Log Records")
.addTableHeaderField("Total Updated Records
Compacted").addTableHeaderField("Total Write Bytes");
- return HoodiePrintHelper.print(header, new HashMap<>(), sortByField,
descending, limit, headerOnly, rows);
+ return HoodiePrintHelper.print(header, new HashMap<>(), sortByField,
descending,
+ limit, headerOnly, rows, tempTableName);
}
@CliCommand(value = "commits show", help = "Show the commits")
public String showCommits(
@CliOption(key = {"includeExtraMetadata"}, help = "Include extra
metadata",
unspecifiedDefaultValue = "false") final boolean
includeExtraMetadata,
+ @CliOption(key = {"createView"}, mandatory = false, help = "view name to
store output table",
+ unspecifiedDefaultValue = "") final String exportTableName,
@CliOption(key = {"limit"}, help = "Limit commits",
unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field",
unspecifiedDefaultValue = "") final String sortByField,
@@ -161,9 +167,9 @@ public class CommitsCommand implements CommandMarker {
HoodieActiveTimeline activeTimeline =
HoodieCLI.getTableMetaClient().getActiveTimeline();
if (includeExtraMetadata) {
- return printCommitsWithMetadata(activeTimeline, limit, sortByField,
descending, headerOnly);
+ return printCommitsWithMetadata(activeTimeline, limit, sortByField,
descending, headerOnly, exportTableName);
} else {
- return printCommits(activeTimeline, limit, sortByField, descending,
headerOnly);
+ return printCommits(activeTimeline, limit, sortByField, descending,
headerOnly, exportTableName);
}
}
@@ -171,6 +177,8 @@ public class CommitsCommand implements CommandMarker {
public String showArchivedCommits(
@CliOption(key = {"includeExtraMetadata"}, help = "Include extra
metadata",
unspecifiedDefaultValue = "false") final boolean
includeExtraMetadata,
+ @CliOption(key = {"createView"}, mandatory = false, help = "view
name to store output table",
+ unspecifiedDefaultValue = "") final String exportTableName,
@CliOption(key = {"startTs"}, mandatory = false, help = "start time
for commits, default: now - 10 days")
String startTs,
@CliOption(key = {"endTs"}, mandatory = false, help = "end time for
commits, default: now - 1 day")
@@ -195,9 +203,9 @@ public class CommitsCommand implements CommandMarker {
archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
HoodieDefaultTimeline timelineRange =
archivedTimeline.findInstantsInRange(startTs, endTs);
if (includeExtraMetadata) {
- return printCommitsWithMetadata(timelineRange, limit, sortByField,
descending, headerOnly);
+ return printCommitsWithMetadata(timelineRange, limit, sortByField,
descending, headerOnly, exportTableName);
} else {
- return printCommits(timelineRange, limit, sortByField, descending,
headerOnly);
+ return printCommits(timelineRange, limit, sortByField, descending,
headerOnly, exportTableName);
}
} finally {
// clear the instant details from memory after printing to reduce usage
@@ -237,7 +245,10 @@ public class CommitsCommand implements CommandMarker {
}
@CliCommand(value = "commit showpartitions", help = "Show partition level
details of a commit")
- public String showCommitPartitions(@CliOption(key = {"commit"}, help =
"Commit to show") final String commitTime,
+ public String showCommitPartitions(
+ @CliOption(key = {"createView"}, mandatory = false, help = "view name to
store output table",
+ unspecifiedDefaultValue = "") final String exportTableName,
+ @CliOption(key = {"commit"}, help = "Commit to show") final String
commitTime,
@CliOption(key = {"limit"}, help = "Limit commits",
unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field",
unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue =
"false") final boolean descending,
@@ -287,11 +298,15 @@ public class CommitsCommand implements CommandMarker {
.addTableHeaderField("Total Records
Inserted").addTableHeaderField("Total Records Updated")
.addTableHeaderField("Total Bytes Written").addTableHeaderField("Total
Errors");
- return HoodiePrintHelper.print(header, fieldNameToConverterMap,
sortByField, descending, limit, headerOnly, rows);
+ return HoodiePrintHelper.print(header, fieldNameToConverterMap,
sortByField, descending,
+ limit, headerOnly, rows, exportTableName);
}
@CliCommand(value = "commit showfiles", help = "Show file level details of a
commit")
- public String showCommitFiles(@CliOption(key = {"commit"}, help = "Commit to
show") final String commitTime,
+ public String showCommitFiles(
+ @CliOption(key = {"createView"}, mandatory = false, help = "view name to
store output table",
+ unspecifiedDefaultValue = "") final String exportTableName,
+ @CliOption(key = {"commit"}, help = "Commit to show") final String
commitTime,
@CliOption(key = {"limit"}, help = "Limit commits",
unspecifiedDefaultValue = "-1") final Integer limit,
@CliOption(key = {"sortBy"}, help = "Sorting Field",
unspecifiedDefaultValue = "") final String sortByField,
@CliOption(key = {"desc"}, help = "Ordering", unspecifiedDefaultValue =
"false") final boolean descending,
@@ -323,7 +338,8 @@ public class CommitsCommand implements CommandMarker {
.addTableHeaderField("Total Records
Written").addTableHeaderField("Total Bytes Written")
.addTableHeaderField("Total Errors").addTableHeaderField("File Size");
- return HoodiePrintHelper.print(header, new HashMap<>(), sortByField,
descending, limit, headerOnly, rows);
+ return HoodiePrintHelper.print(header, new HashMap<>(), sortByField,
descending,
+ limit, headerOnly, rows, exportTableName);
}
@CliCommand(value = "commits compare", help = "Compare commits with another
Hoodie table")
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TempViewCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TempViewCommand.java
new file mode 100644
index 0000000..39e3767
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TempViewCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hudi.cli.HoodieCLI;
+
+import org.springframework.shell.core.CommandMarker;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+
+/**
+ * CLI command to query/delete temp views.
+ */
+@Component
+public class TempViewCommand implements CommandMarker {
+
+ private static final String EMPTY_STRING = "";
+
+ @CliCommand(value = "temp_query", help = "query against created temp view")
+ public String query(
+ @CliOption(key = {"sql"}, mandatory = true, help = "select query to
run against view") final String sql)
+ throws IOException {
+
+ HoodieCLI.getTempViewProvider().runQuery(sql);
+ return EMPTY_STRING;
+ }
+
+ @CliCommand(value = "temp_delete", help = "Delete view name")
+ public String delete(
+ @CliOption(key = {"view"}, mandatory = true, help = "view name")
final String tableName)
+ throws IOException {
+
+ HoodieCLI.getTempViewProvider().deleteTable(tableName);
+ return EMPTY_STRING;
+ }
+}
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkTempViewProvider.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkTempViewProvider.java
new file mode 100644
index 0000000..68c18f9
--- /dev/null
+++
b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkTempViewProvider.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.utils;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SparkTempViewProvider implements TempViewProvider {
+ private static final Logger LOG =
LogManager.getLogger(SparkTempViewProvider.class);
+
+ private JavaSparkContext jsc;
+ private SQLContext sqlContext;
+
+ public SparkTempViewProvider(String appName) {
+ try {
+ SparkConf sparkConf = new SparkConf().setAppName(appName)
+ .set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer").setMaster("local[8]");
+ jsc = new JavaSparkContext(sparkConf);
+ jsc.setLogLevel("ERROR");
+
+ sqlContext = new SQLContext(jsc);
+ } catch (Throwable ex) {
+ // log full stack trace and rethrow. Without this its difficult to debug
failures, if any
+ LOG.error("unable to initialize spark context ", ex);
+ throw new HoodieException(ex);
+ }
+ }
+
+ @Override
+ public void createOrReplace(String tableName, List<String> headers,
List<List<Comparable>> rows) {
+ try {
+ if (headers.isEmpty() || rows.isEmpty()) {
+ return;
+ }
+
+ if (rows.stream().filter(row -> row.size() != headers.size()).count() >
0) {
+ throw new HoodieException("Invalid row, does not match headers " +
headers.size() + " " + rows.size());
+ }
+
+ // replace all whitespaces in headers to make it easy to write sql
queries
+ List<String> headersNoSpaces = headers.stream().map(title ->
title.replaceAll("\\s+",""))
+ .collect(Collectors.toList());
+
+ // generate schema for table
+ StructType structType = new StructType();
+ for (int i = 0; i < headersNoSpaces.size(); i++) {
+ // try guessing data type from column data.
+ DataType headerDataType = getDataType(rows.get(0).get(i));
+ structType =
structType.add(DataTypes.createStructField(headersNoSpaces.get(i),
headerDataType, true));
+ }
+ List<Row> records = rows.stream().map(row ->
RowFactory.create(row.toArray(new Comparable[row.size()])))
+ .collect(Collectors.toList());
+ Dataset<Row> dataset = this.sqlContext.createDataFrame(records,
structType);
+ dataset.createOrReplaceTempView(tableName);
+ System.out.println("Wrote table view: " + tableName);
+ } catch (Throwable ex) {
+ // log full stack trace and rethrow. Without this its difficult to debug
failures, if any
+ LOG.error("unable to write ", ex);
+ throw new HoodieException(ex);
+ }
+ }
+
+ @Override
+ public void runQuery(String sqlText) {
+ try {
+ this.sqlContext.sql(sqlText).show(Integer.MAX_VALUE, false);
+ } catch (Throwable ex) {
+ // log full stack trace and rethrow. Without this its difficult to debug
failures, if any
+ LOG.error("unable to read ", ex);
+ throw new HoodieException(ex);
+ }
+ }
+
+ @Override
+ public void deleteTable(String tableName) {
+ try {
+ sqlContext.sql("DROP TABLE IF EXISTS " + tableName);
+ } catch (Throwable ex) {
+ // log full stack trace and rethrow. Without this its difficult to debug
failures, if any
+ LOG.error("unable to initialize spark context ", ex);
+ throw new HoodieException(ex);
+ }
+ }
+
+ private DataType getDataType(Comparable comparable) {
+ if (comparable instanceof Integer) {
+ return DataTypes.IntegerType;
+ }
+
+ if (comparable instanceof Double) {
+ return DataTypes.DoubleType;
+ }
+
+ if (comparable instanceof Long) {
+ return DataTypes.LongType;
+ }
+
+ if (comparable instanceof Boolean) {
+ return DataTypes.BooleanType;
+ }
+
+ // TODO add additional types when needed. default to string
+ return DataTypes.StringType;
+ }
+}
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/TempViewProvider.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/TempViewProvider.java
new file mode 100644
index 0000000..1075fdd
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/TempViewProvider.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.utils;
+
+import java.util.List;
+
/**
 * Provides temp views over tabular CLI output so users can run SQL against it.
 */
public interface TempViewProvider {

  /**
   * Creates or replaces a temp view with the given name, column headers and row data.
   */
  void createOrReplace(String tableName, List<String> headers, List<List<Comparable>> rows);

  /**
   * Executes the given SQL text against previously created views.
   */
  void runQuery(String sqlText);

  /**
   * Deletes the view/table with the given name.
   */
  void deleteTable(String tableName);
}