This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 7080cfae [FLINK-31087] Introduce merge-into action
7080cfae is described below
commit 7080cfae9c9cce9db54b75e607a3d43cefc19ca7
Author: yuzelin <[email protected]>
AuthorDate: Fri Mar 3 13:58:32 2023 +0800
[FLINK-31087] Introduce merge-into action
This closes #540
---
docs/content/docs/how-to/writing-tables.md | 118 ++++
.../flink/table/store/types/DataTypeCasts.java | 56 +-
.../flink/table/store/file/catalog/Identifier.java | 4 +
.../table/store/tests/FlinkActionsE2eTest.java | 109 ++++
.../flink/table/store/connector/action/Action.java | 37 +-
.../table/store/connector/action/ActionBase.java | 71 +++
.../table/store/connector/action/DeleteAction.java | 22 +-
.../store/connector/action/MergeIntoAction.java | 677 +++++++++++++++++++++
...nITCase.java => DropPartitionActionITCase.java} | 2 +-
.../connector/action/MergeIntoActionITCase.java | 383 ++++++++++++
.../connector/util/ReadWriteTableTestUtil.java | 2 +-
11 files changed, 1444 insertions(+), 37 deletions(-)
diff --git a/docs/content/docs/how-to/writing-tables.md
b/docs/content/docs/how-to/writing-tables.md
index e77c47a1..2d3d0deb 100644
--- a/docs/content/docs/how-to/writing-tables.md
+++ b/docs/content/docs/how-to/writing-tables.md
@@ -268,3 +268,121 @@ For more information of 'delete', see
{{< /tab >}}
{{< /tabs >}}
+
+## Merging into table
+
+Table Store supports "MERGE INTO" by submitting the 'merge-into' job through `flink run`.
+
+{{< hint info >}}
+Important table property settings:
+1. Only a [primary key table]({{< ref "docs/concepts/primary-key-table" >}}) supports this feature.
+2. The action won't produce UPDATE_BEFORE, so it's not recommended to set 'changelog-producer' = 'input'.
+{{< /hint >}}
+
+The design references the following syntax:
+```sql
+MERGE INTO target-table
+ USING source-table | source-expr AS source-alias
+ ON merge-condition
+ WHEN MATCHED [AND matched-condition]
+ THEN UPDATE SET xxx
+ WHEN MATCHED [AND matched-condition]
+ THEN DELETE
+ WHEN NOT MATCHED [AND not-matched-condition]
+ THEN INSERT VALUES (xxx)
+ WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
+ THEN UPDATE SET xxx
+ WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
+ THEN DELETE
+```
+The merge-into action uses "upsert" semantics instead of "update": if the row exists, it performs
+an update, otherwise it performs an insert. For example, for a non-primary-key table you can update
+every column, but for a primary key table, if you want to update the primary keys, you have to
+insert a new row whose primary keys differ from those of the rows already in the table. In this
+scenario, "upsert" is useful.
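+
+As a quick illustration of the upsert semantics, consider this minimal sketch (a hypothetical
+table in plain Flink SQL, not output of the action itself):
+
+```sql
+CREATE TABLE T (id INT, v STRING, PRIMARY KEY (id) NOT ENFORCED);
+INSERT INTO T VALUES (1, 'a'); -- no row with id = 1 exists yet, so this inserts
+INSERT INTO T VALUES (1, 'b'); -- id = 1 already exists, so the row becomes (1, 'b')
+```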
+
+{{< tabs "merge-into" >}}
+
+{{< tab "Flink Job" >}}
+
+Run the following command to submit a 'merge-into' job for the table.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ -c org.apache.flink.table.store.connector.action.FlinkActions \
+ /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+ merge-into \
+ --warehouse <warehouse-path> \
+ --database <database-name> \
+ --table <target-table> \
+ --using-table <source-table> \
+ --on <merge-condition> \
+ --merge-actions
<matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete>
\
+ --matched-upsert-condition <matched-condition> \
+ --matched-upsert-set <upsert-changes> \
+ --matched-delete-condition <matched-condition> \
+ --not-matched-insert-condition <not-matched-condition> \
+ --not-matched-insert-values <insert-values> \
+ --not-matched-by-source-upsert-condition <not-matched-by-source-condition>
\
+ --not-matched-by-source-upsert-set <not-matched-upsert-changes> \
+ --not-matched-by-source-delete-condition <not-matched-by-source-condition>
+
+-- Examples:
+-- Find all orders mentioned in the source table, then mark them as important if the price is
+-- above 100, or delete them if the price is under 10.
+./flink run \
+ -c org.apache.flink.table.store.connector.action.FlinkActions \
+ /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+ merge-into \
+ --warehouse <warehouse-path> \
+ --database <database-name> \
+ --table T \
+ --using-table S \
+ --on "T.id = S.order_id" \
+ --merge-actions \
+ matched-upsert,matched-delete \
+ --matched-upsert-condition "T.price > 100" \
+ --matched-upsert-set "id = T.id, price = T.price, mark = 'important'" \
+ --matched-delete-condition "T.price < 10"
+
+-- For matched order rows, increase the price, and if there is no match, insert the order from
+-- the source table:
+./flink run \
+ -c org.apache.flink.table.store.connector.action.FlinkActions \
+ /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+ merge-into \
+ --warehouse <warehouse-path> \
+ --database <database-name> \
+ --table T \
+ --using-table S \
+ --on "T.id = S.order_id" \
+ --merge-actions \
+ matched-upsert,not-matched-insert \
+ --matched-upsert-set "id = T.id, price = T.price + 20, mark = T.mark" \
+ --not-matched-insert-values *
+
+-- For not-matched-by-source order rows (which are in the target table and do not match any row
+-- in the source table based on the merge-condition), decrease the price, or delete them if the
+-- mark is 'trivial':
+./flink run \
+ -c org.apache.flink.table.store.connector.action.FlinkActions \
+ /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+ merge-into \
+ --warehouse <warehouse-path> \
+ --database <database-name> \
+ --table T \
+ --using-table S \
+ --on "T.id = S.order_id" \
+ --merge-actions \
+ not-matched-by-source-upsert,not-matched-by-source-delete \
+ --not-matched-by-source-upsert-condition "T.mark <> 'trivial'" \
+ --not-matched-by-source-upsert-set "id = T.id, price = T.price - 20, mark
= T.mark" \
+ --not-matched-by-source-delete-condition "T.mark = 'trivial'"
+```
+
+For more information about 'merge-into', see
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ -c org.apache.flink.table.store.connector.action.FlinkActions \
+ /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+ merge-into --help
+```
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeCasts.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeCasts.java
index 72260d2d..637a6dc1 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeCasts.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/types/DataTypeCasts.java
@@ -57,9 +57,12 @@ public final class DataTypeCasts {
private static final Map<DataTypeRoot, Set<DataTypeRoot>>
explicitCastingRules;
+ private static final Map<DataTypeRoot, Set<DataTypeRoot>>
compatibleCastingRules;
+
static {
implicitCastingRules = new HashMap<>();
explicitCastingRules = new HashMap<>();
+ compatibleCastingRules = new HashMap<>();
// identity casts
@@ -69,28 +72,36 @@ public final class DataTypeCasts {
// cast specification
- castTo(CHAR).implicitFrom(CHAR).explicitFromFamily(PREDEFINED,
CONSTRUCTED).build();
+ castTo(CHAR)
+ .implicitFrom(CHAR)
+ .explicitFromFamily(PREDEFINED, CONSTRUCTED)
+ .compatibleFrom(CHAR, VARCHAR)
+ .build();
castTo(VARCHAR)
.implicitFromFamily(CHARACTER_STRING)
.explicitFromFamily(PREDEFINED, CONSTRUCTED)
+ .compatibleFrom(CHAR, VARCHAR)
.build();
castTo(BOOLEAN)
.implicitFrom(BOOLEAN)
.explicitFromFamily(CHARACTER_STRING, INTEGER_NUMERIC)
+ .compatibleFrom(BOOLEAN)
.build();
castTo(BINARY)
.implicitFrom(BINARY)
.explicitFromFamily(CHARACTER_STRING)
.explicitFrom(VARBINARY)
+ .compatibleFrom(BINARY, VARBINARY)
.build();
castTo(VARBINARY)
.implicitFromFamily(BINARY_STRING)
.explicitFromFamily(CHARACTER_STRING)
.explicitFrom(BINARY)
+ .compatibleFrom(BINARY, VARBINARY)
.build();
castTo(DECIMAL)
@@ -103,61 +114,71 @@ public final class DataTypeCasts {
.implicitFrom(TINYINT)
.explicitFromFamily(NUMERIC, CHARACTER_STRING)
.explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE,
TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+ .compatibleFrom(TINYINT)
.build();
castTo(SMALLINT)
.implicitFrom(TINYINT, SMALLINT)
.explicitFromFamily(NUMERIC, CHARACTER_STRING)
.explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE,
TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+ .compatibleFrom(SMALLINT)
.build();
castTo(INTEGER)
.implicitFrom(TINYINT, SMALLINT, INTEGER)
.explicitFromFamily(NUMERIC, CHARACTER_STRING)
.explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE,
TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+ .compatibleFrom(INTEGER, DATE, TIME_WITHOUT_TIME_ZONE)
.build();
castTo(BIGINT)
.implicitFrom(TINYINT, SMALLINT, INTEGER, BIGINT)
.explicitFromFamily(NUMERIC, CHARACTER_STRING)
.explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE,
TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+ .compatibleFrom(BIGINT)
.build();
castTo(FLOAT)
.implicitFrom(TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT,
DECIMAL)
.explicitFromFamily(NUMERIC, CHARACTER_STRING)
.explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE,
TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+ .compatibleFrom(FLOAT)
.build();
castTo(DOUBLE)
.implicitFromFamily(NUMERIC)
.explicitFromFamily(CHARACTER_STRING)
.explicitFrom(BOOLEAN, TIMESTAMP_WITHOUT_TIME_ZONE,
TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+ .compatibleFrom(DOUBLE)
.build();
castTo(DATE)
.implicitFrom(DATE, TIMESTAMP_WITHOUT_TIME_ZONE)
.explicitFromFamily(TIMESTAMP, CHARACTER_STRING)
+ .compatibleFrom(INTEGER, DATE, TIME_WITHOUT_TIME_ZONE)
.build();
castTo(TIME_WITHOUT_TIME_ZONE)
.implicitFrom(TIME_WITHOUT_TIME_ZONE,
TIMESTAMP_WITHOUT_TIME_ZONE)
.explicitFromFamily(TIME, TIMESTAMP, CHARACTER_STRING)
+ .compatibleFrom(INTEGER, DATE, TIME_WITHOUT_TIME_ZONE)
.build();
castTo(TIMESTAMP_WITHOUT_TIME_ZONE)
.implicitFrom(TIMESTAMP_WITHOUT_TIME_ZONE,
TIMESTAMP_WITH_LOCAL_TIME_ZONE)
.explicitFromFamily(DATETIME, CHARACTER_STRING, NUMERIC)
+ .compatibleFrom(TIMESTAMP_WITHOUT_TIME_ZONE)
.build();
castTo(TIMESTAMP_WITH_LOCAL_TIME_ZONE)
.implicitFrom(TIMESTAMP_WITH_LOCAL_TIME_ZONE,
TIMESTAMP_WITHOUT_TIME_ZONE)
.explicitFromFamily(DATETIME, CHARACTER_STRING, NUMERIC)
+ .compatibleFrom(TIMESTAMP_WITH_LOCAL_TIME_ZONE)
.build();
}
/**
- * Returns whether the source type can be safely casted to the target type
without loosing
+ * Returns whether the source type can be safely cast to the target type without losing
* information.
*
* <p>Implicit casts are used for type widening and type generalization
(finding a common
@@ -169,7 +190,7 @@ public final class DataTypeCasts {
}
/**
- * Returns whether the source type can be casted to the target type.
+ * Returns whether the source type can be cast to the target type.
*
* <p>Explicit casts correspond to the SQL cast specification and
represent the logic behind a
* {@code CAST(sourceType AS targetType)} operation. For example, it
allows for converting most
@@ -180,6 +201,28 @@ public final class DataTypeCasts {
return supportsCasting(sourceType, targetType, true);
}
+ /**
+ * Returns whether the source type can be compatibly cast to the target type.
+ *
+ * <p>If two types are compatible, they should have the same underlying data structure. For
+ * example, {@link CharType} and {@link VarCharType} are both in the {@link
+ * DataTypeFamily#CHARACTER_STRING} family, meaning they both represent a character string. But
+ * most other types are only compatible with themselves. For example, although {@link IntType}
+ * and {@link BigIntType} are both in the {@link DataTypeFamily#NUMERIC} family, they are not
+ * compatible because IntType represents a 4-byte signed integer while BigIntType represents an
+ * 8-byte signed integer. In particular, two {@link DecimalType}s are compatible only when they
+ * have the same {@code precision} and {@code scale}.
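+ *
+ * <p>For example, a minimal illustrative sketch (the results follow the rules registered
+ * above):
+ *
+ * <pre>{@code
+ * supportsCompatibleCast(new CharType(5), new VarCharType(10)); // true: both character strings
+ * supportsCompatibleCast(new IntType(), new BigIntType()); // false: different byte widths
+ * }</pre>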
+ */
+ public static boolean supportsCompatibleCast(DataType sourceType, DataType
targetType) {
+ if (sourceType.copy(true).equals(targetType.copy(true))) {
+ return true;
+ }
+
+ return compatibleCastingRules
+ .get(targetType.getTypeRoot())
+ .contains(sourceType.getTypeRoot());
+ }
+
//
--------------------------------------------------------------------------------------------
private static boolean supportsCasting(
@@ -219,6 +262,7 @@ public final class DataTypeCasts {
private final DataTypeRoot targetType;
private final Set<DataTypeRoot> implicitSourceTypes = new HashSet<>();
private final Set<DataTypeRoot> explicitSourceTypes = new HashSet<>();
+ private final Set<DataTypeRoot> compatibleSourceTypes = new
HashSet<>();
CastingRuleBuilder(DataTypeRoot targetType) {
this.targetType = targetType;
@@ -256,6 +300,11 @@ public final class DataTypeCasts {
return this;
}
+ CastingRuleBuilder compatibleFrom(DataTypeRoot... sourceTypes) {
+ this.compatibleSourceTypes.addAll(Arrays.asList(sourceTypes));
+ return this;
+ }
+
/**
* Should be called after {@link
#explicitFromFamily(DataTypeFamily...)} to remove
* previously added types.
@@ -274,6 +323,7 @@ public final class DataTypeCasts {
void build() {
implicitCastingRules.put(targetType, implicitSourceTypes);
explicitCastingRules.put(targetType, explicitSourceTypes);
+ compatibleCastingRules.put(targetType, compatibleSourceTypes);
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java
index 397172ad..b7b37be8 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Identifier.java
@@ -58,6 +58,10 @@ public class Identifier implements Serializable {
return String.format("%s.%s", database, table);
}
+ public String getEscapedFullName() {
+ return getEscapedFullName('`');
+ }
+
public String getEscapedFullName(char escapeChar) {
return String.format(
"%c%s%c.%c%s%c", escapeChar, database, escapeChar, escapeChar,
table, escapeChar);
diff --git
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
index 69ca1ac0..a5a0376f 100644
---
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
+++
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
@@ -256,6 +256,115 @@ public class FlinkActionsE2eTest extends E2eTestBase {
"2023-01-21, 1, 31");
}
+ @Test
+ public void testMergeInto() throws Exception {
+ String tableTDdl =
+ "CREATE TABLE IF NOT EXISTS T (\n"
+ + " k INT,\n"
+ + " v STRING,\n"
+ + " last_action STRING,\n"
+ + " dt STRING,\n"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED\n"
+ + ") PARTITIONED BY (dt);";
+
+ String insertToT =
+ "INSERT INTO T VALUES"
+ + "(1, 'v_1', 'creation', '02-27'),"
+ + "(2, 'v_2', 'creation', '02-27'),"
+ + "(3, 'v_3', 'creation', '02-27'),"
+ + "(4, 'v_4', 'creation', '02-27'),"
+ + "(5, 'v_5', 'creation', '02-28'),"
+ + "(6, 'v_6', 'creation', '02-28'),"
+ + "(7, 'v_7', 'creation', '02-28'),"
+ + "(8, 'v_8', 'creation', '02-28'),"
+ + "(9, 'v_9', 'creation', '02-28'),"
+ + "(10, 'v_10', 'creation', '02-28');\n";
+
+ String tableSDdl =
+ "CREATE TABLE IF NOT EXISTS S (\n"
+ + " k INT,\n"
+ + " v STRING,\n"
+ + " dt STRING,\n"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED\n"
+ + ") PARTITIONED BY (dt);";
+
+ String insertToS =
+ "INSERT INTO S VALUES"
+ + "(1, 'v_1', '02-27'),"
+ + "(4, CAST (NULL AS STRING), '02-27'),"
+ + "(7, 'Seven', '02-28'),"
+ + "(8, CAST (NULL AS STRING), '02-28'),"
+ + "(8, 'v_8', '02-29'),"
+ + "(11, 'v_11', '02-29'),"
+ + "(12, 'v_12', '02-29');\n";
+
+ runSql(
+ "SET 'table.dml-sync' = 'true';\n" + insertToT + insertToS,
+ catalogDdl,
+ useCatalogCmd,
+ tableTDdl,
+ tableSDdl);
+
+ // run merge-into job
+ Container.ExecResult execResult =
+ jobManager.execInContainer(
+ "bin/flink",
+ "run",
+ "-p",
+ "1",
+ "-c",
+
"org.apache.flink.table.store.connector.action.FlinkActions",
+ "--detached",
+ "lib/flink-table-store.jar",
+ "merge-into",
+ "--warehouse",
+ warehousePath,
+ "--database",
+ "default",
+ "--table",
+ "T",
+ "--using-table",
+ "S",
+ "--on",
+ "T.k=S.k AND T.dt=S.dt",
+ "--merge-actions",
+ "matched-upsert,matched-delete,not-matched-insert",
+ "--matched-upsert-condition",
+ "T.v <> S.v AND S.v IS NOT NULL",
+ "--matched-upsert-set",
+ "v = S.v, last_action = 'matched_upsert'",
+ "--matched-delete-condition",
+ "S.v IS NULL",
+ "--not-matched-insert-condition",
+ "S.k < 12",
+ "--not-matched-insert-values",
+ "S.k, S.v, 'insert', S.dt");
+
+ LOG.info(execResult.getStdout());
+ LOG.info(execResult.getStderr());
+
+ // read all data from table store
+ runSql(
+ "INSERT INTO result1 SELECT * FROM T;",
+ catalogDdl,
+ useCatalogCmd,
+ tableTDdl,
+ createResultSink("result1", "k INT, v STRING, last_action
STRING, dt STRING"));
+
+        // check the remaining data
+ checkResult(
+ "1, v_1, creation, 02-27",
+ "2, v_2, creation, 02-27",
+ "3, v_3, creation, 02-27",
+ "5, v_5, creation, 02-28",
+ "6, v_6, creation, 02-28",
+ "7, Seven, matched_upsert, 02-28",
+ "8, v_8, insert, 02-29",
+ "9, v_9, creation, 02-28",
+ "10, v_10, creation, 02-28",
+ "11, v_11, insert, 02-29");
+ }
+
private void runSql(String sql, String... ddls) throws Exception {
runSql(String.join("\n", ddls) + "\n" + sql);
}
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/Action.java
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/Action.java
index ab758018..cf4ed56c 100644
---
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/Action.java
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/Action.java
@@ -79,18 +79,9 @@ public interface Action {
static List<Map<String, String>> getPartitions(MultipleParameterTool
params) {
List<Map<String, String>> partitions = new ArrayList<>();
for (String partition : params.getMultiParameter("partition")) {
- Map<String, String> kvs = new HashMap<>();
- for (String kvString : partition.split(",")) {
- String[] kv = kvString.split("=");
- if (kv.length != 2) {
- System.err.print(
- "Invalid key-value pair \""
- + kvString
- + "\".\n"
- + "Run <action> --help for help.");
- return null;
- }
- kvs.put(kv[0], kv[1]);
+ Map<String, String> kvs = parseKeyValues(partition);
+ if (kvs == null) {
+ return null;
}
partitions.add(kvs);
}
@@ -98,6 +89,24 @@ public interface Action {
return partitions;
}
+ static Map<String, String> parseKeyValues(String keyValues) {
+ Map<String, String> kvs = new HashMap<>();
+ for (String kvString : keyValues.split(",")) {
+ String[] kv = kvString.split("=");
+ if (kv.length != 2) {
+ System.err.print(
+ "Invalid key-value pair \""
+ + kvString
+ + "\".\n"
+ + "Run <action> --help for help.");
+ return null;
+ }
+ kvs.put(kv[0].trim(), kv[1].trim());
+ }
+
+ return kvs;
+ }
+
/** Factory to create {@link Action}. */
class Factory {
@@ -105,6 +114,7 @@ public interface Action {
private static final String COMPACT = "compact";
private static final String DROP_PARTITION = "drop-partition";
private static final String DELETE = "delete";
+ private static final String MERGE_INTO = "merge-into";
public static Optional<Action> create(String[] args) {
String action = args[0].toLowerCase();
@@ -117,6 +127,8 @@ public interface Action {
return DropPartitionAction.create(actionArgs);
case DELETE:
return DeleteAction.create(actionArgs);
+ case MERGE_INTO:
+ return MergeIntoAction.create(actionArgs);
default:
System.err.println("Unknown action \"" + action + "\"");
printHelp();
@@ -132,6 +144,7 @@ public interface Action {
System.out.println(" " + COMPACT);
System.out.println(" " + DROP_PARTITION);
System.out.println(" " + DELETE);
+ System.out.println(" " + MERGE_INTO);
System.out.println("For detailed options of each action, run
<action> --help");
}
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java
index 3cd36fc0..2e9014c8 100644
---
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java
@@ -18,17 +18,35 @@
package org.apache.flink.table.store.connector.action;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.catalog.CatalogContext;
+import org.apache.flink.table.store.connector.FlinkCatalog;
+import org.apache.flink.table.store.connector.LogicalTypeConversion;
+import org.apache.flink.table.store.connector.sink.FlinkSinkBuilder;
import org.apache.flink.table.store.file.catalog.Catalog;
import org.apache.flink.table.store.file.catalog.CatalogFactory;
import org.apache.flink.table.store.file.catalog.Identifier;
+import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.options.CatalogOptions;
import org.apache.flink.table.store.options.Options;
+import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.Table;
+import org.apache.flink.table.store.types.DataType;
+import org.apache.flink.table.store.types.DataTypeCasts;
+import org.apache.flink.table.types.logical.LogicalType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.flink.table.store.file.catalog.Catalog.DEFAULT_DATABASE;
+
/** Abstract base of {@link Action}. */
public abstract class ActionBase implements Action {
@@ -36,6 +54,12 @@ public abstract class ActionBase implements Action {
protected Catalog catalog;
+ protected final FlinkCatalog flinkCatalog;
+
+ protected StreamExecutionEnvironment env;
+
+ protected StreamTableEnvironment tEnv;
+
protected Identifier identifier;
protected Table table;
@@ -50,6 +74,14 @@ public abstract class ActionBase implements Action {
CatalogFactory.createCatalog(
CatalogContext.create(
new Options().set(CatalogOptions.WAREHOUSE,
warehouse)));
+ flinkCatalog = new FlinkCatalog(catalog, "table-store",
DEFAULT_DATABASE);
+
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ tEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
+
+ // register flink catalog to table environment
+ tEnv.registerCatalog(flinkCatalog.getName(), flinkCatalog);
+ tEnv.useCatalog(flinkCatalog.getName());
try {
table = catalog.getTable(identifier);
@@ -62,4 +94,43 @@ public abstract class ActionBase implements Action {
throw new RuntimeException(e);
}
}
+
+ /**
+ * Extracts {@link LogicalType}s from Flink {@link org.apache.flink.table.types.DataType}s and
+ * converts them to Table Store {@link DataType}s.
+ */
+ protected List<DataType> toTableStoreDataTypes(
+ List<org.apache.flink.table.types.DataType> flinkDataTypes) {
+ return flinkDataTypes.stream()
+ .map(org.apache.flink.table.types.DataType::getLogicalType)
+ .map(LogicalTypeConversion::toDataType)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Checks whether each {@link DataType} in actualTypes is compatible with the corresponding
+ * {@link DataType} in expectedTypes.
+ */
+ protected boolean compatibleCheck(List<DataType> actualTypes,
List<DataType> expectedTypes) {
+ if (actualTypes.size() != expectedTypes.size()) {
+ return false;
+ }
+
+ for (int i = 0; i < actualTypes.size(); i++) {
+ if (!DataTypeCasts.supportsCompatibleCast(actualTypes.get(i),
expectedTypes.get(i))) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /** Sinks the given {@link DataStream} to the table. */
+ protected void sink(DataStream<RowData> dataStream) throws Exception {
+ new FlinkSinkBuilder((FileStoreTable) table)
+ .withInput(dataStream)
+
.withLockFactory(Lock.factory(catalog.lockFactory().orElse(null), identifier))
+ .build();
+ env.execute();
+ }
}
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/DeleteAction.java
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/DeleteAction.java
index 8f616766..6d18d52d 100644
---
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/DeleteAction.java
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/DeleteAction.java
@@ -21,17 +21,11 @@ package org.apache.flink.table.store.connector.action;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
-import org.apache.flink.table.store.connector.FlinkCatalog;
-import org.apache.flink.table.store.connector.sink.FlinkSinkBuilder;
-import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
@@ -42,21 +36,17 @@ import java.util.Optional;
import java.util.stream.Collectors;
import static
org.apache.flink.table.store.connector.action.Action.getTablePath;
-import static
org.apache.flink.table.store.file.catalog.Catalog.DEFAULT_DATABASE;
/** Delete from table action for Flink. */
public class DeleteAction extends ActionBase {
private static final Logger LOG =
LoggerFactory.getLogger(DeleteAction.class);
- private final FlinkCatalog flinkCatalog;
-
private final String filter;
public DeleteAction(String warehouse, String databaseName, String
tableName, String filter) {
super(warehouse, databaseName, tableName);
this.filter = filter;
- flinkCatalog = new FlinkCatalog(catalog, "table-store",
DEFAULT_DATABASE);
}
public static Optional<Action> create(String[] args) {
@@ -111,18 +101,11 @@ public class DeleteAction extends ActionBase {
public void run() throws Exception {
LOG.debug("Run delete action with filter '{}'.", filter);
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- StreamTableEnvironment tEnv =
- StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
-
- tEnv.registerCatalog(flinkCatalog.getName(), flinkCatalog);
- tEnv.useCatalog(flinkCatalog.getName());
-
Table queriedTable =
tEnv.sqlQuery(
String.format(
"SELECT * FROM %s WHERE %s",
- identifier.getEscapedFullName('`'), filter));
+ identifier.getEscapedFullName(), filter));
List<DataStructureConverter<Object, Object>> converters =
queriedTable.getResolvedSchema().getColumnDataTypes().stream()
@@ -146,7 +129,6 @@ public class DeleteAction extends ActionBase {
return rowData;
});
- new FlinkSinkBuilder((FileStoreTable)
table).withInput(dataStream).build();
- env.execute();
+ sink(dataStream);
}
}
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/MergeIntoAction.java
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/MergeIntoAction.java
new file mode 100644
index 00000000..e131087d
--- /dev/null
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/MergeIntoAction.java
@@ -0,0 +1,677 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.store.connector.LogicalTypeConversion;
+import org.apache.flink.table.store.file.catalog.Identifier;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.types.DataField;
+import org.apache.flink.table.store.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.table.store.connector.action.Action.getTablePath;
+import static
org.apache.flink.table.store.connector.action.Action.parseKeyValues;
+
+/**
+ * Flink action for 'MERGE INTO', which follows the syntax below (using 'upsert' semantics
+ * instead of 'update'):
+ *
+ * <pre><code>
+ * MERGE INTO target-table
+ * USING source-table | source-expr AS source-alias
+ * ON merge-condition
+ * WHEN MATCHED [AND matched-condition]
+ * THEN UPDATE SET xxx
+ * WHEN MATCHED [AND matched-condition]
+ * THEN DELETE
+ * WHEN NOT MATCHED [AND not-matched-condition]
+ * THEN INSERT VALUES (xxx)
+ * WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
+ * THEN UPDATE SET xxx
+ * WHEN NOT MATCHED BY SOURCE [AND not-matched-by-source-condition]
+ * THEN DELETE
+ * </code></pre>
+ *
+ * <p>It builds a query to find the rows to be changed: an INNER JOIN with the merge-condition is
+ * used to find MATCHED rows, NOT EXISTS with the merge-condition is used to find NOT MATCHED
+ * rows, and the condition of each action is then used to filter the rows.
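+ *
+ * <p>For instance, the rows for a matched-delete action are found with a query of the following
+ * shape (schematic, not verbatim output):
+ *
+ * <pre><code>
+ * SELECT target-columns FROM target-table INNER JOIN source-table
+ * ON merge-condition WHERE matched-delete-condition
+ * </code></pre>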
+ */
+public class MergeIntoAction extends ActionBase {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MergeIntoAction.class);
+
+ // primary keys of target table
+ private final List<String> primaryKeys;
+
+ // converters for Row to RowData
+ private final List<DataStructureConverter<Object, Object>> converters;
+
+ // field names of target table
+ private final List<String> targetFieldNames;
+
+ // source
+ @Nullable private String sourceTable;
+ private Identifier sourceTableIdentifier;
+ @Nullable private String source;
+ @Nullable private String sourceAlias;
+
+ // merge condition
+ private String mergeCondition;
+
+ // actions to be taken
+ private boolean matchedUpsert;
+ private boolean notMatchedUpsert;
+ private boolean matchedDelete;
+ private boolean notMatchedDelete;
+ private boolean insert;
+
+ // upsert
+ @Nullable private String matchedUpsertCondition;
+ @Nullable private String matchedUpsertSet;
+
+ @Nullable private String notMatchedBySourceUpsertCondition;
+ @Nullable private String notMatchedBySourceUpsertSet;
+
+ // delete
+ @Nullable private String matchedDeleteCondition;
+ @Nullable private String notMatchedBySourceDeleteCondition;
+
+ // insert
+ @Nullable private String notMatchedInsertCondition;
+ @Nullable private String notMatchedInsertValues;
+
+ MergeIntoAction(String warehouse, String database, String tableName) {
+ super(warehouse, database, tableName);
+
+ if (!(table instanceof FileStoreTable)) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Only FileStoreTable supports merge-into action.
The table type is '%s'.",
+ table.getClass().getName()));
+ }
+
+ // init primaryKeys of target table
+ primaryKeys = ((FileStoreTable) table).schema().primaryKeys();
+ if (primaryKeys.isEmpty()) {
+ throw new UnsupportedOperationException(
+ "merge-into action doesn't support table with no primary
keys defined.");
+ }
+
+ // init DataStructureConverters
+ converters =
+ table.rowType().getFieldTypes().stream()
+ .map(LogicalTypeConversion::toLogicalType)
+ .map(TypeConversions::fromLogicalToDataType)
+ .map(DataStructureConverters::getConverter)
+ .collect(Collectors.toList());
+
+ // init field names of target table
+ targetFieldNames =
+ table.rowType().getFields().stream()
+ .map(DataField::name)
+ .collect(Collectors.toList());
+ }
+
+ public MergeIntoAction withSourceTable(String sourceTable) {
+ this.sourceTable = sourceTable;
+ return this;
+ }
+
+ public MergeIntoAction withSource(String source, String sourceAlias) {
+ this.source = source;
+ this.sourceAlias = sourceAlias;
+ return this;
+ }
+
+ public MergeIntoAction withMergeCondition(String mergeCondition) {
+ this.mergeCondition = mergeCondition;
+ return this;
+ }
+
+ public MergeIntoAction withMatchedUpsert(
+ @Nullable String matchedUpsertCondition, String matchedUpsertSet) {
+ this.matchedUpsert = true;
+ this.matchedUpsertCondition = matchedUpsertCondition;
+ this.matchedUpsertSet = matchedUpsertSet;
+ return this;
+ }
+
+ public MergeIntoAction withNotMatchedBySourceUpsert(
+ @Nullable String notMatchedBySourceUpsertCondition,
+ String notMatchedBySourceUpsertSet) {
+ this.notMatchedUpsert = true;
+ this.notMatchedBySourceUpsertCondition =
notMatchedBySourceUpsertCondition;
+ this.notMatchedBySourceUpsertSet = notMatchedBySourceUpsertSet;
+ return this;
+ }
+
+ public MergeIntoAction withMatchedDelete(@Nullable String
matchedDeleteCondition) {
+ this.matchedDelete = true;
+ this.matchedDeleteCondition = matchedDeleteCondition;
+ return this;
+ }
+
+ public MergeIntoAction withNotMatchedBySourceDelete(
+ @Nullable String notMatchedBySourceDeleteCondition) {
+ this.notMatchedDelete = true;
+ this.notMatchedBySourceDeleteCondition =
notMatchedBySourceDeleteCondition;
+ return this;
+ }
+
+ public MergeIntoAction withNotMatchedInsert(
+ @Nullable String notMatchedInsertCondition, String
notMatchedInsertValues) {
+ this.insert = true;
+ this.notMatchedInsertCondition = notMatchedInsertCondition;
+ this.notMatchedInsertValues = notMatchedInsertValues;
+ return this;
+ }
+
+ public static Optional<Action> create(String[] args) {
+ LOG.info("merge-into job args: {}", String.join(" ", args));
+
+ MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+
+ if (params.has("help")) {
+ printHelp();
+ return Optional.empty();
+ }
+
+ Tuple3<String, String, String> tablePath = getTablePath(params);
+ if (tablePath == null) {
+ return Optional.empty();
+ }
+
+ MergeIntoAction action = new MergeIntoAction(tablePath.f0,
tablePath.f1, tablePath.f2);
+
+ if (!initSource(params, action)) {
+ return Optional.empty();
+ }
+
+ if (argumentAbsent(params, "on")) {
+ return Optional.empty();
+ }
+ action.withMergeCondition(params.get("on"));
+
+ List<String> actions =
+ Arrays.stream(params.get("merge-actions").split(","))
+ .map(String::trim)
+ .collect(Collectors.toList());
+ if (actions.contains("matched-upsert")) {
+ if (argumentAbsent(params, "matched-upsert-set")) {
+ return Optional.empty();
+ }
+ action.withMatchedUpsert(
+ params.get("matched-upsert-condition"),
params.get("matched-upsert-set"));
+ }
+ if (actions.contains("not-matched-by-source-upsert")) {
+ if (argumentAbsent(params, "not-matched-by-source-upsert-set")) {
+ return Optional.empty();
+ }
+ action.withNotMatchedBySourceUpsert(
+ params.get("not-matched-by-source-upsert-condition"),
+ params.get("not-matched-by-source-upsert-set"));
+ }
+ if (actions.contains("matched-delete")) {
+ action.withMatchedDelete(params.get("matched-delete-condition"));
+ }
+ if (actions.contains("not-matched-by-source-delete")) {
+ action.withNotMatchedBySourceDelete(
+ params.get("not-matched-by-source-delete-condition"));
+ }
+ if (actions.contains("not-matched-insert")) {
+ if (argumentAbsent(params, "not-matched-insert-values")) {
+ return Optional.empty();
+ }
+ action.withNotMatchedInsert(
+ params.get("not-matched-insert-condition"),
+ params.get("not-matched-insert-values"));
+ }
+
+ if (!validate(action)) {
+ return Optional.empty();
+ }
+
+ return Optional.of(action);
+ }
+
+ private static boolean initSource(MultipleParameterTool params,
MergeIntoAction action) {
+ String sourceTable = params.get("using-table");
+ String source = params.get("using-source");
+ String sourceAlias = params.get("as");
+
+ int count = 0;
+ if (sourceTable != null) {
+ action.withSourceTable(params.get("using-table"));
+ count++;
+ }
+
+ if (source != null) {
+ if (sourceAlias == null) {
+ System.err.println(
+ "The source and its alias must be specified
together.\n"
+ + "Run <action> --help for help.");
+ return false;
+ }
+ action.withSource(source, sourceAlias);
+ count++;
+ }
+
+ if (count != 1) {
+ System.err.println(
+ "Please specify either \"source table\" or \"source,
source's alias\".\n"
+ + "Run <action> --help for help.");
+ return false;
+ }
+
+ return true;
+ }
+
+ private static boolean argumentAbsent(MultipleParameterTool params, String
key) {
+ if (!params.has(key)) {
+ System.err.println(key + " is absent.\nRun <action> --help for
help.");
+ return true;
+ }
+
+ return false;
+ }
+
+ private static boolean validate(MergeIntoAction action) {
+ if (!action.matchedUpsert
+ && !action.notMatchedUpsert
+ && !action.matchedDelete
+ && !action.notMatchedDelete
+ && !action.insert) {
+ System.err.println(
+ "Must specify at least one merge action.\nRun <action>
--help for help.");
+ return false;
+ }
+
+ if ((action.matchedUpsert && action.matchedDelete)
+ && (action.matchedUpsertCondition == null
+ || action.matchedDeleteCondition == null)) {
+ System.err.println(
+ "If both matched-upsert and matched-delete actions are
present, their conditions must both be present too.\n"
+ + "Run <action> --help for help.");
+ return false;
+ }
+
+ if ((action.notMatchedUpsert && action.notMatchedDelete)
+ && (action.notMatchedBySourceUpsertCondition == null
+ || action.notMatchedBySourceDeleteCondition == null)) {
+ System.err.println(
+ "If both not-matched-by-source-upsert and
not-matched-by--source-delete actions are present, their conditions must both
be present too.\n"
+ + "Run <action> --help for help.");
+ return false;
+ }
+
+ if (action.notMatchedBySourceUpsertSet != null
+ && action.notMatchedBySourceUpsertSet.equals("*")) {
+ System.err.println("The '*' cannot be used in
not-matched-by-source-upsert-set");
+ return false;
+ }
+
+ return true;
+ }
+
+ private static void printHelp() {
+ System.out.println("Action \"merge-into\" simulates the \"MERGE INTO\"
syntax.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " merge-into --warehouse <warehouse-path>\n"
+ + " --database <database-name>\n"
+ + " --table <table-name>\n"
+ + " --using-table <source-table>\n"
+ + " --on <merge-condition>\n"
+ + " --merge-actions
<matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete>\n"
+ + " --matched-upsert-condition
<matched-condition>\n"
+ + " --matched-upsert-set
<upsert-changes>\n"
+ + " --matched-delete-condition
<matched-condition>\n"
+ + " --not-matched-insert-condition
<not-matched-condition>\n"
+ + " --not-matched-insert-values
<insert-values>\n"
+ + "
--not-matched-by-source-upsert-condition <not-matched-by-source-condition>\n"
+ + " --not-matched-by-source-upsert-set
<not-matched-upsert-changes>\n"
+ + "
--not-matched-by-source-delete-condition <not-matched-by-source-condition>");
+ System.out.println(" matched-upsert-changes format:");
+ System.out.println(
+ " col=<source-table>.col | expression [, ...] (do not add
'<target-table>.' before 'col')");
+ System.out.println(
+ " * (upsert with all source cols; require target table's
schema is equal to source's)");
+ System.out.println(" not-matched-upsert-changes format:");
+ System.out.println(" col=expression (cannot use source table's
col)");
+ System.out.println(" insert-values format:");
+ System.out.println(
+ " col1,col2,...,col_end (must specify values of all
columns; can use <source-table>.col or expression)");
+ System.out.println(
+ " * (insert with all source cols; require target table's
schema is equal to source's)");
+ System.out.println(
+ " not-matched-condition: cannot use target table's columns to
construct condition expression.");
+ System.out.println(
+ " not-matched-by-source-condition: cannot use source table's
columns to construct condition expression.");
+ System.out.println(" alternative arguments:");
+ System.out.println(" --path <table-path> to represent the table
path.");
+ System.out.println(
+ " --using-source <source-expression> --as <source-alias> to
replace --using-table.");
+ System.out.println();
+
+ System.out.println("Note: ");
+ System.out.println(" 1. Target table must has primary keys.");
+ System.out.println(
+ " 2. All conditions, set changes and values should use Flink
SQL syntax. Please quote them with \" to escape special characters.");
+ System.out.println(" 3. source-alias cannot be duplicated with
existed table name.");
+ System.out.println(" 4. At least one merge action must be
specified.");
+ System.out.println(" 5. How to determine the changed rows with
different \"matched\":");
+ System.out.println(
+ " matched: changed rows are from target table and each can
match a source table row "
+ + "based on merge-condition and optional
matched-condition.");
+ System.out.println(
+ " not-matched: changed rows are from source table and all
rows cannot match any target table row "
+ + "based on merge-condition and optional
not-matched-condition.");
+ System.out.println(
+ " not-matched-by-source: changed rows are from target table
and all row cannot match any source table row "
+ + "based on merge-condition and optional
not-matched-by-source-condition.");
+ System.out.println(
+ " 6. If both matched-upsert and matched-delete actions are
present, their conditions must both be present too "
+ + "(same to not-matched-by-source-upsert and
not-matched-by-source-delete). Otherwise, all conditions are optional.");
+ System.out.println(" 7. source-alias cannot be duplicated with
existed table name.");
+ System.out.println();
+
+ System.out.println("Examples:");
+ System.out.println(
+ " merge-into --path hdfs:///path/to/T\n"
+ + " --using-table S\n"
+ + " --on \"T.k=S.k\"\n"
+ + " --merge-actions matched-upsert\n"
+ + " --matched-upsert-condition
\"T.v<>S.v\"\n"
+ + " --matched-upsert-set \"S.k,S.v\"");
+ System.out.println(
+ " It will find matched rows of target table that meet
condition (T.k=S.k AND T.v<>S.v) ind '-D' to test_table, then update T.v with
S.v.");
+ }
+
+ @Override
+ public void run() throws Exception {
+ // prepare source table
+ if (sourceTable != null) {
+ if (sourceTable.contains(".")) {
+ sourceTableIdentifier = Identifier.fromString(sourceTable);
+ } else {
+ sourceTableIdentifier =
+ Identifier.create(identifier.getDatabaseName(),
sourceTable);
+ }
+ } else {
+ tEnv.registerTable(sourceAlias, tEnv.sqlQuery(source));
+ sourceTableIdentifier =
Identifier.create(identifier.getDatabaseName(), sourceAlias);
+ }
+
+ List<DataStream<RowData>> dataStreams =
+ Stream.of(
+ getMatchedUpsertDataStream(),
+ getNotMatchedUpsertDataStream(),
+ getMatchedDeleteDataStream(),
+ getNotMatchedDeleteDataStream(),
+ getInsertDataStream())
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toList());
+
+ DataStream<RowData> firstDs = dataStreams.get(0);
+
sink(firstDs.union(dataStreams.stream().skip(1).toArray(DataStream[]::new)));
+ }
+
+ private Optional<DataStream<RowData>> getMatchedUpsertDataStream() {
+ if (!matchedUpsert) {
+ return Optional.empty();
+ }
+
+ List<String> project;
+ // extract project
+ if (matchedUpsertSet.equals("*")) {
+ project =
Collections.singletonList(sourceTableIdentifier.getObjectName() + "." + "*");
+ } else {
+ // validate upsert changes
+            // no need to check primary key changes because the merge condition
+            // must contain all pks of the target table
+ Map<String, String> changes = parseKeyValues(matchedUpsertSet);
+ if (changes == null) {
+ throw new IllegalArgumentException(
+ "matched-upsert-set is invalid.\nRun <action> --help
for help.");
+ }
+ for (String targetField : changes.keySet()) {
+ if (!targetFieldNames.contains(targetField)) {
+ throw new RuntimeException(
+ String.format(
+ "Invalid column reference '%s' of table
'%s' at matched-upsert action.",
+ targetField, identifier.getFullName()));
+ }
+ }
+
+ // replace field names
+ // the table name is added before column name to avoid ambiguous
column reference
+ project =
+ targetFieldNames.stream()
+ .map(
+ name ->
+ changes.getOrDefault(
+ name,
identifier.getObjectName() + "." + name))
+ .collect(Collectors.toList());
+ }
+
+ // use inner join to find matched records
+ String query =
+ String.format(
+ "SELECT %s FROM %s INNER JOIN %s ON %s %s",
+ String.join(",", project),
+ identifier.getEscapedFullName(),
+ sourceTableIdentifier.getEscapedFullName(),
+ mergeCondition,
+ matchedUpsertCondition == null ? "" : "WHERE " +
matchedUpsertCondition);
+ LOG.info("Query used for matched-update:\n{}", query);
+
+ Table source = tEnv.sqlQuery(query);
+ checkSchema("matched-upsert", source);
+
+ return Optional.of(toDataStream(source, RowKind.UPDATE_AFTER,
converters));
+ }
+
+ private Optional<DataStream<RowData>> getNotMatchedUpsertDataStream() {
+ if (!notMatchedUpsert) {
+ return Optional.empty();
+ }
+
+        // validate upsert changes
+ Map<String, String> changes =
parseKeyValues(notMatchedBySourceUpsertSet);
+ if (changes == null) {
+ throw new IllegalArgumentException(
+ "matched-upsert-set is invalid.\nRun <action> --help for
help.");
+ }
+ for (String targetField : changes.keySet()) {
+ if (!targetFieldNames.contains(targetField)) {
+ throw new RuntimeException(
+ String.format(
+ "Invalid column reference '%s' of table '%s'
at not-matched-by-source-upsert action.\nRun <action> --help for help.",
+ targetField, identifier.getFullName()));
+ }
+
+ if (primaryKeys.contains(targetField)) {
+ throw new RuntimeException(
+ "Not allowed to change primary key in
not-matched-by-source-upsert-set.\nRun <action> --help for help.");
+ }
+ }
+
+ // replace field names (won't be ambiguous here)
+ List<String> project =
+ targetFieldNames.stream()
+ .map(name -> changes.getOrDefault(name, name))
+ .collect(Collectors.toList());
+
+ // use not exists to find not matched records
+ String query =
+ String.format(
+ "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s
WHERE %s) %s",
+ String.join(",", project),
+ identifier.getEscapedFullName(),
+ sourceTableIdentifier.getEscapedFullName(),
+ mergeCondition,
+ notMatchedBySourceUpsertCondition == null
+ ? ""
+ : String.format("AND (%s)",
notMatchedBySourceUpsertCondition));
+
+ LOG.info("Query used for not-matched-by-source-upsert:\n{}", query);
+
+ Table source = tEnv.sqlQuery(query);
+ checkSchema("not-matched-by-source-upsert", source);
+
+ return Optional.of(toDataStream(source, RowKind.UPDATE_AFTER,
converters));
+ }
+
+ private Optional<DataStream<RowData>> getMatchedDeleteDataStream() {
+ if (!matchedDelete) {
+ return Optional.empty();
+ }
+
+ // the table name is added before column name to avoid ambiguous
column reference
+ List<String> project =
+ targetFieldNames.stream()
+ .map(name -> identifier.getObjectName() + "." + name)
+ .collect(Collectors.toList());
+
+ // use inner join to find matched records
+ String query =
+ String.format(
+ "SELECT %s FROM %s INNER JOIN %s ON %s %s",
+ String.join(",", project),
+ identifier.getEscapedFullName(),
+ sourceTableIdentifier.getEscapedFullName(),
+ mergeCondition,
+ matchedDeleteCondition == null ? "" : "WHERE " +
matchedDeleteCondition);
+ LOG.info("Query used by matched-delete:\n{}", query);
+
+ Table source = tEnv.sqlQuery(query);
+ checkSchema("matched-delete", source);
+
+ return Optional.of(toDataStream(source, RowKind.DELETE, converters));
+ }
+
+ private Optional<DataStream<RowData>> getNotMatchedDeleteDataStream() {
+ if (!notMatchedDelete) {
+ return Optional.empty();
+ }
+
+ // use not exists to find not matched records
+ String query =
+ String.format(
+ "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s
WHERE %s) %s",
+ String.join(",", targetFieldNames),
+ identifier.getEscapedFullName(),
+ sourceTableIdentifier.getEscapedFullName(),
+ mergeCondition,
+ notMatchedBySourceDeleteCondition == null
+ ? ""
+ : String.format("AND (%s)",
notMatchedBySourceDeleteCondition));
+ LOG.info("Query used by not-matched-by-source-delete:\n{}", query);
+
+ Table source = tEnv.sqlQuery(query);
+ checkSchema("not-matched-by-source-delete", source);
+
+ return Optional.of(toDataStream(source, RowKind.DELETE, converters));
+ }
+
+ private Optional<DataStream<RowData>> getInsertDataStream() {
+ if (!insert) {
+ return Optional.empty();
+ }
+
+        // use not exists to find rows to insert
+ String query =
+ String.format(
+ "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s
WHERE %s) %s",
+ notMatchedInsertValues,
+ sourceTableIdentifier.getEscapedFullName(),
+ identifier.getEscapedFullName(),
+ mergeCondition,
+ notMatchedInsertCondition == null
+ ? ""
+ : String.format("AND (%s)",
notMatchedInsertCondition));
+ LOG.info("Query used by not-matched-insert:\n{}", query);
+
+ Table source = tEnv.sqlQuery(query);
+ checkSchema("not-matched-insert", source);
+
+ return Optional.of(toDataStream(source, RowKind.INSERT, converters));
+ }
+
+ private void checkSchema(String action, Table source) {
+ List<DataType> actualTypes =
+
toTableStoreDataTypes(source.getResolvedSchema().getColumnDataTypes());
+ List<DataType> expectedTypes = this.table.rowType().getFieldTypes();
+ if (!compatibleCheck(actualTypes, expectedTypes)) {
+ throw new IllegalStateException(
+ String.format(
+ "The schema of result in action '%s' is invalid.\n"
+ + "Result schema: [%s]\n"
+ + "Expected schema: [%s]",
+ action,
+ actualTypes.stream()
+ .map(DataType::asSQLString)
+ .collect(Collectors.joining(", ")),
+ expectedTypes.stream()
+ .map(DataType::asSQLString)
+ .collect(Collectors.joining(", "))));
+ }
+ }
+
+ // pass converters to avoid "not serializable" exception
+ private DataStream<RowData> toDataStream(
+ Table source, RowKind kind, List<DataStructureConverter<Object,
Object>> converters) {
+ return tEnv.toChangelogStream(source)
+ .map(
+ row -> {
+ int arity = row.getArity();
+ GenericRowData rowData = new GenericRowData(kind,
arity);
+ for (int i = 0; i < arity; i++) {
+ rowData.setField(
+ i,
converters.get(i).toInternalOrNull(row.getField(i)));
+ }
+ return rowData;
+ });
+ }
+}
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionITCase.java
b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionActionITCase.java
similarity index 99%
rename from
flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionITCase.java
rename to
flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionActionITCase.java
index 8b941fdc..28da14c4 100644
---
a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionITCase.java
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/DropPartitionActionITCase.java
@@ -38,7 +38,7 @@ import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
/** IT cases for {@link DropPartitionAction}. */
-public class DropPartitionITCase extends ActionITCaseBase {
+public class DropPartitionActionITCase extends ActionITCaseBase {
private static final DataType[] FIELD_TYPES =
new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.STRING(), DataTypes.INT()};
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java
b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java
new file mode 100644
index 00000000..0cefe77f
--- /dev/null
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.action;
+
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
+import static
org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.buildDdl;
+import static
org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.buildSimpleQuery;
+import static
org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.createTable;
+import static
org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.init;
+import static
org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.insertInto;
+import static
org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.sEnv;
+import static
org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.testBatchRead;
+import static
org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.testStreamingRead;
+import static
org.apache.flink.table.store.connector.util.ReadWriteTableTestUtil.validateStreamingReadResult;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+/** IT cases for {@link MergeIntoAction}. */
+public class MergeIntoActionITCase extends ActionITCaseBase {
+
+ private static final List<Row> initialRecords =
+ Arrays.asList(
+ changelogRow("+I", 1, "v_1", "creation", "02-27"),
+ changelogRow("+I", 2, "v_2", "creation", "02-27"),
+ changelogRow("+I", 3, "v_3", "creation", "02-27"),
+ changelogRow("+I", 4, "v_4", "creation", "02-27"),
+ changelogRow("+I", 5, "v_5", "creation", "02-28"),
+ changelogRow("+I", 6, "v_6", "creation", "02-28"),
+ changelogRow("+I", 7, "v_7", "creation", "02-28"),
+ changelogRow("+I", 8, "v_8", "creation", "02-28"),
+ changelogRow("+I", 9, "v_9", "creation", "02-28"),
+ changelogRow("+I", 10, "v_10", "creation", "02-28"));
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ init(warehouse);
+
+ // prepare table S
+ sEnv.executeSql(
+ buildDdl(
+ "S",
+ Arrays.asList("k INT", "v STRING", "dt STRING"),
+ Arrays.asList("k", "dt"),
+ Collections.singletonList("dt"),
+ new HashMap<>()));
+
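+        // Each source row is chosen to exercise one merge branch against T in the tests below:
+        // k=1 matches T with an equal v (no change), k=4 and k=8@02-28 carry NULL v (matched-delete),
+        // k=7 changes v (matched-upsert), and k=8@02-29, k=11, k=12 have no match in T (not-matched-insert).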
+ insertInto(
+ "S",
+ "(1, 'v_1', '02-27')",
+ "(4, CAST (NULL AS STRING), '02-27')",
+ "(7, 'Seven', '02-28')",
+ "(8, CAST (NULL AS STRING), '02-28')",
+ "(8, 'v_8', '02-29')",
+ "(11, 'v_11', '02-29')",
+ "(12, 'v_12', '02-29')");
+ }
+
+ @ParameterizedTest(name = "changelog-producer = {0}")
+ @MethodSource("producerTestData")
+ public void testVariousChangelogProducer(
+            CoreOptions.ChangelogProducer producer, List<Row> expected) throws Exception {
+ // prepare table T
+ prepareTable(producer);
+
+ // similar to:
+ // MERGE INTO T
+ // USING S
+ // ON T.k = S.k AND T.dt = S.dt
+ // WHEN MATCHED AND (T.v <> S.v AND S.v IS NOT NULL) THEN UPDATE
+ // SET v = S.v, last_action = 'matched_upsert'
+ // WHEN MATCHED AND S.v IS NULL THEN DELETE
+ // WHEN NOT MATCHED THEN INSERT VALUES (S.k, S.v, 'insert', S.dt)
+ // WHEN NOT MATCHED BY SOURCE AND (dt < '02-28') THEN UPDATE
+ // SET v = v || '_nmu', last_action = 'not_matched_upsert'
+ // WHEN NOT MATCHED BY SOURCE AND (dt >= '02-28') THEN DELETE
+ MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+ action.withSourceTable("S")
+ .withMergeCondition("T.k = S.k AND T.dt = S.dt")
+ .withMatchedUpsert(
+                        "T.v <> S.v AND S.v IS NOT NULL", "v = S.v, last_action = 'matched_upsert'")
+ .withMatchedDelete("S.v IS NULL")
+ .withNotMatchedInsert(null, "S.k, S.v, 'insert', S.dt")
+ .withNotMatchedBySourceUpsert(
+                        "dt < '02-28'", "v = v || '_nmu', last_action = 'not_matched_upsert'")
+ .withNotMatchedBySourceDelete("dt >= '02-28'");
+
+ validateActionRunResult(
+ action,
+ expected,
+ Arrays.asList(
+ changelogRow("+I", 1, "v_1", "creation", "02-27"),
+                        changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+ changelogRow("+I", 8, "v_8", "insert", "02-29"),
+ changelogRow("+I", 11, "v_11", "insert", "02-29"),
+ changelogRow("+I", 12, "v_12", "insert", "02-29")));
+ }
+
+ @Test
+ public void testUsingSource() throws Exception {
+ // prepare table T
+ prepareTable(CoreOptions.ChangelogProducer.NONE);
+
+ // similar to:
+ // MERGE INTO T
+ // USING (SELECT * FROM S WHERE k < 12) AS SS
+ // ON T.k = SS.k AND T.dt = SS.dt
+ // WHEN MATCHED AND (T.v <> SS.v AND SS.v IS NOT NULL) THEN UPDATE
+ // SET v = SS.v, last_action = 'matched_upsert'
+ // WHEN MATCHED AND SS.v IS NULL THEN DELETE
+ // WHEN NOT MATCHED THEN INSERT VALUES (SS.k, SS.v, 'insert', SS.dt)
+ // WHEN NOT MATCHED BY SOURCE AND (dt < '02-28') THEN UPDATE
+ // SET v = v || '_nmu', last_action = 'not_matched_upsert'
+ // WHEN NOT MATCHED BY SOURCE AND (dt >= '02-28') THEN DELETE
+ MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+ action.withSource("SELECT * FROM S WHERE k < 12", "SS")
+ .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
+ .withMatchedUpsert(
+ "T.v <> SS.v AND SS.v IS NOT NULL",
+ "v = SS.v, last_action = 'matched_upsert'")
+ .withMatchedDelete("SS.v IS NULL")
+ .withNotMatchedInsert(null, "SS.k, SS.v, 'insert', SS.dt")
+ .withNotMatchedBySourceUpsert(
+                        "dt < '02-28'", "v = v || '_nmu', last_action = 'not_matched_upsert'")
+ .withNotMatchedBySourceDelete("dt >= '02-28'");
+
+ validateActionRunResult(
+ action,
+ Arrays.asList(
+ changelogRow("-U", 7, "v_7", "creation", "02-28"),
+                        changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+ changelogRow("-D", 4, "v_4", "creation", "02-27"),
+ changelogRow("-D", 8, "v_8", "creation", "02-28"),
+ changelogRow("+I", 8, "v_8", "insert", "02-29"),
+ changelogRow("+I", 11, "v_11", "insert", "02-29"),
+ changelogRow("-U", 2, "v_2", "creation", "02-27"),
+                        changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("-U", 3, "v_3", "creation", "02-27"),
+                        changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+ changelogRow("-D", 5, "v_5", "creation", "02-28"),
+ changelogRow("-D", 6, "v_6", "creation", "02-28"),
+ changelogRow("-D", 9, "v_9", "creation", "02-28"),
+ changelogRow("-D", 10, "v_10", "creation", "02-28")),
+ Arrays.asList(
+ changelogRow("+I", 1, "v_1", "creation", "02-27"),
+                        changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                        changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+ changelogRow("+I", 8, "v_8", "insert", "02-29"),
+ changelogRow("+I", 11, "v_11", "insert", "02-29")));
+ }
+
+ @Test
+ public void testMatchedUpsertSetAll() throws Exception {
+ // prepare table T
+ prepareTable(CoreOptions.ChangelogProducer.NONE);
+
+ // build MergeIntoAction
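+        // "*" is the set-all shorthand for matched-upsert: every column of a matched row is
+        // replaced by the corresponding source column, so the projected source schema must
+        // line up with T's schema (see testIncompatibleSchema below)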
+ MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+ action.withSource("SELECT k, v, 'unknown', dt FROM S", "SS")
+ .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
+ .withMatchedUpsert(null, "*");
+
+ validateActionRunResult(
+ action,
+ Arrays.asList(
+ changelogRow("-U", 1, "v_1", "creation", "02-27"),
+ changelogRow("+U", 1, "v_1", "unknown", "02-27"),
+ changelogRow("-U", 4, "v_4", "creation", "02-27"),
+ changelogRow("+U", 4, null, "unknown", "02-27"),
+ changelogRow("-U", 7, "v_7", "creation", "02-28"),
+ changelogRow("+U", 7, "Seven", "unknown", "02-28"),
+ changelogRow("-U", 8, "v_8", "creation", "02-28"),
+ changelogRow("+U", 8, null, "unknown", "02-28")),
+ Arrays.asList(
+ changelogRow("+U", 1, "v_1", "unknown", "02-27"),
+ changelogRow("+I", 2, "v_2", "creation", "02-27"),
+ changelogRow("+I", 3, "v_3", "creation", "02-27"),
+ changelogRow("+U", 4, null, "unknown", "02-27"),
+ changelogRow("+I", 5, "v_5", "creation", "02-28"),
+ changelogRow("+I", 6, "v_6", "creation", "02-28"),
+ changelogRow("+U", 7, "Seven", "unknown", "02-28"),
+ changelogRow("+U", 8, null, "unknown", "02-28"),
+ changelogRow("+I", 9, "v_9", "creation", "02-28"),
+ changelogRow("+I", 10, "v_10", "creation", "02-28")));
+ }
+
+ @Test
+ public void testNotMatchedInsertAll() throws Exception {
+ // prepare table T
+ prepareTable(CoreOptions.ChangelogProducer.NONE);
+
+ // build MergeIntoAction
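+        // "*" inserts unmatched source rows as-is, so the column order must match T;
+        // the extra condition "SS.k < 12" filters out the k=12 source row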
+ MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+ action.withSource("SELECT k, v, 'unknown', dt FROM S", "SS")
+ .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
+ .withNotMatchedInsert("SS.k < 12", "*");
+
+ validateActionRunResult(
+ action,
+ Arrays.asList(
+ changelogRow("+I", 8, "v_8", "unknown", "02-29"),
+ changelogRow("+I", 11, "v_11", "unknown", "02-29")),
+ Arrays.asList(
+ changelogRow("+I", 1, "v_1", "creation", "02-27"),
+ changelogRow("+I", 2, "v_2", "creation", "02-27"),
+ changelogRow("+I", 3, "v_3", "creation", "02-27"),
+ changelogRow("+I", 4, "v_4", "creation", "02-27"),
+ changelogRow("+I", 5, "v_5", "creation", "02-28"),
+ changelogRow("+I", 6, "v_6", "creation", "02-28"),
+ changelogRow("+I", 7, "v_7", "creation", "02-28"),
+ changelogRow("+I", 8, "v_8", "creation", "02-28"),
+ changelogRow("+I", 9, "v_9", "creation", "02-28"),
+ changelogRow("+I", 10, "v_10", "creation", "02-28"),
+ changelogRow("+I", 8, "v_8", "unknown", "02-29"),
+ changelogRow("+I", 11, "v_11", "unknown", "02-29")));
+ }
+
+    // ----------------------------------------------------------------------------------------------------------------
+    // Negative tests
+    // ----------------------------------------------------------------------------------------------------------------
+
+ @Test
+ public void testInsertChangesActionWithNonPkTable() {
+ String nonPkTable =
+ createTable(
+ Collections.singletonList("k int"),
+ Collections.emptyList(),
+ Collections.emptyList());
+
+        assertThatThrownBy(() -> new MergeIntoAction(warehouse, database, nonPkTable))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage(
+                        "merge-into action doesn't support table with no primary keys defined.");
+ }
+
+ @Test
+ public void testIncompatibleSchema() throws Exception {
+ // prepare table T
+ prepareTable(CoreOptions.ChangelogProducer.NONE);
+
+ // build MergeIntoAction
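+        // the literal 0 makes the third result column INT, while T's 'last_action' column
+        // is STRING, so the action is expected to fail the schema check below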
+ MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+ action.withSourceTable("S")
+ .withMergeCondition("T.k = S.k AND T.dt = S.dt")
+ .withNotMatchedInsert(null, "S.k, S.v, 0, S.dt");
+
+ assertThatThrownBy(action::run)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage(
+                        "The schema of result in action 'not-matched-insert' is invalid.\n"
+                                + "Result schema: [INT NOT NULL, STRING, INT NOT NULL, STRING NOT NULL]\n"
+                                + "Expected schema: [INT NOT NULL, STRING, STRING, STRING NOT NULL]");
+ }
+
+ private void validateActionRunResult(
+            MergeIntoAction action, List<Row> streamingExpected, List<Row> batchExpected)
+ throws Exception {
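+        // open the streaming read before running the action so its changelog is captured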
+ BlockingIterator<Row, Row> iterator =
+ testStreamingRead(buildSimpleQuery("T"), initialRecords);
+ action.run();
+ // test streaming read
+ validateStreamingReadResult(iterator, streamingExpected);
+ iterator.close();
+ // test batch read
+ testBatchRead(buildSimpleQuery("T"), batchExpected);
+ }
+
+    private void prepareTable(CoreOptions.ChangelogProducer producer) throws Exception {
+ sEnv.executeSql(
+ buildDdl(
+ "T",
+                        Arrays.asList("k INT", "v STRING", "last_action STRING", "dt STRING"),
+ Arrays.asList("k", "dt"),
+ Collections.singletonList("dt"),
+ new HashMap<String, String>() {
+ {
+                                put(CHANGELOG_PRODUCER.key(), producer.toString());
+ }
+ }));
+
+ insertInto(
+ "T",
+ "(1, 'v_1', 'creation', '02-27')",
+ "(2, 'v_2', 'creation', '02-27')",
+ "(3, 'v_3', 'creation', '02-27')",
+ "(4, 'v_4', 'creation', '02-27')",
+ "(5, 'v_5', 'creation', '02-28')",
+ "(6, 'v_6', 'creation', '02-28')",
+ "(7, 'v_7', 'creation', '02-28')",
+ "(8, 'v_8', 'creation', '02-28')",
+ "(9, 'v_9', 'creation', '02-28')",
+ "(10, 'v_10', 'creation', '02-28')");
+ }
+
+ private static List<Arguments> producerTestData() {
+ return Arrays.asList(
+ arguments(
+ CoreOptions.ChangelogProducer.NONE,
+ Arrays.asList(
+                                changelogRow("-U", 7, "v_7", "creation", "02-28"),
+                                changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+                                changelogRow("-D", 4, "v_4", "creation", "02-27"),
+                                changelogRow("-D", 8, "v_8", "creation", "02-28"),
+                                changelogRow("+I", 8, "v_8", "insert", "02-29"),
+                                changelogRow("+I", 11, "v_11", "insert", "02-29"),
+                                changelogRow("+I", 12, "v_12", "insert", "02-29"),
+                                changelogRow("-U", 2, "v_2", "creation", "02-27"),
+                                changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("-U", 3, "v_3", "creation", "02-27"),
+                                changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("-D", 5, "v_5", "creation", "02-28"),
+                                changelogRow("-D", 6, "v_6", "creation", "02-28"),
+                                changelogRow("-D", 9, "v_9", "creation", "02-28"),
+                                changelogRow("-D", 10, "v_10", "creation", "02-28"))),
+                arguments(
+                        CoreOptions.ChangelogProducer.INPUT,
+                        Arrays.asList(
+                                changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+                                changelogRow("-D", 4, "v_4", "creation", "02-27"),
+                                changelogRow("-D", 8, "v_8", "creation", "02-28"),
+                                changelogRow("+I", 8, "v_8", "insert", "02-29"),
+                                changelogRow("+I", 11, "v_11", "insert", "02-29"),
+                                changelogRow("+I", 12, "v_12", "insert", "02-29"),
+                                changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("-D", 5, "v_5", "creation", "02-28"),
+                                changelogRow("-D", 6, "v_6", "creation", "02-28"),
+                                changelogRow("-D", 9, "v_9", "creation", "02-28"),
+                                changelogRow("-D", 10, "v_10", "creation", "02-28"))),
+                arguments(
+                        CoreOptions.ChangelogProducer.FULL_COMPACTION,
+                        Arrays.asList(
+                                changelogRow("-U", 7, "v_7", "creation", "02-28"),
+                                changelogRow("+U", 7, "Seven", "matched_upsert", "02-28"),
+                                changelogRow("-D", 4, "v_4", "creation", "02-27"),
+                                changelogRow("-D", 8, "v_8", "creation", "02-28"),
+                                changelogRow("+I", 8, "v_8", "insert", "02-29"),
+                                changelogRow("+I", 11, "v_11", "insert", "02-29"),
+                                changelogRow("+I", 12, "v_12", "insert", "02-29"),
+                                changelogRow("-U", 2, "v_2", "creation", "02-27"),
+                                changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("-U", 3, "v_3", "creation", "02-27"),
+                                changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert", "02-27"),
+                                changelogRow("-D", 5, "v_5", "creation", "02-28"),
+                                changelogRow("-D", 6, "v_6", "creation", "02-28"),
+                                changelogRow("-D", 9, "v_9", "creation", "02-28"),
+                                changelogRow("-D", 10, "v_10", "creation", "02-28"))));
+ }
+}
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/util/ReadWriteTableTestUtil.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/util/ReadWriteTableTestUtil.java
index 92d72b6e..69841a59 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/util/ReadWriteTableTestUtil.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/util/ReadWriteTableTestUtil.java
@@ -317,7 +317,7 @@ public class ReadWriteTableTestUtil {
assertThat(expectedRecords).isEmpty();
}
- private static String buildDdl(
+ public static String buildDdl(
String table,
List<String> fieldsSpec,
List<String> primaryKeys,