JingsongLi commented on code in PR #1010:
URL: https://github.com/apache/incubator-paimon/pull/1010#discussion_r1184594340
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java:
##########
@@ -18,168 +18,110 @@
package org.apache.paimon.flink.sink;
-import org.apache.paimon.CoreOptions.ChangelogProducer;
-import org.apache.paimon.CoreOptions.LogChangelogMode;
import org.apache.paimon.CoreOptions.MergeEngine;
-import org.apache.paimon.catalog.CatalogLock;
-import org.apache.paimon.flink.FlinkCatalog;
-import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.flink.PaimonDataStreamSinkProvider;
-import org.apache.paimon.flink.log.LogSinkProvider;
+import org.apache.paimon.WriteMode;
import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.ChangelogValueCountFileStoreTable;
import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
-import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
-import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.types.RowKind;
import javax.annotation.Nullable;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
-import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
-import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.CoreOptions.WRITE_MODE;
/** Table sink to create sink. */
-public class FlinkTableSink implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
+public class FlinkTableSink extends FlinkTableSinkBase implements SupportsRowLevelUpdate {
- private final ObjectIdentifier tableIdentifier;
- private final Table table;
- private final DynamicTableFactory.Context context;
- @Nullable private final LogStoreTableFactory logStoreTableFactory;
-
- private Map<String, String> staticPartitions = new HashMap<>();
- private boolean overwrite = false;
- @Nullable private CatalogLock.Factory lockFactory;
+ private Map<Integer, RowData.FieldGetter> updatedColumns = Collections.emptyMap();
public FlinkTableSink(
ObjectIdentifier tableIdentifier,
Table table,
DynamicTableFactory.Context context,
- @Nullable LogStoreTableFactory logStoreTableFactory) {
- this.tableIdentifier = tableIdentifier;
- this.table = table;
- this.context = context;
- this.logStoreTableFactory = logStoreTableFactory;
+ @Nullable LogStoreTableFactory logStoreTableFactory,
+ List<Column> columns) {
Review Comment:
Can we get the column types from `Table.rowType()` instead of passing the extra `List<Column> columns` parameter to the constructor?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]