This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 3fe06f73 [FLINK-31415] Support target alias and using ddls to create
source
3fe06f73 is described below
commit 3fe06f7384bb8ce5fab5997b5d0e81761874f092
Author: yuzelin <[email protected]>
AuthorDate: Tue Mar 14 09:30:41 2023 +0800
[FLINK-31415] Support target alias and using ddls to create source
This closes #596
---
docs/content/docs/how-to/writing-tables.md | 74 ++++++++++-
.../table/store/tests/FlinkActionsE2eTest.java | 4 +-
.../table/store/connector/action/ActionBase.java | 5 +-
.../store/connector/action/MergeIntoAction.java | 146 +++++++++++++--------
.../connector/action/MergeIntoActionITCase.java | 111 +++++++++-------
5 files changed, 235 insertions(+), 105 deletions(-)
diff --git a/docs/content/docs/how-to/writing-tables.md
b/docs/content/docs/how-to/writing-tables.md
index 2d3d0deb..2c3d693d 100644
--- a/docs/content/docs/how-to/writing-tables.md
+++ b/docs/content/docs/how-to/writing-tables.md
@@ -314,7 +314,9 @@ Run the following command to submit a 'merge-into' job for
the table.
--warehouse <warehouse-path> \
--database <database-name> \
--table <target-table> \
- --using-table <source-table> \
+ [--target-as <target-table-alias>] \
+ --source-table <source-table> \
+ [--source-as <source-table-alias>] \
--on <merge-condition> \
--merge-actions
<matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete>
\
--matched-upsert-condition <matched-condition> \
@@ -326,6 +328,8 @@ Run the following command to submit a 'merge-into' job for
the table.
--not-matched-by-source-upsert-set <not-matched-upsert-changes> \
--not-matched-by-source-delete-condition <not-matched-by-source-condition>
+Alternatively, you can use '--source-sql <sql> [, --source-sql <sql> ...]' to
create a new table as source table at runtime.
+
-- Examples:
-- Find all orders mentioned in the source table, then mark as important if
the price is above 100
-- or delete if the price is under 10.
@@ -336,12 +340,12 @@ Run the following command to submit a 'merge-into' job
for the table.
--warehouse <warehouse-path> \
--database <database-name> \
--table T \
- --using-table S \
+ --source-table S \
--on "T.id = S.order_id" \
--merge-actions \
matched-upsert,matched-delete \
--matched-upsert-condition "T.price > 100" \
- --matched-upsert-set "id = T.id, price = T.price, mark = 'important'" \
+ --matched-upsert-set "mark = 'important'" \
--matched-delete-condition "T.price < 10"
-- For matched order rows, increase the price, and if there is no match,
insert the order from the
@@ -353,11 +357,11 @@ Run the following command to submit a 'merge-into' job
for the table.
--warehouse <warehouse-path> \
--database <database-name> \
--table T \
- --using-table S \
+ --source-table S \
--on "T.id = S.order_id" \
--merge-actions \
matched-upsert,not-matched-insert \
- --matched-upsert-set "id = T.id, price = T.price + 20, mark = T.mark" \
+ --matched-upsert-set "price = T.price + 20" \
--not-matched-insert-values *
-- For not matched by source order rows (which are in the target table and
does not match any row in the
@@ -369,15 +373,71 @@ Run the following command to submit a 'merge-into' job
for the table.
--warehouse <warehouse-path> \
--database <database-name> \
--table T \
- --using-table S \
+ --source-table S \
--on "T.id = S.order_id" \
--merge-actions \
not-matched-by-source-upsert,not-matched-by-source-delete \
--not-matched-by-source-upsert-condition "T.mark <> 'trivial'" \
- --not-matched-by-source-upsert-set "id = T.id, price = T.price - 20, mark
= T.mark" \
+ --not-matched-by-source-upsert-set "price = T.price - 20" \
--not-matched-by-source-delete-condition "T.mark = 'trivial'"
+
+-- A source-sql example:
+-- Create a temporary view S in a new catalog and use it as the source table
+./flink run \
+ -c org.apache.flink.table.store.connector.action.FlinkActions \
+ /path/to/flink-table-store-flink-**-{{< version >}}.jar \
+ merge-into \
+ --warehouse <warehouse-path> \
+ --database <database-name> \
+ --table T \
+ --source-sql "CREATE CATALOG test WITH (...)" \
+ --source-sql "USE CATALOG test" \
+ --source-sql "USE DATABASE default" \
+ --source-sql "CREATE TEMPORARY VIEW S AS SELECT order_id, price,
'important' FROM important_order" \
+ --source-as test.default.S \
+ --on "T.id = S.order_id" \
+ --merge-actions not-matched-insert \
+ --not-matched-insert-values *
```
+Explanation of the term 'matched':
+1. matched: changed rows are from the target table and each matches a source
table row based on
+the merge-condition and the optional matched-condition (source ∩ target).
+2. not-matched: changed rows are from the source table and none of them matches
any target table
+row based on the merge-condition and the optional not-matched-condition (source -
target).
+3. not-matched-by-source: changed rows are from the target table and none of them
matches any source
+table row based on the merge-condition and the optional
not-matched-by-source-condition (target - source).
+
+Parameters format:\
+All conditions, set changes and values should use Flink SQL syntax. Please
quote them
+with \" to escape special characters.
+1. matched-upsert-changes:\
+col = <source-table>.col | expression [, ...] (Sets target.col to the given
value. Do not
+add '<target-table>.' before 'col'.)\
+In particular, you can use '*' to set columns with all source columns (requires the
target table's
+schema to be equal to the source's).
+2. not-matched-upsert-changes is similar to matched-upsert-changes, but you
cannot reference
+source table's column or use '*'.
+3. insert-values:\
+col1,col2,...,col_end\
+You must specify values for all columns. For each column, you can reference
<source-table>.col or
+use an expression.\
+In particular, you can use '*' to insert with all source columns (requires the target
table's schema
+to be equal to the source's).
+4. not-matched-condition cannot use target table's columns to construct
condition expression.
+5. not-matched-by-source-condition cannot use source table's columns to
construct condition expression.
+
+{{< hint warning >}}
+1. source-alias must not duplicate an existing table name. If you use
--source-sql, source-alias
+must be specified and equal to the table name in the "CREATE" statement.
+2. If the source table is not in the same place as target table, the
source-table-name or the source-alias
+should be qualified (database.table or catalog.database.table if in different
catalog).
+3. At least one merge action must be specified.
+4. If both matched-upsert and matched-delete actions are present, their
conditions must both be present too
+(the same applies to not-matched-by-source-upsert and not-matched-by-source-delete).
Otherwise, all conditions are optional.
+
+{{< /hint >}}
+
For more information of 'merge-into', see
```bash
diff --git
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
index 7b1e2248..3dfb7c7d 100644
---
a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
+++
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/FlinkActionsE2eTest.java
@@ -222,6 +222,8 @@ public class FlinkActionsE2eTest extends E2eTestBase {
jobManager.execInContainer(
"bin/flink",
"run",
+ "-p",
+ "1",
"-c",
"org.apache.flink.table.store.connector.action.FlinkActions",
"--detached",
@@ -301,7 +303,7 @@ public class FlinkActionsE2eTest extends E2eTestBase {
"default",
"--table",
"T",
- "--using-table",
+ "--source-table",
"S",
"--on",
"T.k=S.k",
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java
index d0750c09..0384e8a6 100644
---
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java
@@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import java.util.stream.Collectors;
import static
org.apache.flink.table.store.file.catalog.Catalog.DEFAULT_DATABASE;
@@ -59,6 +60,8 @@ public abstract class ActionBase implements Action {
protected final FlinkCatalog flinkCatalog;
+ protected final String catalogName = "table-store-" + UUID.randomUUID();
+
protected StreamExecutionEnvironment env;
protected StreamTableEnvironment tEnv;
@@ -77,7 +80,7 @@ public abstract class ActionBase implements Action {
CatalogFactory.createCatalog(
CatalogContext.create(
new Options().set(CatalogOptions.WAREHOUSE,
warehouse)));
- flinkCatalog = new FlinkCatalog(catalog, "table-store",
DEFAULT_DATABASE);
+ flinkCatalog = new FlinkCatalog(catalog, catalogName,
DEFAULT_DATABASE);
env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.inBatchMode());
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/MergeIntoAction.java
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/MergeIntoAction.java
index 93de9d26..9943bfdb 100644
---
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/MergeIntoAction.java
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/MergeIntoAction.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.store.connector.LogicalTypeConversion;
-import org.apache.flink.table.store.file.catalog.Identifier;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.types.DataField;
import org.apache.flink.table.store.types.DataType;
@@ -40,6 +39,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -87,10 +87,12 @@ public class MergeIntoAction extends ActionBase {
// field names of target table
private final List<String> targetFieldNames;
- // source
+ // target table
+ @Nullable private String targetAlias;
+
+ // source table
@Nullable private String sourceTable;
- private Identifier sourceTableIdentifier;
- @Nullable private String source;
+ @Nullable private String[] sourceSqls;
@Nullable private String sourceAlias;
// merge condition
@@ -150,14 +152,20 @@ public class MergeIntoAction extends ActionBase {
.collect(Collectors.toList());
}
- public MergeIntoAction withSourceTable(String sourceTable) {
+ public MergeIntoAction withTargetAlias(String targetAlias) {
+ this.targetAlias = targetAlias;
+ return this;
+ }
+
+ public MergeIntoAction withSourceTable(@Nullable String sourceAlias,
String sourceTable) {
+ this.sourceAlias = sourceAlias;
this.sourceTable = sourceTable;
return this;
}
- public MergeIntoAction withSource(String source, String sourceAlias) {
- this.source = source;
+ public MergeIntoAction withSourceSqls(String sourceAlias, String...
sourceSqls) {
this.sourceAlias = sourceAlias;
+ this.sourceSqls = sourceSqls;
return this;
}
@@ -221,6 +229,10 @@ public class MergeIntoAction extends ActionBase {
MergeIntoAction action = new MergeIntoAction(tablePath.f0,
tablePath.f1, tablePath.f2);
+ if (params.has("target-as")) {
+ action.withTargetAlias(params.get("target-as"));
+ }
+
if (!initSource(params, action)) {
return Optional.empty();
}
@@ -273,30 +285,30 @@ public class MergeIntoAction extends ActionBase {
}
private static boolean initSource(MultipleParameterTool params,
MergeIntoAction action) {
- String sourceTable = params.get("using-table");
- String source = params.get("using-source");
- String sourceAlias = params.get("as");
+ String sourceTable = params.get("source-table");
+ Collection<String> sourceSqls = params.getMultiParameter("source-sql");
+ String sourceAlias = params.get("source-as");
int count = 0;
if (sourceTable != null) {
- action.withSourceTable(params.get("using-table"));
+ action.withSourceTable(sourceAlias, sourceTable);
count++;
}
- if (source != null) {
+ if (sourceSqls != null) {
if (sourceAlias == null) {
System.err.println(
- "The source and its alias must be specified
together.\n"
+ "--source-sql and --source-as must be specified
together.\n"
+ "Run <action> --help for help.");
return false;
}
- action.withSource(source, sourceAlias);
+ action.withSourceSqls(sourceAlias, sourceSqls.toArray(new
String[0]));
count++;
}
if (count != 1) {
System.err.println(
- "Please specify either \"source table\" or \"source,
source's alias\".\n"
+ "Please specify source as either \"source-table\" or
\"source-sql\".\n"
+ "Run <action> --help for help.");
return false;
}
@@ -359,8 +371,10 @@ public class MergeIntoAction extends ActionBase {
System.out.println(
" merge-into --warehouse <warehouse-path>\n"
+ " --database <database-name>\n"
- + " --table <table-name>\n"
- + " --using-table <source-table>\n"
+ + " --table <target-table-name>\n"
+ + " [--target-as <target-table-alias>]\n"
+ + " --source-table <source-table>\n"
+ + " [--source-as <source-table-alias>]\n"
+ " --on <merge-condition>\n"
+ " --merge-actions
<matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete>\n"
+ " --matched-upsert-condition
<matched-condition>\n"
@@ -371,35 +385,45 @@ public class MergeIntoAction extends ActionBase {
+ "
--not-matched-by-source-upsert-condition <not-matched-by-source-condition>\n"
+ " --not-matched-by-source-upsert-set
<not-matched-upsert-changes>\n"
+ "
--not-matched-by-source-delete-condition <not-matched-by-source-condition>");
+
System.out.println(" matched-upsert-changes format:");
System.out.println(
" col=<source-table>.col | expression [, ...] (do not add
'<target-table>.' before 'col')");
System.out.println(
" * (upsert with all source cols; require target table's
schema is equal to source's)");
+
System.out.println(" not-matched-upsert-changes format:");
System.out.println(" col=expression (cannot use source table's
col)");
+
System.out.println(" insert-values format:");
System.out.println(
" col1,col2,...,col_end (must specify values of all
columns; can use <source-table>.col or expression)");
System.out.println(
" * (insert with all source cols; require target table's
schema is equal to source's)");
+
System.out.println(
" not-matched-condition: cannot use target table's columns to
construct condition expression.");
System.out.println(
" not-matched-by-source-condition: cannot use source table's
columns to construct condition expression.");
+
System.out.println(" alternative arguments:");
System.out.println(" --path <table-path> to represent the table
path.");
System.out.println(
- " --using-source <source-expression> --as <source-alias> to
replace --using-table.");
+ " --source-sql <sql>[, --source-sql <sql> ...] can create a
new table as source table at runtime.");
System.out.println();
System.out.println("Note: ");
System.out.println(" 1. Target table must has primary keys.");
System.out.println(
" 2. All conditions, set changes and values should use Flink
SQL syntax. Please quote them with \" to escape special characters.");
- System.out.println(" 3. source-alias cannot be duplicated with
existed table name.");
- System.out.println(" 4. At least one merge action must be
specified.");
- System.out.println(" 5. How to determine the changed rows with
different \"matched\":");
+ System.out.println(
+ " 3. source-alias cannot be duplicated with existed table
name. "
+ + "If you use --source-sql, source-alias must be
specified and equal to the table name in \"CREATE\" statement.");
+ System.out.println(
+ " 4. If the source table is not in the same place as target
table, "
+ + "the source-table-name or the source-alias should be
qualified (database.table or catalog.database.table if in different catalog).");
+ System.out.println(" 5. At least one merge action must be
specified.");
+ System.out.println(" 6. How to determine the changed rows with
different \"matched\":");
System.out.println(
" matched: changed rows are from target table and each can
match a source table row "
+ "based on merge-condition and optional
matched-condition.");
@@ -410,36 +434,42 @@ public class MergeIntoAction extends ActionBase {
" not-matched-by-source: changed rows are from target table
and all row cannot match any source table row "
+ "based on merge-condition and optional
not-matched-by-source-condition.");
System.out.println(
- " 6. If both matched-upsert and matched-delete actions are
present, their conditions must both be present too "
+ " 7. If both matched-upsert and matched-delete actions are
present, their conditions must both be present too "
+ "(same to not-matched-by-source-upsert and
not-matched-by-source-delete). Otherwise, all conditions are optional.");
- System.out.println(" 7. source-alias cannot be duplicated with
existed table name.");
System.out.println();
System.out.println("Examples:");
System.out.println(
" merge-into --path hdfs:///path/to/T\n"
- + " --using-table S\n"
- + " --on \"T.k=S.k\"\n"
+ + " --source-table S\n"
+ + " --on \"T.k = S.k\"\n"
+ " --merge-actions matched-upsert\n"
- + " --matched-upsert-condition
\"T.v<>S.v\"\n"
- + " --matched-upsert-set \"S.k,S.v\"");
+ + " --matched-upsert-condition \"T.v <>
S.v\"\n"
+ + " --matched-upsert-set \"v = S.v\"");
System.out.println(
- " It will find matched rows of target table that meet
condition (T.k=S.k AND T.v<>S.v) ind '-D' to test_table, then update T.v with
S.v.");
+ " It will find matched rows of target table that meet
condition (T.k = S.k), then update T.v with S.v where (T.v <> S.v).");
}
@Override
public void run() throws Exception {
+ // handle target alias
+ if (targetAlias != null) {
+ tEnv.registerTable(targetAlias,
tEnv.from(identifier.getObjectName()));
+ }
+
// prepare source table
if (sourceTable != null) {
- if (sourceTable.contains(".")) {
- sourceTableIdentifier = Identifier.fromString(sourceTable);
- } else {
- sourceTableIdentifier =
- Identifier.create(identifier.getDatabaseName(),
sourceTable);
- }
+ sourceAlias = sourceTable;
} else {
- tEnv.registerTable(sourceAlias, tEnv.sqlQuery(source));
- sourceTableIdentifier =
Identifier.create(identifier.getDatabaseName(), sourceAlias);
+ // NOTE: sql may change current catalog and database
+ try {
+ for (String sql : sourceSqls) {
+ tEnv.executeSql(sql).await();
+ }
+ } catch (Throwable t) {
+ LOG.error("Error occurs when executing sql in --source-sql.",
t);
+ throw new RuntimeException("Error occurs when executing sql in
--source-sql.", t);
+ }
}
List<DataStream<RowData>> dataStreams =
@@ -465,7 +495,7 @@ public class MergeIntoAction extends ActionBase {
List<String> project;
// extract project
if (matchedUpsertSet.equals("*")) {
- project =
Collections.singletonList(sourceTableIdentifier.getObjectName() + "." + "*");
+ project = Collections.singletonList(sourceAlias + "." + "*");
} else {
// validate upsert changes
// no need to check primary keys changes because merge condition
must contain all pks
@@ -488,10 +518,7 @@ public class MergeIntoAction extends ActionBase {
// the table name is added before column name to avoid ambiguous
column reference
project =
targetFieldNames.stream()
- .map(
- name ->
- changes.getOrDefault(
- name,
identifier.getObjectName() + "." + name))
+ .map(name -> changes.getOrDefault(name,
targetTableName() + "." + name))
.collect(Collectors.toList());
}
@@ -500,8 +527,8 @@ public class MergeIntoAction extends ActionBase {
String.format(
"SELECT %s FROM %s INNER JOIN %s ON %s %s",
String.join(",", project),
- identifier.getEscapedFullName(),
- sourceTableIdentifier.getEscapedFullName(),
+ escapedTargetName(),
+ escapedSourceName(),
mergeCondition,
matchedUpsertCondition == null ? "" : "WHERE " +
matchedUpsertCondition);
LOG.info("Query used for matched-update:\n{}", query);
@@ -548,8 +575,8 @@ public class MergeIntoAction extends ActionBase {
String.format(
"SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s
WHERE %s) %s",
String.join(",", project),
- identifier.getEscapedFullName(),
- sourceTableIdentifier.getEscapedFullName(),
+ escapedTargetName(),
+ escapedSourceName(),
mergeCondition,
notMatchedBySourceUpsertCondition == null
? ""
@@ -571,7 +598,7 @@ public class MergeIntoAction extends ActionBase {
// the table name is added before column name to avoid ambiguous
column reference
List<String> project =
targetFieldNames.stream()
- .map(name -> identifier.getObjectName() + "." + name)
+ .map(name -> targetTableName() + "." + name)
.collect(Collectors.toList());
// use inner join to find matched records
@@ -579,8 +606,8 @@ public class MergeIntoAction extends ActionBase {
String.format(
"SELECT %s FROM %s INNER JOIN %s ON %s %s",
String.join(",", project),
- identifier.getEscapedFullName(),
- sourceTableIdentifier.getEscapedFullName(),
+ escapedTargetName(),
+ escapedSourceName(),
mergeCondition,
matchedDeleteCondition == null ? "" : "WHERE " +
matchedDeleteCondition);
LOG.info("Query used by matched-delete:\n{}", query);
@@ -601,8 +628,8 @@ public class MergeIntoAction extends ActionBase {
String.format(
"SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s
WHERE %s) %s",
String.join(",", targetFieldNames),
- identifier.getEscapedFullName(),
- sourceTableIdentifier.getEscapedFullName(),
+ escapedTargetName(),
+ escapedSourceName(),
mergeCondition,
notMatchedBySourceDeleteCondition == null
? ""
@@ -625,8 +652,8 @@ public class MergeIntoAction extends ActionBase {
String.format(
"SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s
WHERE %s) %s",
notMatchedInsertValues,
- sourceTableIdentifier.getEscapedFullName(),
- identifier.getEscapedFullName(),
+ escapedSourceName(),
+ escapedTargetName(),
mergeCondition,
notMatchedInsertCondition == null
? ""
@@ -674,4 +701,19 @@ public class MergeIntoAction extends ActionBase {
return rowData;
});
}
+
+ private String targetTableName() {
+ return targetAlias == null ? identifier.getObjectName() : targetAlias;
+ }
+
+ private String escapedTargetName() {
+ return String.format(
+ "`%s`.`%s`.`%s`", catalogName, identifier.getDatabaseName(),
targetTableName());
+ }
+
+ private String escapedSourceName() {
+ return Arrays.stream(sourceAlias.split("\\."))
+ .map(s -> String.format("`%s`", s))
+ .collect(Collectors.joining("."));
+ }
}
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java
b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java
index 0cefe77f..27ef4dca 100644
---
a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/action/MergeIntoActionITCase.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.connector.action;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.types.Row;
@@ -106,7 +107,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase
{
// SET v = v || '_nmu', last_action = 'not_matched_upsert'
// WHEN NOT MATCHED BY SOURCE AND (dt >= '02-28') THEN DELETE
MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
- action.withSourceTable("S")
+ action.withSourceTable(null, "S")
.withMergeCondition("T.k = S.k AND T.dt = S.dt")
.withMatchedUpsert(
"T.v <> S.v AND S.v IS NOT NULL", "v = S.v,
last_action = 'matched_upsert'")
@@ -130,57 +131,79 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
}
@Test
- public void testUsingSource() throws Exception {
+ public void testTargetAlias() throws Exception {
// prepare table T
prepareTable(CoreOptions.ChangelogProducer.NONE);
- // similar to:
- // MERGE INTO T
- // USING (SELECT * FROM S WHERE k < 12) AS SS
- // ON T.k = SS.k AND T.dt = SS.dt
- // WHEN MATCHED AND (T.v <> SS.v AND SS.v IS NOT NULL) THEN UPDATE
- // SET v = SS.v, last_action = 'matched_upsert'
- // WHEN MATCHED AND SS.v IS NULL THEN DELETE
- // WHEN NOT MATCHED THEN INSERT VALUES (SS.k, SS.v, 'insert', SS.dt)
- // WHEN NOT MATCHED BY SOURCE AND (dt < '02-28') THEN UPDATE
- // SET v = v || '_nmu', last_action = 'not_matched_upsert'
- // WHEN NOT MATCHED BY SOURCE AND (dt >= '02-28') THEN DELETE
MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
- action.withSource("SELECT * FROM S WHERE k < 12", "SS")
- .withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
- .withMatchedUpsert(
- "T.v <> SS.v AND SS.v IS NOT NULL",
- "v = SS.v, last_action = 'matched_upsert'")
- .withMatchedDelete("SS.v IS NULL")
- .withNotMatchedInsert(null, "SS.k, SS.v, 'insert', SS.dt")
- .withNotMatchedBySourceUpsert(
- "dt < '02-28'", "v = v || '_nmu', last_action =
'not_matched_upsert'")
- .withNotMatchedBySourceDelete("dt >= '02-28'");
+ action.withTargetAlias("TT")
+ .withSourceTable(null, "S")
+ .withMergeCondition("TT.k = S.k AND TT.dt = S.dt")
+ .withMatchedDelete("S.v IS NULL");
validateActionRunResult(
action,
Arrays.asList(
- changelogRow("-U", 7, "v_7", "creation", "02-28"),
- changelogRow("+U", 7, "Seven", "matched_upsert",
"02-28"),
changelogRow("-D", 4, "v_4", "creation", "02-27"),
- changelogRow("-D", 8, "v_8", "creation", "02-28"),
- changelogRow("+I", 8, "v_8", "insert", "02-29"),
- changelogRow("+I", 11, "v_11", "insert", "02-29"),
- changelogRow("-U", 2, "v_2", "creation", "02-27"),
- changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert",
"02-27"),
- changelogRow("-U", 3, "v_3", "creation", "02-27"),
- changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert",
"02-27"),
- changelogRow("-D", 5, "v_5", "creation", "02-28"),
- changelogRow("-D", 6, "v_6", "creation", "02-28"),
- changelogRow("-D", 9, "v_9", "creation", "02-28"),
- changelogRow("-D", 10, "v_10", "creation", "02-28")),
+ changelogRow("-D", 8, "v_8", "creation", "02-28")),
Arrays.asList(
changelogRow("+I", 1, "v_1", "creation", "02-27"),
- changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert",
"02-27"),
- changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert",
"02-27"),
- changelogRow("+U", 7, "Seven", "matched_upsert",
"02-28"),
- changelogRow("+I", 8, "v_8", "insert", "02-29"),
- changelogRow("+I", 11, "v_11", "insert", "02-29")));
+ changelogRow("+I", 2, "v_2", "creation", "02-27"),
+ changelogRow("+I", 3, "v_3", "creation", "02-27"),
+ changelogRow("+I", 5, "v_5", "creation", "02-28"),
+ changelogRow("+I", 6, "v_6", "creation", "02-28"),
+ changelogRow("+I", 7, "v_7", "creation", "02-28"),
+ changelogRow("+I", 9, "v_9", "creation", "02-28"),
+ changelogRow("+I", 10, "v_10", "creation", "02-28")));
+ }
+
+ @Test
+ public void testUsingDdlSource() throws Exception {
+ // prepare table T
+ prepareTable(CoreOptions.ChangelogProducer.NONE);
+
+ TestValuesTableFactory.registerData(
+ Arrays.asList(
+ changelogRow("+I", 1, "v_1", "02-27"),
+ changelogRow("+I", 4, null, "02-27"),
+ changelogRow("+I", 8, null, "02-28")));
+
+ String catalog =
+ String.format(
+ "CREATE CATALOG test_cat WITH ('type' = 'table-store',
'warehouse' = '%s')",
+ getTempDirPath());
+ String useCatalog = "USE CATALOG test_cat";
+ String id =
+ TestValuesTableFactory.registerData(
+ Arrays.asList(
+ changelogRow("+I", 1, "v_1", "02-27"),
+ changelogRow("+I", 4, null, "02-27"),
+ changelogRow("+I", 8, null, "02-28")));
+ String ddl =
+ String.format(
+ "CREATE TEMPORARY TABLE S (k INT, v STRING, dt
STRING)\n"
+ + "WITH ('connector' = 'values', 'bounded' =
'true', 'data-id' = '%s');",
+ id);
+
+ MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
+ // test current catalog and current database
+ action.withSourceSqls("S", catalog, useCatalog, ddl)
+ .withMergeCondition("T.k = S.k AND T.dt = S.dt")
+ .withMatchedDelete("S.v IS NULL");
+ validateActionRunResult(
+ action,
+ Arrays.asList(
+ changelogRow("-D", 4, "v_4", "creation", "02-27"),
+ changelogRow("-D", 8, "v_8", "creation", "02-28")),
+ Arrays.asList(
+ changelogRow("+I", 1, "v_1", "creation", "02-27"),
+ changelogRow("+I", 2, "v_2", "creation", "02-27"),
+ changelogRow("+I", 3, "v_3", "creation", "02-27"),
+ changelogRow("+I", 5, "v_5", "creation", "02-28"),
+ changelogRow("+I", 6, "v_6", "creation", "02-28"),
+ changelogRow("+I", 7, "v_7", "creation", "02-28"),
+ changelogRow("+I", 9, "v_9", "creation", "02-28"),
+ changelogRow("+I", 10, "v_10", "creation", "02-28")));
}
@Test
@@ -190,7 +213,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase
{
// build MergeIntoAction
MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
- action.withSource("SELECT k, v, 'unknown', dt FROM S", "SS")
+ action.withSourceSqls("SS", "CREATE TEMPORARY VIEW SS AS SELECT k, v,
'unknown', dt FROM S")
.withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
.withMatchedUpsert(null, "*");
@@ -225,7 +248,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase
{
// build MergeIntoAction
MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
- action.withSource("SELECT k, v, 'unknown', dt FROM S", "SS")
+ action.withSourceSqls("SS", "CREATE TEMPORARY VIEW SS AS SELECT k, v,
'unknown', dt FROM S")
.withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
.withNotMatchedInsert("SS.k < 12", "*");
@@ -274,7 +297,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase
{
// build MergeIntoAction
MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
- action.withSourceTable("S")
+ action.withSourceTable(null, "S")
.withMergeCondition("T.k = S.k AND T.dt = S.dt")
.withNotMatchedInsert(null, "S.k, S.v, 0, S.dt");