This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a5d1b3c77 [flink] Add merge-engine check when executing row level
batch update and delete (#3181)
a5d1b3c77 is described below
commit a5d1b3c77c2683f23bbc5c7e755dfbc7463a369a
Author: yuzelin <[email protected]>
AuthorDate: Mon Apr 15 10:03:02 2024 +0800
[flink] Add merge-engine check when executing row level batch update and
delete (#3181)
---
docs/content/primary-key-table/merge-engine.md | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 40 ++++++++--
.../apache/paimon/flink/action/DeleteAction.java | 14 ++++
.../paimon/flink/action/MergeIntoAction.java | 54 +++++++++++--
.../flink/action/MergeIntoActionFactory.java | 34 +-------
.../paimon/flink/procedure/MergeIntoProcedure.java | 3 +-
.../SupportsRowLevelOperationFlinkTableSink.java | 90 +++++++++++-----------
.../apache/paimon/flink/ReadWriteTableITCase.java | 17 ++--
.../paimon/flink/action/MergeIntoActionITCase.java | 53 -------------
9 files changed, 158 insertions(+), 153 deletions(-)
diff --git a/docs/content/primary-key-table/merge-engine.md
b/docs/content/primary-key-table/merge-engine.md
index bc6c1ee35..7ae7035e4 100644
--- a/docs/content/primary-key-table/merge-engine.md
+++ b/docs/content/primary-key-table/merge-engine.md
@@ -34,6 +34,12 @@ result in strange behavior. When the input is out of order,
we recommend that yo
[Sequence Field]({{< ref "primary-key-table/sequence-rowkind#sequence-field"
>}}) to correct disorder.
{{< /hint >}}
+{{< hint info >}}
+Some compute engines support row-level update and delete in batch mode, but not all merge engines support them.
+- Merge engines that support batch update: `deduplicate` and `first-row`.
+- Merge engines that support batch delete: `deduplicate`.
+{{< /hint >}}
+
## Deduplicate
`deduplicate` merge engine is the default merge engine. Paimon will only keep
the latest record and throw away other records with the same primary keys.
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 54ab8b6f3..0d78d2ec8 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1705,20 +1705,28 @@ public class CoreOptions implements Serializable {
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
- DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
+ DEDUPLICATE("deduplicate", "De-duplicate and keep the last row.",
true, true),
- PARTIAL_UPDATE("partial-update", "Partial update non-null fields."),
+ PARTIAL_UPDATE("partial-update", "Partial update non-null fields.",
false, false),
- AGGREGATE("aggregation", "Aggregate fields with same primary key."),
+ AGGREGATE("aggregation", "Aggregate fields with same primary key.",
false, false),
- FIRST_ROW("first-row", "De-duplicate and keep the first row.");
+ FIRST_ROW("first-row", "De-duplicate and keep the first row.", true,
false);
private final String value;
private final String description;
-
- MergeEngine(String value, String description) {
+ private final boolean supportBatchUpdate;
+ private final boolean supportBatchDelete;
+
+ MergeEngine(
+ String value,
+ String description,
+ boolean supportBatchUpdate,
+ boolean supportBatchDelete) {
this.value = value;
this.description = description;
+ this.supportBatchUpdate = supportBatchUpdate;
+ this.supportBatchDelete = supportBatchDelete;
}
@Override
@@ -1730,6 +1738,26 @@ public class CoreOptions implements Serializable {
public InlineElement getDescription() {
return text(description);
}
+
+ public boolean supportBatchUpdate() {
+ return supportBatchUpdate;
+ }
+
+ public boolean supportBatchDelete() {
+ return supportBatchDelete;
+ }
+
+ public static List<MergeEngine> supportBatchUpdateEngines() {
+ return Arrays.stream(MergeEngine.values())
+ .filter(MergeEngine::supportBatchUpdate)
+ .collect(Collectors.toList());
+ }
+
+ public static List<MergeEngine> supportBatchDeleteEngines() {
+ return Arrays.stream(MergeEngine.values())
+ .filter(MergeEngine::supportBatchDelete)
+ .collect(Collectors.toList());
+ }
}
/** Specifies the startup mode for log consumer. */
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
index 2fdb658ad..ad3b18ccb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
@@ -18,6 +18,9 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.utils.Preconditions;
+
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.data.GenericRowData;
@@ -51,6 +54,17 @@ public class DeleteAction extends TableActionBase {
@Override
public void run() throws Exception {
+ CoreOptions.MergeEngine mergeEngine =
CoreOptions.fromMap(table.options()).mergeEngine();
+ Preconditions.checkArgument(mergeEngine.supportBatchDelete(), "");
+
+ if (!mergeEngine.supportBatchDelete()) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Delete is executed in batch mode, but merge
engine %s can not support batch delete."
+ + " Support batch delete merge engines
are: %s.",
+ mergeEngine,
CoreOptions.MergeEngine.supportBatchDeleteEngines()));
+ }
+
LOG.debug("Run delete action with filter '{}'.", filter);
Table queriedTable =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
index 4ec5e6ef5..ba232b019 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
@@ -96,11 +97,11 @@ public class MergeIntoAction extends TableActionBase {
private String mergeCondition;
// actions to be taken
- boolean matchedUpsert;
- boolean notMatchedUpsert;
- boolean matchedDelete;
- boolean notMatchedDelete;
- boolean insert;
+ private boolean matchedUpsert;
+ private boolean notMatchedUpsert;
+ private boolean matchedDelete;
+ private boolean notMatchedDelete;
+ private boolean insert;
// upsert
@Nullable String matchedUpsertCondition;
@@ -215,6 +216,49 @@ public class MergeIntoAction extends TableActionBase {
return this;
}
+ public void validate() {
+ if (!matchedUpsert && !notMatchedUpsert && !matchedDelete &&
!notMatchedDelete && !insert) {
+ throw new IllegalArgumentException(
+ "Must specify at least one merge action. Run 'merge_into
--help' for help.");
+ }
+
+ CoreOptions.MergeEngine mergeEngine =
CoreOptions.fromMap(table.options()).mergeEngine();
+ if ((matchedUpsert || notMatchedUpsert) &&
!mergeEngine.supportBatchUpdate()) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "merge-into is executed in batch mode, and you
have set matched_upsert or not_matched_by_source_upsert."
+ + " But merge engine %s can not support
batch update. Support batch update merge engines are: %s.",
+ mergeEngine,
CoreOptions.MergeEngine.supportBatchUpdateEngines()));
+ }
+
+ if ((matchedDelete || notMatchedDelete) &&
!mergeEngine.supportBatchDelete()) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "merge-into is executed in batch mode, and you
have set matched_delete or not_matched_by_source_delete."
+ + " But merge engine %s can not support
batch delete. Support batch delete merge engines are: %s.",
+ mergeEngine,
CoreOptions.MergeEngine.supportBatchDeleteEngines()));
+ }
+
+ if ((matchedUpsert && matchedDelete)
+ && (matchedUpsertCondition == null || matchedDeleteCondition
== null)) {
+ throw new IllegalArgumentException(
+ "If both matched-upsert and matched-delete actions are
present, their conditions must both be present too.");
+ }
+
+ if ((notMatchedUpsert && notMatchedDelete)
+ && (notMatchedBySourceUpsertCondition == null
+ || notMatchedBySourceDeleteCondition == null)) {
+ throw new IllegalArgumentException(
+ "If both not-matched-by-source-upsert and
not-matched-by--source-delete actions are present, "
+ + "their conditions must both be present too.\n");
+ }
+
+ if (notMatchedBySourceUpsertSet != null &&
notMatchedBySourceUpsertSet.equals("*")) {
+ throw new IllegalArgumentException(
+ "The '*' cannot be used in
not_matched_by_source_upsert_set");
+ }
+ }
+
@Override
public void run() throws Exception {
DataStream<RowData> dataStream = buildDataStream();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
index 3c07e3bd5..0ff0db327 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
@@ -112,7 +112,7 @@ public class MergeIntoActionFactory implements
ActionFactory {
params.get(NOT_MATCHED_INSERT_VALUES));
}
- validate(action);
+ action.validate();
return Optional.of(action);
}
@@ -202,36 +202,4 @@ public class MergeIntoActionFactory implements
ActionFactory {
System.out.println(
" It will find matched rows of target table that meet
condition (T.k = S.k), then update T.v with S.v where (T.v <> S.v).");
}
-
- public static void validate(MergeIntoAction action) {
- if (!action.matchedUpsert
- && !action.notMatchedUpsert
- && !action.matchedDelete
- && !action.notMatchedDelete
- && !action.insert) {
- throw new IllegalArgumentException(
- "Must specify at least one merge action. Run 'merge_into
--help' for help.");
- }
-
- if ((action.matchedUpsert && action.matchedDelete)
- && (action.matchedUpsertCondition == null
- || action.matchedDeleteCondition == null)) {
- throw new IllegalArgumentException(
- "If both matched-upsert and matched-delete actions are
present, their conditions must both be present too.");
- }
-
- if ((action.notMatchedUpsert && action.notMatchedDelete)
- && (action.notMatchedBySourceUpsertCondition == null
- || action.notMatchedBySourceDeleteCondition == null)) {
- throw new IllegalArgumentException(
- "If both not-matched-by-source-upsert and
not-matched-by--source-delete actions are present, "
- + "their conditions must both be present too.\n");
- }
-
- if (action.notMatchedBySourceUpsertSet != null
- && action.notMatchedBySourceUpsertSet.equals("*")) {
- throw new IllegalArgumentException(
- "The '*' cannot be used in
not_matched_by_source_upsert_set");
- }
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
index 450646803..c4236ac7d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.MergeIntoAction;
-import org.apache.paimon.flink.action.MergeIntoActionFactory;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -222,7 +221,7 @@ public class MergeIntoProcedure extends ProcedureBase {
}
action.withStreamExecutionEnvironment(procedureContext.getExecutionEnvironment());
- MergeIntoActionFactory.validate(action);
+ action.validate();
DataStream<RowData> dataStream = action.buildDataStream();
TableResult tableResult = action.batchSink(dataStream);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index 0d2bd3962..5d8ed8c74 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.PredicateConverter;
@@ -89,47 +90,45 @@ public abstract class
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
// Since only UPDATE_AFTER type messages can be received at present,
// AppendOnlyFileStoreTable cannot correctly handle old data, so they
are marked as
// unsupported. Similarly, it is not allowed to update the primary key
column when updating
- // the column of PrimaryKeyFileStoreTable, because the old data cannot
be handled
- // correctly.
- if (table.primaryKeys().size() > 0) {
- Options options = Options.fromMap(table.options());
- Set<String> primaryKeys = new HashSet<>(table.primaryKeys());
- updatedColumns.forEach(
- column -> {
- if (primaryKeys.contains(column.getName())) {
- String errMsg =
- String.format(
- "Updates to primary keys are not
supported, primaryKeys (%s), updatedColumns (%s)",
- primaryKeys,
- updatedColumns.stream()
- .map(Column::getName)
-
.collect(Collectors.toList()));
- throw new UnsupportedOperationException(errMsg);
- }
- });
- if (options.get(MERGE_ENGINE) == MergeEngine.DEDUPLICATE
- || options.get(MERGE_ENGINE) ==
MergeEngine.PARTIAL_UPDATE) {
- // Even with partial-update we still need all columns. Because
the topology
- // structure is source -> cal -> constraintEnforcer -> sink,
in the
- // constraintEnforcer operator, the constraint check will be
performed according to
- // the index, not according to the column. So we can't return
only some columns,
- // which will cause problems like
ArrayIndexOutOfBoundsException.
- // TODO: return partial columns after FLINK-32001 is resolved.
- return new RowLevelUpdateInfo() {};
- }
- throw new UnsupportedOperationException(
- String.format(
- "%s can not support update, currently only %s of
%s and %s can support update.",
- options.get(MERGE_ENGINE),
- MERGE_ENGINE.key(),
- MergeEngine.DEDUPLICATE,
- MergeEngine.PARTIAL_UPDATE));
- } else {
+ // the column of PrimaryKeyFileStoreTable, because the old data cannot
be handled correctly.
+ if (table.primaryKeys().isEmpty()) {
throw new UnsupportedOperationException(
String.format(
"%s can not support update, because there is no
primary key.",
table.getClass().getName()));
}
+
+ Options options = Options.fromMap(table.options());
+ Set<String> primaryKeys = new HashSet<>(table.primaryKeys());
+ updatedColumns.forEach(
+ column -> {
+ if (primaryKeys.contains(column.getName())) {
+ String errMsg =
+ String.format(
+ "Updates to primary keys are not
supported, primaryKeys (%s), updatedColumns (%s)",
+ primaryKeys,
+ updatedColumns.stream()
+ .map(Column::getName)
+ .collect(Collectors.toList()));
+ throw new UnsupportedOperationException(errMsg);
+ }
+ });
+
+ MergeEngine mergeEngine = options.get(MERGE_ENGINE);
+ if (!mergeEngine.supportBatchUpdate()) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Merge engine %s can not support batch update.
Support batch update merge engines are: %s.",
+ mergeEngine,
CoreOptions.MergeEngine.supportBatchUpdateEngines()));
+ }
+
+ // Even with partial-update we still need all columns. Because the
topology
+ // structure is source -> cal -> constraintEnforcer -> sink, in the
+ // constraintEnforcer operator, the constraint check will be performed
according to
+ // the index, not according to the column. So we can't return only
some columns,
+ // which will cause problems like ArrayIndexOutOfBoundsException.
+ // TODO: return partial columns after FLINK-32001 is resolved.
+ return new RowLevelUpdateInfo() {};
}
@Override
@@ -177,21 +176,20 @@ public abstract class
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
}
private void validateDeletable() {
- if (table.primaryKeys().size() > 0) {
- Options options = Options.fromMap(table.options());
- if (options.get(MERGE_ENGINE) == MergeEngine.DEDUPLICATE) {
- return;
- }
- throw new UnsupportedOperationException(
- String.format(
- "merge engine '%s' can not support delete,
currently only %s can support delete.",
- options.get(MERGE_ENGINE),
MergeEngine.DEDUPLICATE));
- } else {
+ if (table.primaryKeys().isEmpty()) {
throw new UnsupportedOperationException(
String.format(
"table '%s' can not support delete, because there
is no primary key.",
table.getClass().getName()));
}
+
+ MergeEngine mergeEngine =
CoreOptions.fromMap(table.options()).mergeEngine();
+ if (!mergeEngine.supportBatchDelete()) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Merge engine %s can not support batch delete.
Support batch delete merge engines are: %s.",
+ mergeEngine,
CoreOptions.MergeEngine.supportBatchDeleteEngines()));
+ }
}
private boolean canPushDownDeleteFilter() {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index df8e317d4..c2235c85b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -1332,9 +1332,6 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
@ParameterizedTest
@EnumSource(CoreOptions.MergeEngine.class)
public void testUpdateWithPrimaryKey(CoreOptions.MergeEngine mergeEngine)
throws Exception {
- Set<CoreOptions.MergeEngine> supportUpdateEngines = new HashSet<>();
- supportUpdateEngines.add(DEDUPLICATE);
- supportUpdateEngines.add(CoreOptions.MergeEngine.PARTIAL_UPDATE);
// Step1: define table schema
Map<String, String> options = new HashMap<>();
options.put(MERGE_ENGINE.key(), mergeEngine.toString());
@@ -1361,12 +1358,17 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
"(3, 'Euro', 119, '2022-01-02')");
// Step3: prepare expected data.
- String rowKind = mergeEngine == CoreOptions.MergeEngine.PARTIAL_UPDATE
? "+I" : "+U";
+ String rowKind = mergeEngine == FIRST_ROW ? "+I" : "+U";
List<Row> expectedRecords =
Arrays.asList(
// part = 2022-01-01
changelogRow("+I", 1L, "US Dollar", 114L,
"2022-01-01"),
- changelogRow(rowKind, 2L, "Yen", 1L, "2022-01-01"),
+ changelogRow(
+ rowKind,
+ 2L,
+ mergeEngine == FIRST_ROW ? "UNKNOWN" : "Yen",
+ mergeEngine == FIRST_ROW ? -1 : 1L,
+ "2022-01-01"),
changelogRow("+I", 3L, "Euro", 114L, "2022-01-01"),
// part = 2022-01-02
changelogRow("+I", 3L, "Euro", 119L, "2022-01-02"));
@@ -1374,15 +1376,14 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
// Step4: prepare update statement
String updateStatement =
String.format(
- ""
- + "UPDATE %s "
+ "UPDATE %s "
+ "SET currency = 'Yen', "
+ "rate = 1 "
+ "WHERE currency = 'UNKNOWN' and dt =
'2022-01-01'",
table);
// Step5: execute update statement and verify result
- if (supportUpdateEngines.contains(mergeEngine)) {
+ if (mergeEngine.supportBatchUpdate()) {
bEnv.executeSql(updateStatement).await();
String querySql = String.format("SELECT * FROM %s", table);
testBatchRead(querySql, expectedRecords);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index 1926299ba..523033579 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -132,59 +132,6 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
changelogRow("+I", 12, "v_12", "insert", "02-29")));
}
- @Test
- public void testWorkWithPartialUpdate() throws Exception {
- // re-create target table with given producer
- sEnv.executeSql("DROP TABLE T");
- prepareTargetTable(CoreOptions.ChangelogProducer.LOOKUP);
-
- MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse,
database, "T");
- action.withSourceTable("S")
- .withMergeCondition("T.k = S.k AND T.dt = S.dt")
- .withMatchedUpsert(
- "T.v <> S.v AND S.v IS NOT NULL", "v = S.v,
last_action = 'matched_upsert'")
- .withMatchedDelete("S.v IS NULL")
- .withNotMatchedInsert(null, "S.k, S.v, 'insert', S.dt")
- .withNotMatchedBySourceUpsert(
- "dt < '02-28'", "v = v || '_nmu', last_action =
'not_matched_upsert'")
- .withNotMatchedBySourceDelete("dt >= '02-28'");
-
- // delete records are filtered
- validateActionRunResult(
- action.build(),
- Arrays.asList(
- changelogRow("+U", 2, "v_2_nmu", "not_matched_upsert",
"02-27"),
- changelogRow("+U", 3, "v_3_nmu", "not_matched_upsert",
"02-27"),
- changelogRow("+U", 7, "Seven", "matched_upsert",
"02-28"),
- changelogRow("+I", 8, "v_8", "insert", "02-29"),
- changelogRow("+I", 11, "v_11", "insert", "02-29"),
- changelogRow("+I", 12, "v_12", "insert", "02-29")),
- Arrays.asList(
- changelogRow("+I", 1, "v_1", "creation", "02-27"),
- changelogRow("+I", 2, "v_2_nmu", "not_matched_upsert",
"02-27"),
- changelogRow("+I", 3, "v_3_nmu", "not_matched_upsert",
"02-27"),
- changelogRow("+I", 4, "v_4", "creation", "02-27"),
- changelogRow("+I", 5, "v_5", "creation", "02-28"),
- changelogRow("+I", 6, "v_6", "creation", "02-28"),
- changelogRow("+I", 7, "Seven", "matched_upsert",
"02-28"),
- changelogRow("+I", 8, "v_8", "creation", "02-28"),
- changelogRow("+I", 8, "v_8", "insert", "02-29"),
- changelogRow("+I", 9, "v_9", "creation", "02-28"),
- changelogRow("+I", 10, "v_10", "creation", "02-28"),
- changelogRow("+I", 11, "v_11", "insert", "02-29"),
- changelogRow("+I", 12, "v_12", "insert", "02-29")));
-
- // test partial update still works after action
- insertInto(
- "T",
- "(12, CAST (NULL AS STRING), '$', '02-29')",
- "(12, 'Test', CAST (NULL AS STRING), '02-29')");
-
- testBatchRead(
- "SELECT * FROM T WHERE k = 12",
- Collections.singletonList(changelogRow("+I", 12, "Test", "$",
"02-29")));
- }
-
@ParameterizedTest(name = "in-default = {0}")
@ValueSource(booleans = {true, false})
public void testTargetAlias(boolean inDefault) throws Exception {