This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 37391e056 [flink] integrate UPDATE for flink-1.17 (#1010)
37391e056 is described below
commit 37391e056100fb939e1dcc8b272668bfe808cc71
Author: liming.1018 <[email protected]>
AuthorDate: Sat May 6 16:48:58 2023 +0800
[flink] integrate UPDATE for flink-1.17 (#1010)
---
.../apache/paimon/flink/sink/FlinkTableSink.java | 39 +++++
.../apache/paimon/flink/sink/FlinkTableSink.java | 39 +++++
paimon-flink/paimon-flink-1.16/pom.xml | 12 ++
.../apache/paimon/flink/sink/FlinkTableSink.java | 39 +++++
.../apache/paimon/flink/sink/FlinkTableSink.java | 188 ++++++---------------
...FlinkTableSink.java => FlinkTableSinkBase.java} | 14 +-
.../apache/paimon/flink/ReadWriteTableITCase.java | 111 ++++++++++++
.../paimon/flink/util/ReadWriteTableTestUtil.java | 3 +-
8 files changed, 306 insertions(+), 139 deletions(-)
diff --git a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
new file mode 100644
index 000000000..054e48a20
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+import javax.annotation.Nullable;
+
+/** Table sink to create sink. */
+public class FlinkTableSink extends FlinkTableSinkBase {
+
+ public FlinkTableSink(
+ ObjectIdentifier tableIdentifier,
+ Table table,
+ DynamicTableFactory.Context context,
+ @Nullable LogStoreTableFactory logStoreTableFactory) {
+ super(tableIdentifier, table, context, logStoreTableFactory);
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
new file mode 100644
index 000000000..054e48a20
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+import javax.annotation.Nullable;
+
+/** Table sink to create sink. */
+public class FlinkTableSink extends FlinkTableSinkBase {
+
+ public FlinkTableSink(
+ ObjectIdentifier tableIdentifier,
+ Table table,
+ DynamicTableFactory.Context context,
+ @Nullable LogStoreTableFactory logStoreTableFactory) {
+ super(tableIdentifier, table, context, logStoreTableFactory);
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.16/pom.xml b/paimon-flink/paimon-flink-1.16/pom.xml
index 64e2f2fd5..a5fece0c8 100644
--- a/paimon-flink/paimon-flink-1.16/pom.xml
+++ b/paimon-flink/paimon-flink-1.16/pom.xml
@@ -42,6 +42,18 @@ under the License.
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-flink-common</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
</dependency>
<!-- test dependencies -->
diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
new file mode 100644
index 000000000..054e48a20
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+import javax.annotation.Nullable;
+
+/** Table sink to create sink. */
+public class FlinkTableSink extends FlinkTableSinkBase {
+
+ public FlinkTableSink(
+ ObjectIdentifier tableIdentifier,
+ Table table,
+ DynamicTableFactory.Context context,
+ @Nullable LogStoreTableFactory logStoreTableFactory) {
+ super(tableIdentifier, table, context, logStoreTableFactory);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
index 6b983265e..857f9ad98 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -18,168 +18,92 @@
package org.apache.paimon.flink.sink;
-import org.apache.paimon.CoreOptions.ChangelogProducer;
-import org.apache.paimon.CoreOptions.LogChangelogMode;
import org.apache.paimon.CoreOptions.MergeEngine;
-import org.apache.paimon.catalog.CatalogLock;
-import org.apache.paimon.flink.FlinkCatalog;
-import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.PaimonDataStreamSinkProvider;
-import org.apache.paimon.flink.log.LogSinkProvider;
import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.ChangelogValueCountFileStoreTable;
import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
-import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
-import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.types.RowKind;
import javax.annotation.Nullable;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
-import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
-import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
/** Table sink to create sink. */
-public class FlinkTableSink implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
-
- private final ObjectIdentifier tableIdentifier;
- private final Table table;
- private final DynamicTableFactory.Context context;
- @Nullable private final LogStoreTableFactory logStoreTableFactory;
-
- private Map<String, String> staticPartitions = new HashMap<>();
- private boolean overwrite = false;
- @Nullable private CatalogLock.Factory lockFactory;
+public class FlinkTableSink extends FlinkTableSinkBase implements SupportsRowLevelUpdate {
public FlinkTableSink(
ObjectIdentifier tableIdentifier,
Table table,
DynamicTableFactory.Context context,
@Nullable LogStoreTableFactory logStoreTableFactory) {
- this.tableIdentifier = tableIdentifier;
- this.table = table;
- this.context = context;
- this.logStoreTableFactory = logStoreTableFactory;
+ super(tableIdentifier, table, context, logStoreTableFactory);
}
@Override
- public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
- if (table instanceof AppendOnlyFileStoreTable) {
- // Don't check this, for example, only inserts are available from the database, but the
- // plan phase contains all changelogs
- return requestedMode;
- } else if (table instanceof ChangelogValueCountFileStoreTable) {
- // no primary key, sink all changelogs
- return requestedMode;
- } else if (table instanceof ChangelogWithKeyFileStoreTable) {
+ public RowLevelUpdateInfo applyRowLevelUpdate(
+ List<Column> updatedColumns, @Nullable RowLevelModificationScanContext context) {
+ // Since only UPDATE_AFTER type messages can be received at present,
+ // AppendOnlyFileStoreTable and ChangelogValueCountFileStoreTable without primary keys
+ // cannot correctly handle old data, so they are marked as unsupported. Similarly, it is not
+ // allowed to update the primary key column when updating the column of
+ // ChangelogWithKeyFileStoreTable, because the old data cannot be handled correctly.
+ if (table instanceof ChangelogWithKeyFileStoreTable) {
Options options = Options.fromMap(table.options());
- if (options.get(CHANGELOG_PRODUCER) == ChangelogProducer.INPUT) {
- return requestedMode;
- }
-
- if (options.get(MERGE_ENGINE) == MergeEngine.AGGREGATE) {
- return requestedMode;
- }
-
- if (options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL) {
- return requestedMode;
- }
-
- // with primary key, default sink upsert
- ChangelogMode.Builder builder = ChangelogMode.newBuilder();
- for (RowKind kind : requestedMode.getContainedKinds()) {
- if (kind != RowKind.UPDATE_BEFORE) {
- builder.addContainedKind(kind);
- }
+ Set<String> primaryKeys = new HashSet<>(table.primaryKeys());
+ updatedColumns.forEach(
+ column -> {
+ if (primaryKeys.contains(column.getName())) {
+ String errMsg =
+ String.format(
+ "Updates to primary keys are not supported, primaryKeys (%s), updatedColumns (%s)",
+ primaryKeys,
+ updatedColumns.stream()
+ .map(Column::getName)
+ .collect(Collectors.toList()));
+ throw new UnsupportedOperationException(errMsg);
+ }
+ });
+ if (options.get(MERGE_ENGINE) == MergeEngine.DEDUPLICATE
+ || options.get(MERGE_ENGINE) == MergeEngine.PARTIAL_UPDATE) {
+ // Even with partial-update we still need all columns. Because the topology
+ // structure is source -> cal -> constraintEnforcer -> sink, in the
+ // constraintEnforcer operator, the constraint check will be performed according to
+ // the index, not according to the column. So we can't return only some columns,
+ // which will cause problems like ArrayIndexOutOfBoundsException.
+ // TODO: return partial columns after FLINK-32001 is resolved.
+ return new RowLevelUpdateInfo() {};
}
- return builder.build();
- } else {
throw new UnsupportedOperationException(
- "Unknown FileStoreTable subclass " + table.getClass().getName());
- }
- }
-
- @Override
- public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
- if (overwrite && !context.isBounded()) {
+ String.format(
+ "%s can not support update, currently only %s of %s and %s can support update.",
+ options.get(MERGE_ENGINE),
+ MERGE_ENGINE.key(),
+ MergeEngine.DEDUPLICATE,
+ MergeEngine.PARTIAL_UPDATE));
+ } else if (table instanceof AppendOnlyFileStoreTable
+ || table instanceof ChangelogValueCountFileStoreTable) {
throw new UnsupportedOperationException(
- "Paimon doesn't support streaming INSERT OVERWRITE.");
- }
-
- LogSinkProvider logSinkProvider = null;
- if (logStoreTableFactory != null) {
- logSinkProvider = logStoreTableFactory.createSinkProvider(this.context, context);
+ String.format(
+ "%s can not support update, because there is no primary key.",
+ table.getClass().getName()));
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "%s can not support update, because it is an unknown subclass of FileStoreTable.",
+ table.getClass().getName()));
}
-
- Options conf = Options.fromMap(table.options());
- // Do not sink to log store when overwrite mode
- final LogSinkFunction logSinkFunction =
- overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink());
- return new PaimonDataStreamSinkProvider(
- (dataStream) ->
- new FlinkSinkBuilder((FileStoreTable) table)
- .withInput(
- new DataStream<>(
- dataStream.getExecutionEnvironment(),
- dataStream.getTransformation()))
- .withLockFactory(
- Lock.factory(
- lockFactory,
- FlinkCatalog.toIdentifier(
- tableIdentifier.toObjectPath())))
- .withLogSinkFunction(logSinkFunction)
- .withOverwritePartition(overwrite ? staticPartitions : null)
- .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
- .build());
- }
-
- @Override
- public DynamicTableSink copy() {
- FlinkTableSink copied =
- new FlinkTableSink(tableIdentifier, table, context, logStoreTableFactory);
- copied.staticPartitions = new HashMap<>(staticPartitions);
- copied.overwrite = overwrite;
- copied.lockFactory = lockFactory;
- return copied;
- }
-
- @Override
- public String asSummaryString() {
- return "PaimonSink";
- }
-
- @Override
- public void applyStaticPartition(Map<String, String> partition) {
- table.partitionKeys()
- .forEach(
- partitionKey -> {
- if (partition.containsKey(partitionKey)) {
- this.staticPartitions.put(
- partitionKey, partition.get(partitionKey));
- }
- });
- }
-
- @Override
- public void applyOverwrite(boolean overwrite) {
- this.overwrite = overwrite;
- }
-
- public void setLockFactory(@Nullable CatalogLock.Factory lockFactory) {
- this.lockFactory = lockFactory;
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
similarity index 95%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index 6b983265e..286cc484a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -54,18 +54,20 @@ import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
/** Table sink to create sink. */
-public class FlinkTableSink implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
+public abstract class FlinkTableSinkBase
+ implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
private final ObjectIdentifier tableIdentifier;
- private final Table table;
private final DynamicTableFactory.Context context;
@Nullable private final LogStoreTableFactory logStoreTableFactory;
- private Map<String, String> staticPartitions = new HashMap<>();
- private boolean overwrite = false;
- @Nullable private CatalogLock.Factory lockFactory;
+ protected final Table table;
- public FlinkTableSink(
+ protected Map<String, String> staticPartitions = new HashMap<>();
+ protected boolean overwrite = false;
+ @Nullable protected CatalogLock.Factory lockFactory;
+
+ public FlinkTableSinkBase(
ObjectIdentifier tableIdentifier,
Table table,
DynamicTableFactory.Context context,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 37ad4a1e3..238c5012f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -54,19 +54,24 @@ import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
import static org.apache.paimon.CoreOptions.WRITE_MODE;
@@ -1459,6 +1464,112 @@ public class ReadWriteTableITCase extends AbstractTestBase {
validateSchemaOptionResult();
}
+ // ----------------------------------------------------------------------------------------------------------------
+ // Update statement
+ // ----------------------------------------------------------------------------------------------------------------
+
+ @ParameterizedTest
+ @EnumSource(CoreOptions.MergeEngine.class)
+ public void testUpdateWithPrimaryKey(CoreOptions.MergeEngine mergeEngine) throws Exception {
+ Set<CoreOptions.MergeEngine> supportUpdateEngines = new HashSet<>();
+ supportUpdateEngines.add(CoreOptions.MergeEngine.DEDUPLICATE);
+ supportUpdateEngines.add(CoreOptions.MergeEngine.PARTIAL_UPDATE);
+ // Step1: define table schema
+ Map<String, String> options = new HashMap<>();
+ options.put(WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
+ options.put(MERGE_ENGINE.key(), mergeEngine.toString());
+ String table =
+ createTable(
+ Arrays.asList(
+ "id BIGINT NOT NULL",
+ "currency STRING",
+ "rate BIGINT",
+ "dt String"),
+ Arrays.asList("id", "dt"),
+ Collections.singletonList("dt"),
+ options);
+
+ // Step2: batch write some historical data
+ insertInto(
+ table,
+ "(1, 'US Dollar', 114, '2022-01-01')",
+ "(2, 'UNKNOWN', -1, '2022-01-01')",
+ "(3, 'Euro', 114, '2022-01-01')",
+ "(3, 'Euro', 119, '2022-01-02')");
+
+ // Step3: prepare expected data.
+ String rowKind = mergeEngine == CoreOptions.MergeEngine.PARTIAL_UPDATE ? "+I" : "+U";
+ List<Row> expectedRecords =
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", 1L, "US Dollar", 114L, "2022-01-01"),
+ changelogRow(rowKind, 2L, "Yen", 1L, "2022-01-01"),
+ changelogRow("+I", 3L, "Euro", 114L, "2022-01-01"),
+ // part = 2022-01-02
+ changelogRow("+I", 3L, "Euro", 119L, "2022-01-02"));
+
+ // Step4: prepare update statement
+ String updateStatement =
+ String.format(
+ ""
+ + "UPDATE %s "
+ + "SET currency = 'Yen', "
+ + "rate = 1 "
+ + "WHERE currency = 'UNKNOWN' and dt = '2022-01-01'",
+ table);
+
+ // Step5: execute update statement and verify result
+ if (supportUpdateEngines.contains(mergeEngine)) {
+ bEnv.executeSql(updateStatement).await();
+ String querySql = String.format("SELECT * FROM %s", table);
+ testBatchRead(querySql, expectedRecords);
+ } else {
+ assertThatThrownBy(() -> bEnv.executeSql(updateStatement).await())
+ .satisfies(AssertionUtils.anyCauseMatches(UnsupportedOperationException.class));
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(WriteMode.class)
+ public void testUpdateWithoutPrimaryKey(WriteMode writeMode) throws Exception {
+ // Step1: define table schema
+ Map<String, String> options = new HashMap<>();
+ options.put(WRITE_MODE.key(), writeMode.toString());
+ options.put(MERGE_ENGINE.key(), MERGE_ENGINE.defaultValue().toString());
+ String table =
+ createTable(
+ Arrays.asList(
+ "id BIGINT NOT NULL",
+ "currency STRING",
+ "rate BIGINT",
+ "dt String"),
+ Collections.emptyList(),
+ Collections.singletonList("dt"),
+ options);
+
+ // Step2: batch write some historical data
+ insertInto(
+ table,
+ "(1, 'US Dollar', 114, '2022-01-01')",
+ "(2, 'UNKNOWN', -1, '2022-01-01')",
+ "(3, 'Euro', 114, '2022-01-01')",
+ "(3, 'Euro', 119, '2022-01-02')");
+
+ // Step3: prepare update statement
+ String updateStatement =
+ String.format(
+ ""
+ + "UPDATE %s "
+ + "SET currency = 'Yen', "
+ + "rate = 1 "
+ + "WHERE currency = 'UNKNOWN' and dt = '2022-01-01'",
+ table);
+
+ // Step4: execute update statement
+ assertThatThrownBy(() -> bEnv.executeSql(updateStatement).await())
+ .satisfies(AssertionUtils.anyCauseMatches(UnsupportedOperationException.class));
+ }
+
// ----------------------------------------------------------------------------------------------------------------
// Tools
// ----------------------------------------------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
index 5c4908aaf..b87ca1277 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
@@ -129,7 +129,8 @@ public class ReadWriteTableTestUtil {
List<String> primaryKeys,
List<String> partitionKeys,
Map<String, String> options) {
- String table = "MyTable_" + UUID.randomUUID();
+ // "-" is not allowed in the table name.
+ String table = ("MyTable_" + UUID.randomUUID()).replace("-", "_");
sEnv.executeSql(buildDdl(table, fieldsSpec, primaryKeys, partitionKeys, options));
return table;
}