liming30 commented on code in PR #1010:
URL: https://github.com/apache/incubator-paimon/pull/1010#discussion_r1184978666


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java:
##########
@@ -18,168 +18,110 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.CoreOptions.ChangelogProducer;
-import org.apache.paimon.CoreOptions.LogChangelogMode;
 import org.apache.paimon.CoreOptions.MergeEngine;
-import org.apache.paimon.catalog.CatalogLock;
-import org.apache.paimon.flink.FlinkCatalog;
-import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.PaimonDataStreamSinkProvider;
-import org.apache.paimon.flink.log.LogSinkProvider;
+import org.apache.paimon.WriteMode;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
 import org.apache.paimon.table.ChangelogValueCountFileStoreTable;
 import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
-import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.types.RowKind;
 
 import javax.annotation.Nullable;
 
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
-import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.CoreOptions.WRITE_MODE;
 
 /** Table sink to create sink. */
-public class FlinkTableSink implements DynamicTableSink, SupportsOverwrite, 
SupportsPartitioning {
+public class FlinkTableSink extends FlinkTableSinkBase implements 
SupportsRowLevelUpdate {
 
-    private final ObjectIdentifier tableIdentifier;
-    private final Table table;
-    private final DynamicTableFactory.Context context;
-    @Nullable private final LogStoreTableFactory logStoreTableFactory;
-
-    private Map<String, String> staticPartitions = new HashMap<>();
-    private boolean overwrite = false;
-    @Nullable private CatalogLock.Factory lockFactory;
+    private Map<Integer, RowData.FieldGetter> updatedColumns = 
Collections.emptyMap();
 
     public FlinkTableSink(
             ObjectIdentifier tableIdentifier,
             Table table,
             DynamicTableFactory.Context context,
-            @Nullable LogStoreTableFactory logStoreTableFactory) {
-        this.tableIdentifier = tableIdentifier;
-        this.table = table;
-        this.context = context;
-        this.logStoreTableFactory = logStoreTableFactory;
+            @Nullable LogStoreTableFactory logStoreTableFactory,
+            List<Column> columns) {
+        super(tableIdentifier, table, context, logStoreTableFactory, columns);
     }
 
     @Override
-    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
-        if (table instanceof AppendOnlyFileStoreTable) {
-            // Don't check this, for example, only inserts are available from 
the database, but the
-            // plan phase contains all changelogs
-            return requestedMode;
-        } else if (table instanceof ChangelogValueCountFileStoreTable) {
-            // no primary key, sink all changelogs
-            return requestedMode;
-        } else if (table instanceof ChangelogWithKeyFileStoreTable) {
+    public RowLevelUpdateInfo applyRowLevelUpdate(
+            List<Column> updatedColumns, @Nullable 
RowLevelModificationScanContext context) {
+        // Since only UPDATE_AFTER type messages can be received at present,
+        // AppendOnlyFileStoreTable and ChangelogValueCountFileStoreTable 
without primary keys
+        // cannot correctly handle old data, so they are marked as 
unsupported. Similarly, it is not
+        // allowed to update the primary key column when updating the column of
+        // ChangelogWithKeyFileStoreTable, because the old data cannot be 
handled correctly.
+        if (table instanceof ChangelogWithKeyFileStoreTable) {
             Options options = Options.fromMap(table.options());
-            if (options.get(CHANGELOG_PRODUCER) == ChangelogProducer.INPUT) {
-                return requestedMode;
-            }
-
-            if (options.get(MERGE_ENGINE) == MergeEngine.AGGREGATE) {
-                return requestedMode;
-            }
-
-            if (options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL) {
-                return requestedMode;
-            }
-
-            // with primary key, default sink upsert
-            ChangelogMode.Builder builder = ChangelogMode.newBuilder();
-            for (RowKind kind : requestedMode.getContainedKinds()) {
-                if (kind != RowKind.UPDATE_BEFORE) {
-                    builder.addContainedKind(kind);
+            Set<String> primaryKeys = new HashSet<>(table.primaryKeys());
+            updatedColumns.forEach(
+                    column -> {
+                        if (primaryKeys.contains(column.getName())) {
+                            throw new UnsupportedOperationException(
+                                    "Updates to primary keys are not 
supported.");
+                        }
+                    });
+
+            if (options.get(MERGE_ENGINE) == MergeEngine.DEDUPLICATE) {
+                return new RowLevelUpdateInfo() {};
+            } else if (options.get(MERGE_ENGINE) == 
MergeEngine.PARTIAL_UPDATE) {
+                this.updatedColumns = new LinkedHashMap<>();
+                for (int i = 0; i < columns.size(); i++) {
+                    Column column = columns.get(i);
+                    if (primaryKeys.contains(column.getName()) || 
updatedColumns.contains(column)) {
+                        this.updatedColumns.put(
+                                i,
+                                RowData.createFieldGetter(
+                                        column.getDataType().getLogicalType(), 
i));
+                    }
                 }
+                // Even with partial-update we still need all columns. Because 
the topology
+                // structure is source -> cal -> ConstraintEnforcer -> sink, 
in the
+                // constraintEnforcer operator, the constraint check will be 
performed according to
+                // the index, not according to the column. So we can't return 
only some columns,
+                // which will cause problems like 
ArrayIndexOutOfBoundsException.
+                return new RowLevelUpdateInfo() {};

Review Comment:
   In the case of `partial-update`, we should only update the `primaryKeys` 
and `updatedColumns`, and set all other columns to null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to