AHeise commented on a change in pull request #16796:
URL: https://github.com/apache/flink/pull/16796#discussion_r687981689



##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertWriter.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class BufferedUpsertWriter<WriterState> implements SinkWriter<RowData, Void, 
WriterState> {
+
+    private final SinkWriter<RowData, ?, WriterState> wrappedWriter;
+    private final WrappedContext wrappedContext = new WrappedContext();
+    private final ScheduledExecutorService scheduler =
+            Executors.newScheduledThreadPool(
+                    1, new 
ExecutorThreadFactory("upsert-kafka-sink-function"));
+    private final ScheduledFuture<?> scheduledFuture;
+    private final int batchMaxRowNums;
+    private final Function<RowData, RowData> valueCopier;
+    private final Map<RowData, Tuple2<RowData, Long>> reduceBuffer = new 
HashMap<>();
+    private final Function<RowData, RowData> keyExtractor;
+
+    private boolean closed = false;
+    private int batchCount = 0;
+    private volatile Exception flushException;

Review comment:
       Not needed when using TimerService (runs in main thread).

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertWriter.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class BufferedUpsertWriter<WriterState> implements SinkWriter<RowData, Void, 
WriterState> {
+
+    private final SinkWriter<RowData, ?, WriterState> wrappedWriter;
+    private final WrappedContext wrappedContext = new WrappedContext();
+    private final ScheduledExecutorService scheduler =
+            Executors.newScheduledThreadPool(
+                    1, new 
ExecutorThreadFactory("upsert-kafka-sink-function"));

Review comment:
       Remove user TimerService.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertSink.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A wrapper of a {@link Sink}. It will buffer the data emitted by the wrapper 
{@link SinkWriter}
+ * and only emit it when the buffer is full or a timer is triggered or a 
checkpoint happens.
+ */
+class BufferedUpsertSink<WriterState> implements Sink<RowData, Void, 
WriterState, Void> {
+
+    private final Sink<RowData, ?, WriterState, ?> wrappedSink;
+    private final DataType physicalDataType;
+    private final int[] keyProjection;
+    private final SinkBufferFlushMode bufferFlushMode;
+    @Nullable private final TypeSerializer<RowData> typeSerializer;
+
+    BufferedUpsertSink(
+            Sink<RowData, ?, WriterState, ?> wrappedSink,
+            DataType physicalDataType,
+            int[] keyProjection,
+            SinkBufferFlushMode bufferFlushMode,
+            @Nullable TypeSerializer<RowData> typeSerializer) {
+        this.wrappedSink = wrappedSink;
+        this.physicalDataType = physicalDataType;
+        this.keyProjection = keyProjection;
+        this.bufferFlushMode = bufferFlushMode;
+        this.typeSerializer = typeSerializer;
+    }
+
+    @Override
+    public SinkWriter<RowData, Void, WriterState> createWriter(
+            InitContext context, List<WriterState> states) throws IOException {
+        final SinkWriter<RowData, ?, WriterState> wrapperWriter =
+                wrappedSink.createWriter(context, states);
+        return new BufferedUpsertWriter<>(
+                wrapperWriter, physicalDataType, keyProjection, 
bufferFlushMode, typeSerializer);
+    }
+
+    @Override
+    public Optional<SimpleVersionedSerializer<WriterState>> 
getWriterStateSerializer() {
+        return wrappedSink.getWriterStateSerializer();
+    }
+
+    @Override
+    public Optional<Committer<Void>> createCommitter() throws IOException {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalCommitter<Void, Void>> createGlobalCommitter() 
throws IOException {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<SimpleVersionedSerializer<Void>> 
getCommittableSerializer() {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<SimpleVersionedSerializer<Void>> 
getGlobalCommittableSerializer() {
+        return Optional.empty();

Review comment:
       Delegate to wrapped sink!

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertWriter.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class BufferedUpsertWriter<WriterState> implements SinkWriter<RowData, Void, 
WriterState> {
+
+    private final SinkWriter<RowData, ?, WriterState> wrappedWriter;
+    private final WrappedContext wrappedContext = new WrappedContext();
+    private final ScheduledExecutorService scheduler =
+            Executors.newScheduledThreadPool(
+                    1, new 
ExecutorThreadFactory("upsert-kafka-sink-function"));
+    private final ScheduledFuture<?> scheduledFuture;
+    private final int batchMaxRowNums;
+    private final Function<RowData, RowData> valueCopier;
+    private final Map<RowData, Tuple2<RowData, Long>> reduceBuffer = new 
HashMap<>();
+    private final Function<RowData, RowData> keyExtractor;
+
+    private boolean closed = false;
+    private int batchCount = 0;
+    private volatile Exception flushException;
+
+    BufferedUpsertWriter(
+            SinkWriter<RowData, ?, WriterState> wrappedWriter,
+            DataType physicalDataType,
+            int[] keyProjection,
+            SinkBufferFlushMode bufferFlushMode,
+            @Nullable TypeSerializer<RowData> typeSerializer) {
+        checkArgument(bufferFlushMode != null && bufferFlushMode.isEnabled());
+        this.wrappedWriter = checkNotNull(wrappedWriter);
+        this.batchMaxRowNums = bufferFlushMode.getBatchSize();
+        long batchIntervalMs = bufferFlushMode.getBatchIntervalMs();
+
+        List<LogicalType> fields = 
physicalDataType.getLogicalType().getChildren();
+        final RowData.FieldGetter[] keyFieldGetters =
+                Arrays.stream(keyProjection)
+                        .mapToObj(
+                                targetField ->
+                                        RowData.createFieldGetter(
+                                                fields.get(targetField), 
targetField))
+                        .toArray(RowData.FieldGetter[]::new);
+        this.keyExtractor = rowData -> createProjectedRow(rowData, 
RowKind.INSERT, keyFieldGetters);
+        this.valueCopier = typeSerializer == null ? Function.identity() : 
typeSerializer::copy;
+
+        this.scheduledFuture =
+                this.scheduler.scheduleWithFixedDelay(
+                        () -> {
+                            synchronized (BufferedUpsertWriter.this) {

Review comment:
       Remove sync when switched to TimerService.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertWriter.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class BufferedUpsertWriter<WriterState> implements SinkWriter<RowData, Void, 
WriterState> {

Review comment:
       `ReducingUpsertWriter`

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertWriter.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class BufferedUpsertWriter<WriterState> implements SinkWriter<RowData, Void, 
WriterState> {
+
+    private final SinkWriter<RowData, ?, WriterState> wrappedWriter;
+    private final WrappedContext wrappedContext = new WrappedContext();
+    private final ScheduledExecutorService scheduler =
+            Executors.newScheduledThreadPool(
+                    1, new 
ExecutorThreadFactory("upsert-kafka-sink-function"));
+    private final ScheduledFuture<?> scheduledFuture;
+    private final int batchMaxRowNums;
+    private final Function<RowData, RowData> valueCopier;

Review comment:
       Is that an actual word? `valueCopyFunction`?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertWriter.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class BufferedUpsertWriter<WriterState> implements SinkWriter<RowData, Void, 
WriterState> {
+
+    private final SinkWriter<RowData, ?, WriterState> wrappedWriter;
+    private final WrappedContext wrappedContext = new WrappedContext();
+    private final ScheduledExecutorService scheduler =
+            Executors.newScheduledThreadPool(
+                    1, new 
ExecutorThreadFactory("upsert-kafka-sink-function"));
+    private final ScheduledFuture<?> scheduledFuture;
+    private final int batchMaxRowNums;
+    private final Function<RowData, RowData> valueCopier;
+    private final Map<RowData, Tuple2<RowData, Long>> reduceBuffer = new 
HashMap<>();
+    private final Function<RowData, RowData> keyExtractor;
+
+    private boolean closed = false;
+    private int batchCount = 0;
+    private volatile Exception flushException;
+
+    BufferedUpsertWriter(
+            SinkWriter<RowData, ?, WriterState> wrappedWriter,
+            DataType physicalDataType,
+            int[] keyProjection,
+            SinkBufferFlushMode bufferFlushMode,
+            @Nullable TypeSerializer<RowData> typeSerializer) {
+        checkArgument(bufferFlushMode != null && bufferFlushMode.isEnabled());
+        this.wrappedWriter = checkNotNull(wrappedWriter);
+        this.batchMaxRowNums = bufferFlushMode.getBatchSize();
+        long batchIntervalMs = bufferFlushMode.getBatchIntervalMs();
+
+        List<LogicalType> fields = 
physicalDataType.getLogicalType().getChildren();
+        final RowData.FieldGetter[] keyFieldGetters =
+                Arrays.stream(keyProjection)
+                        .mapToObj(
+                                targetField ->
+                                        RowData.createFieldGetter(
+                                                fields.get(targetField), 
targetField))
+                        .toArray(RowData.FieldGetter[]::new);
+        this.keyExtractor = rowData -> createProjectedRow(rowData, 
RowKind.INSERT, keyFieldGetters);
+        this.valueCopier = typeSerializer == null ? Function.identity() : 
typeSerializer::copy;
+
+        this.scheduledFuture =
+                this.scheduler.scheduleWithFixedDelay(
+                        () -> {
+                            synchronized (BufferedUpsertWriter.this) {
+                                if (!closed) {
+                                    try {
+                                        flush();
+                                    } catch (Exception e) {
+                                        flushException = e;
+                                    }
+                                }
+                            }
+                        },
+                        batchIntervalMs,
+                        batchIntervalMs,
+                        TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void write(RowData element, Context context) throws IOException, 
InterruptedException {
+        wrappedContext.setContext(context);
+        addToBuffer(element, context.timestamp());
+    }
+
+    @Override
+    public List<Void> prepareCommit(boolean flush) throws IOException, 
InterruptedException {
+        flush();
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<WriterState> snapshotState() throws IOException {
+        return wrappedWriter.snapshotState();
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (!closed) {
+            closed = true;
+            if (scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                scheduler.shutdown();
+            }
+            wrappedWriter.close();
+        }
+        checkFlushException();
+    }
+
+    private void addToBuffer(RowData row, Long timestamp) throws IOException, 
InterruptedException {
+        checkFlushException();
+
+        RowData key = keyExtractor.apply(row);
+        RowData value = valueCopier.apply(row);
+        reduceBuffer.put(key, new Tuple2<>(changeFlag(value), timestamp));
+        batchCount++;
+
+        if (batchCount >= batchMaxRowNums) {
+            flush();
+        }
+    }
+
+    private RowData changeFlag(RowData value) {
+        switch (value.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                value.setRowKind(UPDATE_AFTER);

Review comment:
       Is this really correct? @twalthr I can see it giving the correct result 
for Kafka but this is supposed to be a general purpose abstraction...

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertWriter.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class BufferedUpsertWriter<WriterState> implements SinkWriter<RowData, Void, 
WriterState> {
+
+    private final SinkWriter<RowData, ?, WriterState> wrappedWriter;
+    private final WrappedContext wrappedContext = new WrappedContext();
+    private final ScheduledExecutorService scheduler =
+            Executors.newScheduledThreadPool(
+                    1, new 
ExecutorThreadFactory("upsert-kafka-sink-function"));
+    private final ScheduledFuture<?> scheduledFuture;
+    private final int batchMaxRowNums;
+    private final Function<RowData, RowData> valueCopier;
+    private final Map<RowData, Tuple2<RowData, Long>> reduceBuffer = new 
HashMap<>();
+    private final Function<RowData, RowData> keyExtractor;
+
+    private boolean closed = false;
+    private int batchCount = 0;
+    private volatile Exception flushException;
+
+    BufferedUpsertWriter(
+            SinkWriter<RowData, ?, WriterState> wrappedWriter,
+            DataType physicalDataType,
+            int[] keyProjection,
+            SinkBufferFlushMode bufferFlushMode,
+            @Nullable TypeSerializer<RowData> typeSerializer) {
+        checkArgument(bufferFlushMode != null && bufferFlushMode.isEnabled());
+        this.wrappedWriter = checkNotNull(wrappedWriter);
+        this.batchMaxRowNums = bufferFlushMode.getBatchSize();
+        long batchIntervalMs = bufferFlushMode.getBatchIntervalMs();
+
+        List<LogicalType> fields = 
physicalDataType.getLogicalType().getChildren();
+        final RowData.FieldGetter[] keyFieldGetters =
+                Arrays.stream(keyProjection)
+                        .mapToObj(
+                                targetField ->
+                                        RowData.createFieldGetter(
+                                                fields.get(targetField), 
targetField))
+                        .toArray(RowData.FieldGetter[]::new);
+        this.keyExtractor = rowData -> createProjectedRow(rowData, 
RowKind.INSERT, keyFieldGetters);
+        this.valueCopier = typeSerializer == null ? Function.identity() : 
typeSerializer::copy;
+
+        this.scheduledFuture =
+                this.scheduler.scheduleWithFixedDelay(
+                        () -> {
+                            synchronized (BufferedUpsertWriter.this) {
+                                if (!closed) {
+                                    try {
+                                        flush();
+                                    } catch (Exception e) {

Review comment:
       You probably need to add InterruptionException to 
ProcessingTimeCallback#onProcessingTime, which I forgot to implement ;)

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertWriterTest.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import 
org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link BufferedUpsertWriter}. */
+@RunWith(Parameterized.class)
+public class BufferedUpsertWriterTest {

Review comment:
       I guess we need to replicate because the other thing cannot be removed 
and is deprecated?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -185,44 +189,55 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context 
context) {
         final SerializationSchema<RowData> valueSerialization =
                 createSerialization(context, valueEncodingFormat, 
valueProjection, null);
 
+        final KafkaSinkBuilder<RowData> sinkBuilder = KafkaSink.builder();
+        final List<LogicalType> physicalChildren = 
physicalDataType.getLogicalType().getChildren();
+        if (transactionalIdPrefix != null) {
+            sinkBuilder.setTransactionalIdPrefix(transactionalIdPrefix);
+        }
+        final KafkaSink<RowData> kafkaSink =
+                sinkBuilder
+                        .setDeliverGuarantee(deliveryGuarantee)
+                        .setBootstrapServers(
+                                
properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString())
+                        .setKafkaProducerConfig(properties)
+                        .setRecordSerializer(
+                                new DynamicKafkaRecordSerializationSchema(
+                                        topic,
+                                        partitioner,
+                                        keySerialization,
+                                        valueSerialization,
+                                        getFieldGetters(physicalChildren, 
keyProjection),
+                                        getFieldGetters(physicalChildren, 
valueProjection),
+                                        hasMetadata(),
+                                        getMetadataPositions(physicalChildren),
+                                        upsertMode))
+                        .build();
         if (flushMode.isEnabled() && upsertMode) {
-            BufferedUpsertSinkFunction buffedSinkFunction =
-                    new BufferedUpsertSinkFunction(
-                            createKafkaProducer(keySerialization, 
valueSerialization),
-                            physicalDataType,
-                            keyProjection,
-                            context.createTypeInformation(consumedDataType),
-                            flushMode);
-            return SinkFunctionProvider.of(buffedSinkFunction, parallelism);
-        } else {
-            final KafkaSinkBuilder<RowData> sinkBuilder = KafkaSink.builder();
-            final List<LogicalType> physicalChildren =
-                    physicalDataType.getLogicalType().getChildren();
-            if (transactionalIdPrefix != null) {
-                sinkBuilder.setTransactionalIdPrefix(transactionalIdPrefix);
-            }
-            return SinkProvider.of(
-                    sinkBuilder
-                            .setDeliverGuarantee(deliveryGuarantee)
-                            .setBootstrapServers(
-                                    properties
-                                            
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
-                                            .toString())
-                            .setKafkaProducerConfig(properties)
-                            .setRecordSerializer(
-                                    new DynamicKafkaRecordSerializationSchema(
-                                            topic,
-                                            partitioner,
-                                            keySerialization,
-                                            valueSerialization,
-                                            getFieldGetters(physicalChildren, 
keyProjection),
-                                            getFieldGetters(physicalChildren, 
valueProjection),
-                                            hasMetadata(),
-                                            
getMetadataPositions(physicalChildren),
-                                            upsertMode))
-                            .build(),
-                    parallelism);
+            return (DataStreamSinkProvider)
+                    dataStream -> {
+                        final boolean objectReuse =
+                                dataStream
+                                        .getExecutionEnvironment()
+                                        .getConfig()
+                                        .isObjectReuseEnabled();
+                        final BufferedUpsertSink<?> sink =
+                                new BufferedUpsertSink<>(
+                                        kafkaSink,
+                                        physicalDataType,
+                                        keyProjection,
+                                        flushMode,
+                                        objectReuse
+                                                ? createRowDataTypeSerializer(
+                                                        context, 
dataStream.getExecutionConfig())
+                                                : null);

Review comment:
       Maybe you should instead directly inject your `valueCopier`. Then it's 
easier to test.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertSink.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A wrapper of a {@link Sink}. It will buffer the data emitted by the wrapper 
{@link SinkWriter}
+ * and only emit it when the buffer is full or a timer is triggered or a 
checkpoint happens.
+ */
+class BufferedUpsertSink<WriterState> implements Sink<RowData, Void, 
WriterState, Void> {

Review comment:
       `ReducingUpsertSink`

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertWriter.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class BufferedUpsertWriter<WriterState> implements SinkWriter<RowData, Void, 
WriterState> {
+
+    private final SinkWriter<RowData, ?, WriterState> wrappedWriter;
+    private final WrappedContext wrappedContext = new WrappedContext();
+    private final ScheduledExecutorService scheduler =
+            Executors.newScheduledThreadPool(
+                    1, new 
ExecutorThreadFactory("upsert-kafka-sink-function"));
+    private final ScheduledFuture<?> scheduledFuture;
+    private final int batchMaxRowNums;
+    private final Function<RowData, RowData> valueCopier;
+    private final Map<RowData, Tuple2<RowData, Long>> reduceBuffer = new 
HashMap<>();
+    private final Function<RowData, RowData> keyExtractor;
+
+    private boolean closed = false;
+    private int batchCount = 0;
+    private volatile Exception flushException;
+
+    BufferedUpsertWriter(
+            SinkWriter<RowData, ?, WriterState> wrappedWriter,
+            DataType physicalDataType,
+            int[] keyProjection,
+            SinkBufferFlushMode bufferFlushMode,
+            @Nullable TypeSerializer<RowData> typeSerializer) {
+        checkArgument(bufferFlushMode != null && bufferFlushMode.isEnabled());
+        this.wrappedWriter = checkNotNull(wrappedWriter);
+        this.batchMaxRowNums = bufferFlushMode.getBatchSize();
+        long batchIntervalMs = bufferFlushMode.getBatchIntervalMs();
+
+        List<LogicalType> fields = 
physicalDataType.getLogicalType().getChildren();
+        final RowData.FieldGetter[] keyFieldGetters =
+                Arrays.stream(keyProjection)
+                        .mapToObj(
+                                targetField ->
+                                        RowData.createFieldGetter(
+                                                fields.get(targetField), 
targetField))
+                        .toArray(RowData.FieldGetter[]::new);
+        this.keyExtractor = rowData -> createProjectedRow(rowData, 
RowKind.INSERT, keyFieldGetters);
+        this.valueCopier = typeSerializer == null ? Function.identity() : 
typeSerializer::copy;
+
+        this.scheduledFuture =
+                this.scheduler.scheduleWithFixedDelay(
+                        () -> {
+                            synchronized (BufferedUpsertWriter.this) {
+                                if (!closed) {
+                                    try {
+                                        flush();
+                                    } catch (Exception e) {
+                                        flushException = e;
+                                    }
+                                }
+                            }
+                        },
+                        batchIntervalMs,
+                        batchIntervalMs,
+                        TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void write(RowData element, Context context) throws IOException, 
InterruptedException {
+        wrappedContext.setContext(context);
+        addToBuffer(element, context.timestamp());
+    }
+
+    @Override
+    public List<Void> prepareCommit(boolean flush) throws IOException, 
InterruptedException {
+        flush();
+        return Collections.emptyList();

Review comment:
       okay so we are not supporting any kind of transaction? but is it that 
hard?

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
##########
@@ -231,10 +238,27 @@ public void testBufferedTableSink() {
         // Test kafka producer.
         DynamicTableSink.SinkRuntimeProvider provider =
                 actualUpsertKafkaSink.getSinkRuntimeProvider(new 
SinkRuntimeProviderContext(false));
-        assertThat(provider, instanceOf(SinkFunctionProvider.class));
-        final SinkFunctionProvider sinkFunctionProvider = 
(SinkFunctionProvider) provider;
-        final SinkFunction<RowData> sinkFunction = 
sinkFunctionProvider.createSinkFunction();
-        assertThat(sinkFunction, instanceOf(BufferedUpsertSinkFunction.class));
+        assertThat(provider, instanceOf(DataStreamSinkProvider.class));
+        final DataStreamSinkProvider sinkProvider = (DataStreamSinkProvider) 
provider;
+        final DataStreamSink<?> sinkFunction =

Review comment:
       var name

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
##########
@@ -231,10 +238,27 @@ public void testBufferedTableSink() {
         // Test kafka producer.
         DynamicTableSink.SinkRuntimeProvider provider =
                 actualUpsertKafkaSink.getSinkRuntimeProvider(new 
SinkRuntimeProviderContext(false));
-        assertThat(provider, instanceOf(SinkFunctionProvider.class));
-        final SinkFunctionProvider sinkFunctionProvider = 
(SinkFunctionProvider) provider;
-        final SinkFunction<RowData> sinkFunction = 
sinkFunctionProvider.createSinkFunction();
-        assertThat(sinkFunction, instanceOf(BufferedUpsertSinkFunction.class));
+        assertThat(provider, instanceOf(DataStreamSinkProvider.class));
+        final DataStreamSinkProvider sinkProvider = (DataStreamSinkProvider) 
provider;
+        final DataStreamSink<?> sinkFunction =
+                sinkProvider.consumeDataStream(
+                        new DataStream<>(
+                                new LocalStreamEnvironment(),
+                                new Transformation<RowData>("test", null, 1) {
+                                    @Override
+                                    public List<Transformation<?>> 
getTransitivePredecessors() {
+                                        return Collections.emptyList();
+                                    }
+
+                                    @Override
+                                    public List<Transformation<?>> getInputs() 
{
+                                        return Collections.emptyList();
+                                    }
+                                }));
+        final Field field = 
sinkFunction.getClass().getDeclaredField("transformation");
+        field.setAccessible(true);
+        final SinkTransformation transformation = (SinkTransformation) 
field.get(sinkFunction);
+        assertThat(transformation.getSink(), 
instanceOf(BufferedUpsertSink.class));

Review comment:
       Noooo.
   
   ```
   
   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
           sinkProvider.consumeDataStream(env.fromElements(1, 2));
           env.getStreamGraph().getStreamNodes() // <- look for sink and check 
getOperatorFactory()
     ```

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertWriter.java
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class BufferedUpsertWriter<WriterState> implements SinkWriter<RowData, Void, 
WriterState> {
+
+    private final SinkWriter<RowData, ?, WriterState> wrappedWriter;
+    private final WrappedContext wrappedContext = new WrappedContext();
+    private final ScheduledExecutorService scheduler =
+            Executors.newScheduledThreadPool(
+                    1, new 
ExecutorThreadFactory("upsert-kafka-sink-function"));
+    private final ScheduledFuture<?> scheduledFuture;
+    private final int batchMaxRowNums;
+    private final Function<RowData, RowData> valueCopier;
+    private final Map<RowData, Tuple2<RowData, Long>> reduceBuffer = new 
HashMap<>();
+    private final Function<RowData, RowData> keyExtractor;
+
+    private boolean closed = false;
+    private int batchCount = 0;
+    private volatile Exception flushException;
+
+    BufferedUpsertWriter(
+            SinkWriter<RowData, ?, WriterState> wrappedWriter,
+            DataType physicalDataType,
+            int[] keyProjection,
+            SinkBufferFlushMode bufferFlushMode,
+            @Nullable TypeSerializer<RowData> typeSerializer) {
+        checkArgument(bufferFlushMode != null && bufferFlushMode.isEnabled());
+        this.wrappedWriter = checkNotNull(wrappedWriter);
+        this.batchMaxRowNums = bufferFlushMode.getBatchSize();
+        long batchIntervalMs = bufferFlushMode.getBatchIntervalMs();
+
+        List<LogicalType> fields = 
physicalDataType.getLogicalType().getChildren();
+        final RowData.FieldGetter[] keyFieldGetters =
+                Arrays.stream(keyProjection)
+                        .mapToObj(
+                                targetField ->
+                                        RowData.createFieldGetter(
+                                                fields.get(targetField), 
targetField))
+                        .toArray(RowData.FieldGetter[]::new);
+        this.keyExtractor = rowData -> createProjectedRow(rowData, 
RowKind.INSERT, keyFieldGetters);
+        this.valueCopier = typeSerializer == null ? Function.identity() : 
typeSerializer::copy;
+
+        this.scheduledFuture =
+                this.scheduler.scheduleWithFixedDelay(
+                        () -> {
+                            synchronized (BufferedUpsertWriter.this) {
+                                if (!closed) {
+                                    try {
+                                        flush();
+                                    } catch (Exception e) {
+                                        flushException = e;
+                                    }
+                                }
+                            }
+                        },
+                        batchIntervalMs,
+                        batchIntervalMs,
+                        TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void write(RowData element, Context context) throws IOException, 
InterruptedException {
+        wrappedContext.setContext(context);
+        addToBuffer(element, context.timestamp());
+    }
+
+    @Override
+    public List<Void> prepareCommit(boolean flush) throws IOException, 
InterruptedException {
+        flush();
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<WriterState> snapshotState() throws IOException {
+        return wrappedWriter.snapshotState();

Review comment:
       We need to snapshot the buffer as well. Or else we do not even get at 
least once.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to