This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 37391e056 [flink] integrate UPDATE for flink-1.17  (#1010)
37391e056 is described below

commit 37391e056100fb939e1dcc8b272668bfe808cc71
Author: liming.1018 <[email protected]>
AuthorDate: Sat May 6 16:48:58 2023 +0800

    [flink] integrate UPDATE for flink-1.17  (#1010)
---
 .../apache/paimon/flink/sink/FlinkTableSink.java   |  39 +++++
 .../apache/paimon/flink/sink/FlinkTableSink.java   |  39 +++++
 paimon-flink/paimon-flink-1.16/pom.xml             |  12 ++
 .../apache/paimon/flink/sink/FlinkTableSink.java   |  39 +++++
 .../apache/paimon/flink/sink/FlinkTableSink.java   | 188 ++++++---------------
 ...FlinkTableSink.java => FlinkTableSinkBase.java} |  14 +-
 .../apache/paimon/flink/ReadWriteTableITCase.java  | 111 ++++++++++++
 .../paimon/flink/util/ReadWriteTableTestUtil.java  |   3 +-
 8 files changed, 306 insertions(+), 139 deletions(-)

diff --git a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
new file mode 100644
index 000000000..054e48a20
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+import javax.annotation.Nullable;
+
+/** Table sink to create sink. */
+public class FlinkTableSink extends FlinkTableSinkBase {
+
+    public FlinkTableSink(
+            ObjectIdentifier tableIdentifier,
+            Table table,
+            DynamicTableFactory.Context context,
+            @Nullable LogStoreTableFactory logStoreTableFactory) {
+        super(tableIdentifier, table, context, logStoreTableFactory);
+    }
+}
diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
new file mode 100644
index 000000000..054e48a20
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+import javax.annotation.Nullable;
+
+/** Table sink to create sink. */
+public class FlinkTableSink extends FlinkTableSinkBase {
+
+    public FlinkTableSink(
+            ObjectIdentifier tableIdentifier,
+            Table table,
+            DynamicTableFactory.Context context,
+            @Nullable LogStoreTableFactory logStoreTableFactory) {
+        super(tableIdentifier, table, context, logStoreTableFactory);
+    }
+}
diff --git a/paimon-flink/paimon-flink-1.16/pom.xml b/paimon-flink/paimon-flink-1.16/pom.xml
index 64e2f2fd5..a5fece0c8 100644
--- a/paimon-flink/paimon-flink-1.16/pom.xml
+++ b/paimon-flink/paimon-flink-1.16/pom.xml
@@ -42,6 +42,18 @@ under the License.
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-flink-common</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <!-- test dependencies -->
diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
new file mode 100644
index 000000000..054e48a20
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+import javax.annotation.Nullable;
+
+/** Table sink to create sink. */
+public class FlinkTableSink extends FlinkTableSinkBase {
+
+    public FlinkTableSink(
+            ObjectIdentifier tableIdentifier,
+            Table table,
+            DynamicTableFactory.Context context,
+            @Nullable LogStoreTableFactory logStoreTableFactory) {
+        super(tableIdentifier, table, context, logStoreTableFactory);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
index 6b983265e..857f9ad98 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -18,168 +18,92 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.CoreOptions.ChangelogProducer;
-import org.apache.paimon.CoreOptions.LogChangelogMode;
 import org.apache.paimon.CoreOptions.MergeEngine;
-import org.apache.paimon.catalog.CatalogLock;
-import org.apache.paimon.flink.FlinkCatalog;
-import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.PaimonDataStreamSinkProvider;
-import org.apache.paimon.flink.log.LogSinkProvider;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
 import org.apache.paimon.table.ChangelogValueCountFileStoreTable;
 import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
-import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
 import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.types.RowKind;
 
 import javax.annotation.Nullable;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
-import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
-import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 
 /** Table sink to create sink. */
-public class FlinkTableSink implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
-
-    private final ObjectIdentifier tableIdentifier;
-    private final Table table;
-    private final DynamicTableFactory.Context context;
-    @Nullable private final LogStoreTableFactory logStoreTableFactory;
-
-    private Map<String, String> staticPartitions = new HashMap<>();
-    private boolean overwrite = false;
-    @Nullable private CatalogLock.Factory lockFactory;
+public class FlinkTableSink extends FlinkTableSinkBase implements SupportsRowLevelUpdate {
 
     public FlinkTableSink(
             ObjectIdentifier tableIdentifier,
             Table table,
             DynamicTableFactory.Context context,
             @Nullable LogStoreTableFactory logStoreTableFactory) {
-        this.tableIdentifier = tableIdentifier;
-        this.table = table;
-        this.context = context;
-        this.logStoreTableFactory = logStoreTableFactory;
+        super(tableIdentifier, table, context, logStoreTableFactory);
     }
 
     @Override
-    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
-        if (table instanceof AppendOnlyFileStoreTable) {
-            // Don't check this, for example, only inserts are available from the database, but the
-            // plan phase contains all changelogs
-            return requestedMode;
-        } else if (table instanceof ChangelogValueCountFileStoreTable) {
-            // no primary key, sink all changelogs
-            return requestedMode;
-        } else if (table instanceof ChangelogWithKeyFileStoreTable) {
+    public RowLevelUpdateInfo applyRowLevelUpdate(
+            List<Column> updatedColumns, @Nullable RowLevelModificationScanContext context) {
+        // Since only UPDATE_AFTER type messages can be received at present,
+        // AppendOnlyFileStoreTable and ChangelogValueCountFileStoreTable without primary keys
+        // cannot correctly handle old data, so they are marked as unsupported. Similarly, it is not
+        // allowed to update the primary key column when updating the column of
+        // ChangelogWithKeyFileStoreTable, because the old data cannot be handled correctly.
+        if (table instanceof ChangelogWithKeyFileStoreTable) {
             Options options = Options.fromMap(table.options());
-            if (options.get(CHANGELOG_PRODUCER) == ChangelogProducer.INPUT) {
-                return requestedMode;
-            }
-
-            if (options.get(MERGE_ENGINE) == MergeEngine.AGGREGATE) {
-                return requestedMode;
-            }
-
-            if (options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL) {
-                return requestedMode;
-            }
-
-            // with primary key, default sink upsert
-            ChangelogMode.Builder builder = ChangelogMode.newBuilder();
-            for (RowKind kind : requestedMode.getContainedKinds()) {
-                if (kind != RowKind.UPDATE_BEFORE) {
-                    builder.addContainedKind(kind);
-                }
+            Set<String> primaryKeys = new HashSet<>(table.primaryKeys());
+            updatedColumns.forEach(
+                    column -> {
+                        if (primaryKeys.contains(column.getName())) {
+                            String errMsg =
+                                    String.format(
+                                            "Updates to primary keys are not supported, primaryKeys (%s), updatedColumns (%s)",
+                                            primaryKeys,
+                                            updatedColumns.stream()
+                                                    .map(Column::getName)
+                                                    .collect(Collectors.toList()));
+                            throw new UnsupportedOperationException(errMsg);
+                        }
+                    });
+            if (options.get(MERGE_ENGINE) == MergeEngine.DEDUPLICATE
+                    || options.get(MERGE_ENGINE) == MergeEngine.PARTIAL_UPDATE) {
+                // Even with partial-update we still need all columns. Because the topology
+                // structure is source -> cal -> constraintEnforcer -> sink, in the
+                // constraintEnforcer operator, the constraint check will be performed according to
+                // the index, not according to the column. So we can't return only some columns,
+                // which will cause problems like ArrayIndexOutOfBoundsException.
+                // TODO: return partial columns after FLINK-32001 is resolved.
+                return new RowLevelUpdateInfo() {};
             }
-            return builder.build();
-        } else {
             throw new UnsupportedOperationException(
-                    "Unknown FileStoreTable subclass " + table.getClass().getName());
-        }
-    }
-
-    @Override
-    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
-        if (overwrite && !context.isBounded()) {
+                    String.format(
+                            "%s can not support update, currently only %s of %s and %s can support update.",
+                            options.get(MERGE_ENGINE),
+                            MERGE_ENGINE.key(),
+                            MergeEngine.DEDUPLICATE,
+                            MergeEngine.PARTIAL_UPDATE));
+        } else if (table instanceof AppendOnlyFileStoreTable
+                || table instanceof ChangelogValueCountFileStoreTable) {
             throw new UnsupportedOperationException(
-                    "Paimon doesn't support streaming INSERT OVERWRITE.");
-        }
-
-        LogSinkProvider logSinkProvider = null;
-        if (logStoreTableFactory != null) {
-            logSinkProvider = logStoreTableFactory.createSinkProvider(this.context, context);
+                    String.format(
+                            "%s can not support update, because there is no primary key.",
+                            table.getClass().getName()));
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "%s can not support update, because it is an unknown subclass of FileStoreTable.",
+                            table.getClass().getName()));
         }
-
-        Options conf = Options.fromMap(table.options());
-        // Do not sink to log store when overwrite mode
-        final LogSinkFunction logSinkFunction =
-                overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink());
-        return new PaimonDataStreamSinkProvider(
-                (dataStream) ->
-                        new FlinkSinkBuilder((FileStoreTable) table)
-                                .withInput(
-                                        new DataStream<>(
-                                                dataStream.getExecutionEnvironment(),
-                                                dataStream.getTransformation()))
-                                .withLockFactory(
-                                        Lock.factory(
-                                                lockFactory,
-                                                FlinkCatalog.toIdentifier(
-                                                        tableIdentifier.toObjectPath())))
-                                .withLogSinkFunction(logSinkFunction)
-                                .withOverwritePartition(overwrite ? staticPartitions : null)
-                                .withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
-                                .build());
-    }
-
-    @Override
-    public DynamicTableSink copy() {
-        FlinkTableSink copied =
-                new FlinkTableSink(tableIdentifier, table, context, logStoreTableFactory);
-        copied.staticPartitions = new HashMap<>(staticPartitions);
-        copied.overwrite = overwrite;
-        copied.lockFactory = lockFactory;
-        return copied;
-    }
-
-    @Override
-    public String asSummaryString() {
-        return "PaimonSink";
-    }
-
-    @Override
-    public void applyStaticPartition(Map<String, String> partition) {
-        table.partitionKeys()
-                .forEach(
-                        partitionKey -> {
-                            if (partition.containsKey(partitionKey)) {
-                                this.staticPartitions.put(
-                                        partitionKey, partition.get(partitionKey));
-                            }
-                        });
-    }
-
-    @Override
-    public void applyOverwrite(boolean overwrite) {
-        this.overwrite = overwrite;
-    }
-
-    public void setLockFactory(@Nullable CatalogLock.Factory lockFactory) {
-        this.lockFactory = lockFactory;
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
similarity index 95%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index 6b983265e..286cc484a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -54,18 +54,20 @@ import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 
 /** Table sink to create sink. */
-public class FlinkTableSink implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
+public abstract class FlinkTableSinkBase
+        implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
 
     private final ObjectIdentifier tableIdentifier;
-    private final Table table;
     private final DynamicTableFactory.Context context;
     @Nullable private final LogStoreTableFactory logStoreTableFactory;
 
-    private Map<String, String> staticPartitions = new HashMap<>();
-    private boolean overwrite = false;
-    @Nullable private CatalogLock.Factory lockFactory;
+    protected final Table table;
 
-    public FlinkTableSink(
+    protected Map<String, String> staticPartitions = new HashMap<>();
+    protected boolean overwrite = false;
+    @Nullable protected CatalogLock.Factory lockFactory;
+
+    public FlinkTableSinkBase(
             ObjectIdentifier tableIdentifier,
             Table table,
             DynamicTableFactory.Context context,
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 37ad4a1e3..238c5012f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -54,19 +54,24 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
 import static org.apache.paimon.CoreOptions.WRITE_MODE;
@@ -1459,6 +1464,112 @@ public class ReadWriteTableITCase extends AbstractTestBase {
         validateSchemaOptionResult();
     }
 
+    // ----------------------------------------------------------------------------------------------------------------
+    // Update statement
+    // ----------------------------------------------------------------------------------------------------------------
+
+    @ParameterizedTest
+    @EnumSource(CoreOptions.MergeEngine.class)
+    public void testUpdateWithPrimaryKey(CoreOptions.MergeEngine mergeEngine) throws Exception {
+        Set<CoreOptions.MergeEngine> supportUpdateEngines = new HashSet<>();
+        supportUpdateEngines.add(CoreOptions.MergeEngine.DEDUPLICATE);
+        supportUpdateEngines.add(CoreOptions.MergeEngine.PARTIAL_UPDATE);
+        // Step1: define table schema
+        Map<String, String> options = new HashMap<>();
+        options.put(WRITE_MODE.key(), WriteMode.CHANGE_LOG.toString());
+        options.put(MERGE_ENGINE.key(), mergeEngine.toString());
+        String table =
+                createTable(
+                        Arrays.asList(
+                                "id BIGINT NOT NULL",
+                                "currency STRING",
+                                "rate BIGINT",
+                                "dt String"),
+                        Arrays.asList("id", "dt"),
+                        Collections.singletonList("dt"),
+                        options);
+
+        // Step2: batch write some historical data
+        insertInto(
+                table,
+                "(1, 'US Dollar', 114, '2022-01-01')",
+                "(2, 'UNKNOWN', -1, '2022-01-01')",
+                "(3, 'Euro', 114, '2022-01-01')",
+                "(3, 'Euro', 119, '2022-01-02')");
+
+        // Step3: prepare expected data.
+        String rowKind = mergeEngine == CoreOptions.MergeEngine.PARTIAL_UPDATE ? "+I" : "+U";
+        List<Row> expectedRecords =
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", 1L, "US Dollar", 114L, "2022-01-01"),
+                        changelogRow(rowKind, 2L, "Yen", 1L, "2022-01-01"),
+                        changelogRow("+I", 3L, "Euro", 114L, "2022-01-01"),
+                        // part = 2022-01-02
+                        changelogRow("+I", 3L, "Euro", 119L, "2022-01-02"));
+
+        // Step4: prepare update statement
+        String updateStatement =
+                String.format(
+                        ""
+                                + "UPDATE %s "
+                                + "SET currency = 'Yen', "
+                                + "rate = 1 "
+                                + "WHERE currency = 'UNKNOWN' and dt = '2022-01-01'",
+                        table);
+
+        // Step5: execute update statement and verify result
+        if (supportUpdateEngines.contains(mergeEngine)) {
+            bEnv.executeSql(updateStatement).await();
+            String querySql = String.format("SELECT * FROM %s", table);
+            testBatchRead(querySql, expectedRecords);
+        } else {
+            assertThatThrownBy(() -> bEnv.executeSql(updateStatement).await())
+                    .satisfies(AssertionUtils.anyCauseMatches(UnsupportedOperationException.class));
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(WriteMode.class)
+    public void testUpdateWithoutPrimaryKey(WriteMode writeMode) throws Exception {
+        // Step1: define table schema
+        Map<String, String> options = new HashMap<>();
+        options.put(WRITE_MODE.key(), writeMode.toString());
+        options.put(MERGE_ENGINE.key(), MERGE_ENGINE.defaultValue().toString());
+        String table =
+                createTable(
+                        Arrays.asList(
+                                "id BIGINT NOT NULL",
+                                "currency STRING",
+                                "rate BIGINT",
+                                "dt String"),
+                        Collections.emptyList(),
+                        Collections.singletonList("dt"),
+                        options);
+
+        // Step2: batch write some historical data
+        insertInto(
+                table,
+                "(1, 'US Dollar', 114, '2022-01-01')",
+                "(2, 'UNKNOWN', -1, '2022-01-01')",
+                "(3, 'Euro', 114, '2022-01-01')",
+                "(3, 'Euro', 119, '2022-01-02')");
+
+        // Step3: prepare update statement
+        String updateStatement =
+                String.format(
+                        ""
+                                + "UPDATE %s "
+                                + "SET currency = 'Yen', "
+                                + "rate = 1 "
+                                + "WHERE currency = 'UNKNOWN' and dt = '2022-01-01'",
+                        table);
+
+        // Step4: execute update statement
+        assertThatThrownBy(() -> bEnv.executeSql(updateStatement).await())
+                .satisfies(AssertionUtils.anyCauseMatches(UnsupportedOperationException.class));
+    }
+
     // ----------------------------------------------------------------------------------------------------------------
     // Tools
     // ----------------------------------------------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
index 5c4908aaf..b87ca1277 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
@@ -129,7 +129,8 @@ public class ReadWriteTableTestUtil {
             List<String> primaryKeys,
             List<String> partitionKeys,
             Map<String, String> options) {
-        String table = "MyTable_" + UUID.randomUUID();
+        // "-" is not allowed in the table name.
+        String table = ("MyTable_" + UUID.randomUUID()).replace("-", "_");
         sEnv.executeSql(buildDdl(table, fieldsSpec, primaryKeys, partitionKeys, options));
         return table;
     }

Reply via email to