JingsongLi commented on code in PR #540:
URL: https://github.com/apache/flink-table-store/pull/540#discussion_r1109369372
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java:
##########
@@ -61,4 +93,48 @@ public abstract class ActionBase implements Action {
throw new RuntimeException(e);
}
}
+
+ /**
+ * Extract {@link LogicalType}s from Flink {@link
org.apache.flink.table.types.DataType}s and
+ * convert to Table Store {@link DataType}s.
+ */
+ protected List<DataType> toTableStoreDataTypes(
+ List<org.apache.flink.table.types.DataType> flinkDataTypes) {
+ return flinkDataTypes.stream()
+ .map(org.apache.flink.table.types.DataType::getLogicalType)
+ .map(LogicalTypeConversion::toDataType)
+ .collect(Collectors.toList());
+ }
+
+ /** Get {@link DataType}s from table's schema. */
+ protected List<DataType> getDataType() {
+ return ((FileStoreTable)
table).schema().logicalRowType().getFieldTypes();
+ }
+
+ /**
+ * Check whether each {@link DataType} of actualTypes can be implicitly
cast to that of
+ * expectedTypes respectively.
+ */
+ protected boolean checkSchema(List<DataType> actualTypes, List<DataType>
expectedTypes) {
+ if (actualTypes.size() != expectedTypes.size()) {
+ return false;
+ }
+
+ for (int i = 0; i < actualTypes.size(); i++) {
+ if (!DataTypeCasts.supportsImplicitCast(actualTypes.get(i),
expectedTypes.get(i))) {
Review Comment:
Int -> Long will throw a cast exception, even though this implicit-cast check passes for it.
We should introduce a rule in `DataTypeCasts` for this, for example
`dataStructureCastingRule`.
##########
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/action/InsertChangesAction.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.types.CharType;
+import org.apache.flink.table.store.types.DataType;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static
org.apache.flink.table.store.connector.action.Action.getTablePath;
+
+/**
+ * Insert changes from given query to a table. The run() method works in
following steps:
+ *
+ * <ul>
+ * <li>Get {@link Table} queriedTable from given query.
+ * <li>Transform the queriedTable to changelog {@link DataStream} with
internal {@link RowData}.
+ * The row kind is extracted from the first column, and the fields is
from left fields of the
+ * original row.
+ * <li>Build a sink from the data stream and the target table then execute.
+ * </ul>
+ */
+public class InsertChangesAction extends ActionBase {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(InsertChangesAction.class);
+
+ private final String query;
+
+ InsertChangesAction(String warehouse, String database, String tableName,
String query) {
+ super(warehouse, database, tableName);
+ if (((FileStoreTable) table).schema().primaryKeys().isEmpty()) {
+ throw new UnsupportedOperationException(
+ "insert-changes action doesn't support table with no
primary keys defined.");
+ }
+ this.query = query;
+ }
+
+ public static Optional<Action> create(String[] args) {
+ LOG.info("insert-changes job args: {}", String.join(" ", args));
+
+ MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+
+ if (params.has("help")) {
+ printHelp();
+ return Optional.empty();
+ }
+
+ Tuple3<String, String, String> tablePath = getTablePath(params);
+ if (tablePath == null) {
+ return Optional.empty();
+ }
+
+ String query = params.get("query");
+ if (query == null) {
+ return Optional.empty();
+ }
+
+ InsertChangesAction action =
+ new InsertChangesAction(tablePath.f0, tablePath.f1,
tablePath.f2, query);
+
+ return Optional.of(action);
+ }
+
+ private static void printHelp() {
+ System.out.println("Action \"insert-changes\" apply changes from given
query to a table.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " insert-changes --warehouse <warehouse-path> --database
<database-name> "
+ + "--table <table-name> --query <query-statement>");
+ System.out.println(" insert-changes --path <table-path> --query
<query-statement>");
+ System.out.println();
+
+ System.out.println("Note: ");
+ System.out.println(
+ " 1. the result's first column of the <query-statement> must
be RowKind:");
+ System.out.println(" +I: newly added row");
+ System.out.println(" +U: updated row with new content");
+ System.out.println(" -D: deleted row");
+ System.out.println(
+ " 2. make sure the schema of query result after discarding
the first column is the same as the <table-name>");
+
+ System.out.println();
+
+ System.out.println("Examples:");
+ System.out.println(
+ " insert-changes --warehouse hdfs:///path/to/warehouse
--database test_db --table test_table --query SELECT '-D', k, v FROM
test_table");
+ System.out.println(
+ " It will insert all records from test_table with RowKind
'-D' to test_table");
+ }
+
+ @Override
+ public void run() throws Exception {
+ Table queriedTable = tEnv.sqlQuery(query);
+
+ List<DataType> actualTypes =
+
toTableStoreDataTypes(queriedTable.getResolvedSchema().getColumnDataTypes());
+
+ List<DataType> expectedTypes = new ArrayList<>(getDataType());
+ expectedTypes.add(0, new CharType(false, 2));
+
+ // the row kind column must be exactly CHAR(2)
+ if (!checkSchema(actualTypes, expectedTypes)
+ || ((CharType) actualTypes.get(0)).getLength() != 2) {
Review Comment:
I think we can remove this check: `((CharType) actualTypes.get(0)).getLength() != 2`.
We can accept STRING as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]