wuchong commented on a change in pull request #15434:
URL: https://github.com/apache/flink/pull/15434#discussion_r604573177



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * The wrapper of the {@link RichSinkFunction}. It will buffer the data and emit when the buffer is
+ * full or timer is triggered or checkpointing.
+ */
+public class BufferedUpsertKafkaSinkFunction extends RichSinkFunction<RowData>
+        implements CheckpointedFunction, CheckpointListener {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(BufferedUpsertKafkaSinkFunction.class);
+
+    // --------------------------------------------------------------------------------------------
+    // Config
+    // --------------------------------------------------------------------------------------------
+
+    private final RichSinkFunction<RowData> producer;
+    private final Integer batchMaxRowNums;
+    private final Duration batchIntervalMs;
+    private final DataType physicalDataType;
+    private final int[] keyProjection;
+    private boolean closed;
+
+    // --------------------------------------------------------------------------------------------
+    // Writer and buffer
+    // --------------------------------------------------------------------------------------------
+
+    private int batchCount = 0;
+    private transient Map<RowData, Tuple2<RowData, Long>> reduceBuffer;
+    private transient WrappedContext wrappedContext;
+    private transient Function<RowData, RowData> keyExtractor;
+    private transient Function<RowData, RowData> valueCopier;
+
+    // --------------------------------------------------------------------------------------------
+    // Timer attributes
+    // --------------------------------------------------------------------------------------------
+
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+
+    public static KafkaDynamicSink.SinkFunctionProviderCreator createBufferedSinkFunction(
+            DataType dataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        return (producer, parallelism) ->
+                SinkFunctionProvider.of(
+                        new BufferedUpsertKafkaSinkFunction(
+                                producer,
+                                dataType,
+                                keyProjection,
+                                batchMaxRowNums,
+                                batchIntervalMs),
+                        parallelism);
+    }
+
+    private BufferedUpsertKafkaSinkFunction(
+            RichSinkFunction<RowData> producer,
+            DataType physicalDataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        this.producer = producer;
+        this.physicalDataType = physicalDataType;
+        this.keyProjection = keyProjection;
+        this.batchMaxRowNums = batchMaxRowNums;
+        this.batchIntervalMs = batchIntervalMs;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        // init variable
+        reduceBuffer = new HashMap<>();
+        wrappedContext = new WrappedContext();
+        closed = false;
+
+        // create keyExtractor and value copier
+        List<LogicalType> fields = physicalDataType.getLogicalType().getChildren();
+        final RowData.FieldGetter[] keyFieldGetters =
+                Arrays.stream(keyProjection)
+                        .mapToObj(
+                                targetField ->
+                                        RowData.createFieldGetter(
+                                                fields.get(targetField), targetField))
+                        .toArray(RowData.FieldGetter[]::new);
+        this.keyExtractor = rowData -> createProjectedRow(rowData, RowKind.INSERT, keyFieldGetters);
+        this.valueCopier =
+                rowData -> {
+                    GenericRowData copiedRowData =
+                            new GenericRowData(rowData.getRowKind(), rowData.getArity());
+                    for (int i = 0; i < rowData.getArity(); i++) {
+                        copiedRowData.setField(i, ((GenericRowData) rowData).getField(i));
+                    }
+                    return copiedRowData;
+                };
+
+        // register timer
+        this.scheduler =
+                Executors.newScheduledThreadPool(
+                        1, new ExecutorThreadFactory("upsert-kafka-sink-function"));
+        this.scheduledFuture =
+                this.scheduler.scheduleWithFixedDelay(
+                        () -> {
+                            synchronized (BufferedUpsertKafkaSinkFunction.this) {
+                                if (!closed) {
+                                    try {
+                                        flush();
+                                    } catch (Exception e) {
+                                        flushException = e;
+                                    }
+                                }
+                            }
+                        },
+                        batchIntervalMs.toMillis(),
+                        batchIntervalMs.toMillis(),
+                        TimeUnit.MILLISECONDS);
+
+        producer.open(parameters);
+    }
+
+    @Override
+    public void setRuntimeContext(RuntimeContext t) {
+        producer.setRuntimeContext(t);
+    }
+
+    @Override
+    public RuntimeContext getRuntimeContext() {
+        return producer.getRuntimeContext();
+    }
+
+    @Override
+    public void invoke(RowData value, Context context) throws Exception {
+        wrappedContext.setContext(context);
+        addToBuffer(value, context.timestamp());
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        if (producer instanceof CheckpointListener) {
+            ((CheckpointListener) producer).notifyCheckpointComplete(checkpointId);
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        flush();
+        if (producer instanceof CheckpointedFunction) {
+            ((CheckpointedFunction) producer).snapshotState(context);
+        }
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        if (!closed) {
+            closed = true;
+
+            if (this.scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                this.scheduler.shutdown();
+            }
+
+            if (batchCount > 0) {
+                try {
+                    flush();
+                } catch (Exception e) {
+                    LOG.warn("Writing records to kafka failed.", e);
+                    throw new RuntimeException("Writing records to kafka failed.", e);
+                }
+            }
+
+            producer.close();
+        }
+        super.close();
+        checkFlushException();
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        if (producer instanceof CheckpointedFunction) {
+            ((CheckpointedFunction) producer).initializeState(context);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private synchronized void addToBuffer(RowData row, Long timestamp) throws Exception {
+        if (batchCount >= batchMaxRowNums) {
+            flush();
+        }
+        writeRecord(row, timestamp);
+    }
+
+    /** Flush the data into the inner sink function and send the data into the sink. */
+    private synchronized void writeRecord(RowData row, Long timestamp) {
+        System.out.println(row);

Review comment:
       remove print.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * The wrapper of the {@link RichSinkFunction}. It will buffer the data and 
emit when the buffer is
+ * full or timer is triggered or checkpointing.
+ */
+public class BufferedUpsertKafkaSinkFunction extends RichSinkFunction<RowData>

Review comment:
       Can be more generic, e.g. `BufferedUpsertSinkFunction`, and please add a `serialVersionUID`.
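
   For illustration, a minimal sketch of what the suggested class header could look like (the name follows the reviewer's suggestion, the version value is arbitrary, and the rest of the class body from the PR is unchanged):

```java
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;

// Sketch only: a more generic class name plus an explicit serialVersionUID,
// since the sink function is serialized and shipped to the task managers.
public class BufferedUpsertSinkFunction extends RichSinkFunction<RowData> {

    private static final long serialVersionUID = 1L;

    // ... existing fields and methods from the PR ...
}
```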

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * The wrapper of the {@link RichSinkFunction}. It will buffer the data and 
emit when the buffer is
+ * full or timer is triggered or checkpointing.
+ */
+public class BufferedUpsertKafkaSinkFunction extends RichSinkFunction<RowData>
+        implements CheckpointedFunction, CheckpointListener {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(BufferedUpsertKafkaSinkFunction.class);
+
+    // 
--------------------------------------------------------------------------------------------
+    // Config
+    // 
--------------------------------------------------------------------------------------------
+
+    private final RichSinkFunction<RowData> producer;
+    private final Integer batchMaxRowNums;
+    private final Duration batchIntervalMs;
+    private final DataType physicalDataType;
+    private final int[] keyProjection;
+    private boolean closed;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Writer and buffer
+    // 
--------------------------------------------------------------------------------------------
+
+    private int batchCount = 0;
+    private transient Map<RowData, Tuple2<RowData, Long>> reduceBuffer;
+    private transient WrappedContext wrappedContext;
+    private transient Function<RowData, RowData> keyExtractor;
+    private transient Function<RowData, RowData> valueCopier;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Timer attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+
+    public static KafkaDynamicSink.SinkFunctionProviderCreator 
createBufferedSinkFunction(
+            DataType dataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        return (producer, parallelism) ->
+                SinkFunctionProvider.of(
+                        new BufferedUpsertKafkaSinkFunction(
+                                producer,
+                                dataType,
+                                keyProjection,
+                                batchMaxRowNums,
+                                batchIntervalMs),
+                        parallelism);
+    }
+
+    private BufferedUpsertKafkaSinkFunction(
+            RichSinkFunction<RowData> producer,
+            DataType physicalDataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        this.producer = producer;
+        this.physicalDataType = physicalDataType;
+        this.keyProjection = keyProjection;
+        this.batchMaxRowNums = batchMaxRowNums;
+        this.batchIntervalMs = batchIntervalMs;

Review comment:
       Add checks that they must be greater than zero.
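
   A possible shape of the checks, sketched against the constructor as it appears in this PR (`Preconditions` is Flink's `org.apache.flink.util.Preconditions`; parameter names are unchanged):

```java
// Sketch: validate the batching options before assigning them.
private BufferedUpsertKafkaSinkFunction(
        RichSinkFunction<RowData> producer,
        DataType physicalDataType,
        int[] keyProjection,
        Integer batchMaxRowNums,
        Duration batchIntervalMs) {
    Preconditions.checkArgument(
            batchMaxRowNums != null && batchMaxRowNums > 0,
            "batchMaxRowNums must be greater than zero.");
    Preconditions.checkArgument(
            batchIntervalMs != null && batchIntervalMs.toMillis() > 0,
            "batchIntervalMs must be greater than zero.");
    this.producer = producer;
    this.physicalDataType = physicalDataType;
    this.keyProjection = keyProjection;
    this.batchMaxRowNums = batchMaxRowNums;
    this.batchIntervalMs = batchIntervalMs;
}
```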

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * The wrapper of the {@link RichSinkFunction}. It will buffer the data and 
emit when the buffer is
+ * full or timer is triggered or checkpointing.
+ */
+public class BufferedUpsertKafkaSinkFunction extends RichSinkFunction<RowData>
+        implements CheckpointedFunction, CheckpointListener {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(BufferedUpsertKafkaSinkFunction.class);
+
+    // 
--------------------------------------------------------------------------------------------
+    // Config
+    // 
--------------------------------------------------------------------------------------------
+
+    private final RichSinkFunction<RowData> producer;
+    private final Integer batchMaxRowNums;
+    private final Duration batchIntervalMs;
+    private final DataType physicalDataType;
+    private final int[] keyProjection;
+    private boolean closed;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Writer and buffer
+    // 
--------------------------------------------------------------------------------------------
+
+    private int batchCount = 0;
+    private transient Map<RowData, Tuple2<RowData, Long>> reduceBuffer;
+    private transient WrappedContext wrappedContext;
+    private transient Function<RowData, RowData> keyExtractor;
+    private transient Function<RowData, RowData> valueCopier;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Timer attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+
+    public static KafkaDynamicSink.SinkFunctionProviderCreator 
createBufferedSinkFunction(
+            DataType dataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        return (producer, parallelism) ->
+                SinkFunctionProvider.of(
+                        new BufferedUpsertKafkaSinkFunction(
+                                producer,
+                                dataType,
+                                keyProjection,
+                                batchMaxRowNums,
+                                batchIntervalMs),
+                        parallelism);
+    }
+
+    private BufferedUpsertKafkaSinkFunction(
+            RichSinkFunction<RowData> producer,
+            DataType physicalDataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        this.producer = producer;
+        this.physicalDataType = physicalDataType;
+        this.keyProjection = keyProjection;
+        this.batchMaxRowNums = batchMaxRowNums;
+        this.batchIntervalMs = batchIntervalMs;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        // init variable
+        reduceBuffer = new HashMap<>();
+        wrappedContext = new WrappedContext();
+        closed = false;
+
+        // create keyExtractor and value copier
+        List<LogicalType> fields = 
physicalDataType.getLogicalType().getChildren();
+        final RowData.FieldGetter[] keyFieldGetters =
+                Arrays.stream(keyProjection)
+                        .mapToObj(
+                                targetField ->
+                                        RowData.createFieldGetter(
+                                                fields.get(targetField), 
targetField))
+                        .toArray(RowData.FieldGetter[]::new);
+        this.keyExtractor = rowData -> createProjectedRow(rowData, 
RowKind.INSERT, keyFieldGetters);
+        this.valueCopier =
+                rowData -> {
+                    GenericRowData copiedRowData =
+                            new GenericRowData(rowData.getRowKind(), 
rowData.getArity());
+                    for (int i = 0; i < rowData.getArity(); i++) {
+                        copiedRowData.setField(i, ((GenericRowData) 
rowData).getField(i));
+                    }
+                    return copiedRowData;
+                };
+
+        // register timer
+        this.scheduler =
+                Executors.newScheduledThreadPool(
+                        1, new 
ExecutorThreadFactory("upsert-kafka-sink-function"));
+        this.scheduledFuture =
+                this.scheduler.scheduleWithFixedDelay(
+                        () -> {
+                            synchronized 
(BufferedUpsertKafkaSinkFunction.this) {
+                                if (!closed) {
+                                    try {
+                                        flush();
+                                    } catch (Exception e) {
+                                        flushException = e;
+                                    }
+                                }
+                            }
+                        },
+                        batchIntervalMs.toMillis(),
+                        batchIntervalMs.toMillis(),
+                        TimeUnit.MILLISECONDS);
+
+        producer.open(parameters);
+    }
+
+    @Override
+    public void setRuntimeContext(RuntimeContext t) {
+        producer.setRuntimeContext(t);
+    }
+
+    @Override
+    public RuntimeContext getRuntimeContext() {
+        return producer.getRuntimeContext();
+    }
+
+    @Override
+    public void invoke(RowData value, Context context) throws Exception {
+        wrappedContext.setContext(context);
+        addToBuffer(value, context.timestamp());
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        if (producer instanceof CheckpointListener) {
+            ((CheckpointListener) 
producer).notifyCheckpointComplete(checkpointId);
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+        flush();
+        if (producer instanceof CheckpointedFunction) {
+            ((CheckpointedFunction) producer).snapshotState(context);
+        }
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        if (!closed) {
+            closed = true;
+
+            if (this.scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                this.scheduler.shutdown();
+            }
+
+            if (batchCount > 0) {
+                try {
+                    flush();
+                } catch (Exception e) {
+                    LOG.warn("Writing records to kafka failed.", e);
+                    throw new RuntimeException("Writing records to kafka 
failed.", e);
+                }
+            }
+
+            producer.close();
+        }
+        super.close();
+        checkFlushException();
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        if (producer instanceof CheckpointedFunction) {
+            ((CheckpointedFunction) producer).initializeState(context);
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private synchronized void addToBuffer(RowData row, Long timestamp) throws 
Exception {
+        if (batchCount >= batchMaxRowNums) {
+            flush();
+        }
+        writeRecord(row, timestamp);
+    }
+
+    /** Flush the data into the inner sink function and send the data into the 
sink. */
+    private synchronized void writeRecord(RowData row, Long timestamp) {

Review comment:
       call `checkFlushException` at the beginning of this method. 
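
   A sketch of the suggested placement; the buffering logic of `writeRecord` itself stays as in the PR:

```java
/** Flush the data into the inner sink function and send the data into the sink. */
private synchronized void writeRecord(RowData row, Long timestamp) {
    // Surface any exception thrown by the scheduled flush task before buffering more data.
    checkFlushException();
    // ... existing buffering logic from the PR ...
}
```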

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * The wrapper of the {@link RichSinkFunction}. It will buffer the data and 
emit when the buffer is
+ * full or timer is triggered or checkpointing.
+ */
+public class BufferedUpsertKafkaSinkFunction extends RichSinkFunction<RowData>
+        implements CheckpointedFunction, CheckpointListener {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(BufferedUpsertKafkaSinkFunction.class);
+
+    // 
--------------------------------------------------------------------------------------------
+    // Config
+    // 
--------------------------------------------------------------------------------------------
+
+    private final RichSinkFunction<RowData> producer;
+    private final Integer batchMaxRowNums;
+    private final Duration batchIntervalMs;
+    private final DataType physicalDataType;
+    private final int[] keyProjection;
+    private boolean closed;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Writer and buffer
+    // 
--------------------------------------------------------------------------------------------
+
+    private int batchCount = 0;
+    private transient Map<RowData, Tuple2<RowData, Long>> reduceBuffer;
+    private transient WrappedContext wrappedContext;
+    private transient Function<RowData, RowData> keyExtractor;
+    private transient Function<RowData, RowData> valueCopier;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Timer attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+
+    public static KafkaDynamicSink.SinkFunctionProviderCreator 
createBufferedSinkFunction(
+            DataType dataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        return (producer, parallelism) ->
+                SinkFunctionProvider.of(
+                        new BufferedUpsertKafkaSinkFunction(
+                                producer,
+                                dataType,
+                                keyProjection,
+                                batchMaxRowNums,
+                                batchIntervalMs),
+                        parallelism);
+    }
+
+    private BufferedUpsertKafkaSinkFunction(
+            RichSinkFunction<RowData> producer,
+            DataType physicalDataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        this.producer = producer;
+        this.physicalDataType = physicalDataType;
+        this.keyProjection = keyProjection;
+        this.batchMaxRowNums = batchMaxRowNums;
+        this.batchIntervalMs = batchIntervalMs;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        // init variable
+        reduceBuffer = new HashMap<>();
+        wrappedContext = new WrappedContext();
+        closed = false;
+
+        // create keyExtractor and value copier
+        List<LogicalType> fields = 
physicalDataType.getLogicalType().getChildren();
+        final RowData.FieldGetter[] keyFieldGetters =
+                Arrays.stream(keyProjection)
+                        .mapToObj(
+                                targetField ->
+                                        RowData.createFieldGetter(
+                                                fields.get(targetField), 
targetField))
+                        .toArray(RowData.FieldGetter[]::new);
+        this.keyExtractor = rowData -> createProjectedRow(rowData, 
RowKind.INSERT, keyFieldGetters);
+        this.valueCopier =
+                rowData -> {
+                    GenericRowData copiedRowData =
+                            new GenericRowData(rowData.getRowKind(), 
rowData.getArity());
+                    for (int i = 0; i < rowData.getArity(); i++) {
+                        copiedRowData.setField(i, ((GenericRowData) 
rowData).getField(i));
+                    }
+                    return copiedRowData;
+                };
+
+        // register timer
+        this.scheduler =
+                Executors.newScheduledThreadPool(
+                        1, new 
ExecutorThreadFactory("upsert-kafka-sink-function"));
+        this.scheduledFuture =
+                this.scheduler.scheduleWithFixedDelay(
+                        () -> {
+                            synchronized 
(BufferedUpsertKafkaSinkFunction.this) {
+                                if (!closed) {
+                                    try {
+                                        flush();
+                                    } catch (Exception e) {
+                                        flushException = e;
+                                    }
+                                }
+                            }
+                        },
+                        batchIntervalMs.toMillis(),
+                        batchIntervalMs.toMillis(),
+                        TimeUnit.MILLISECONDS);
+
+        producer.open(parameters);
+    }
+
+    @Override
+    public void setRuntimeContext(RuntimeContext t) {
+        producer.setRuntimeContext(t);
+    }
+
+    @Override
+    public RuntimeContext getRuntimeContext() {
+        return producer.getRuntimeContext();
+    }
+
+    @Override
+    public void invoke(RowData value, Context context) throws Exception {
+        wrappedContext.setContext(context);
+        addToBuffer(value, context.timestamp());
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        if (producer instanceof CheckpointListener) {
+            ((CheckpointListener) 
producer).notifyCheckpointComplete(checkpointId);
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+        flush();
+        if (producer instanceof CheckpointedFunction) {
+            ((CheckpointedFunction) producer).snapshotState(context);
+        }
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        if (!closed) {
+            closed = true;
+
+            if (this.scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                this.scheduler.shutdown();
+            }
+
+            if (batchCount > 0) {
+                try {
+                    flush();
+                } catch (Exception e) {
+                    LOG.warn("Writing records to kafka failed.", e);
+                    throw new RuntimeException("Writing records to kafka 
failed.", e);
+                }
+            }
+
+            producer.close();
+        }
+        super.close();
+        checkFlushException();
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        if (producer instanceof CheckpointedFunction) {
+            ((CheckpointedFunction) producer).initializeState(context);
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private synchronized void addToBuffer(RowData row, Long timestamp) throws 
Exception {

Review comment:
       We don't need `synchronized` for this method.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * The wrapper of the {@link RichSinkFunction}. It will buffer the data and 
emit when the buffer is
+ * full or timer is triggered or checkpointing.
+ */
+public class BufferedUpsertKafkaSinkFunction extends RichSinkFunction<RowData>
+        implements CheckpointedFunction, CheckpointListener {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(BufferedUpsertKafkaSinkFunction.class);
+
+    // 
--------------------------------------------------------------------------------------------
+    // Config
+    // 
--------------------------------------------------------------------------------------------
+
+    private final RichSinkFunction<RowData> producer;
+    private final Integer batchMaxRowNums;
+    private final Duration batchIntervalMs;
+    private final DataType physicalDataType;
+    private final int[] keyProjection;
+    private boolean closed;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Writer and buffer
+    // 
--------------------------------------------------------------------------------------------
+
+    private int batchCount = 0;
+    private transient Map<RowData, Tuple2<RowData, Long>> reduceBuffer;
+    private transient WrappedContext wrappedContext;
+    private transient Function<RowData, RowData> keyExtractor;
+    private transient Function<RowData, RowData> valueCopier;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Timer attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+
+    public static KafkaDynamicSink.SinkFunctionProviderCreator 
createBufferedSinkFunction(
+            DataType dataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        return (producer, parallelism) ->
+                SinkFunctionProvider.of(
+                        new BufferedUpsertKafkaSinkFunction(
+                                producer,
+                                dataType,
+                                keyProjection,
+                                batchMaxRowNums,
+                                batchIntervalMs),
+                        parallelism);
+    }
+
+    private BufferedUpsertKafkaSinkFunction(
+            RichSinkFunction<RowData> producer,
+            DataType physicalDataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        this.producer = producer;
+        this.physicalDataType = physicalDataType;
+        this.keyProjection = keyProjection;
+        this.batchMaxRowNums = batchMaxRowNums;
+        this.batchIntervalMs = batchIntervalMs;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        // init variable
+        reduceBuffer = new HashMap<>();
+        wrappedContext = new WrappedContext();
+        closed = false;
+
+        // create keyExtractor and value copier
+        List<LogicalType> fields = 
physicalDataType.getLogicalType().getChildren();
+        final RowData.FieldGetter[] keyFieldGetters =
+                Arrays.stream(keyProjection)
+                        .mapToObj(
+                                targetField ->
+                                        RowData.createFieldGetter(
+                                                fields.get(targetField), 
targetField))
+                        .toArray(RowData.FieldGetter[]::new);
+        this.keyExtractor = rowData -> createProjectedRow(rowData, 
RowKind.INSERT, keyFieldGetters);
+        this.valueCopier =
+                rowData -> {
+                    GenericRowData copiedRowData =
+                            new GenericRowData(rowData.getRowKind(), 
rowData.getArity());
+                    for (int i = 0; i < rowData.getArity(); i++) {
+                        copiedRowData.setField(i, ((GenericRowData) 
rowData).getField(i));
+                    }
+                    return copiedRowData;
+                };

Review comment:
       This is not safe; the `BinaryStringData` may also be reused. We should use a `TypeSerializer` to copy records, and we don't need to copy records at all if object reuse is not enabled.
   
   You can use `getRuntimeContext().getExecutionConfig().isObjectReuseEnabled()` in the `open()` method to know whether object reuse is enabled. If it is not, use `Function.identity()` as the `valueCopier`.
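
   A sketch of how `open()` could pick the copier. It assumes the serializer is derived from the physical row type via `InternalTypeInfo` (from `org.apache.flink.table.runtime.typeutils`), which is only one possible choice, not necessarily the API the PR ends up using; `ExecutionConfig` and `TypeSerializer` are the usual Flink classes:

```java
// Sketch inside open(): copy buffered rows with a TypeSerializer only when object reuse
// is enabled; otherwise the incoming instance can be buffered directly.
ExecutionConfig executionConfig = getRuntimeContext().getExecutionConfig();
TypeSerializer<RowData> valueSerializer =
        InternalTypeInfo.of((RowType) physicalDataType.getLogicalType())
                .createSerializer(executionConfig);

this.valueCopier =
        executionConfig.isObjectReuseEnabled()
                ? valueSerializer::copy
                : Function.identity();
```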

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * The wrapper of the {@link RichSinkFunction}. It will buffer the data and 
emit when the buffer is
+ * full or timer is triggered or checkpointing.
+ */
+public class BufferedUpsertKafkaSinkFunction extends RichSinkFunction<RowData>
+        implements CheckpointedFunction, CheckpointListener {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(BufferedUpsertKafkaSinkFunction.class);
+
+    // 
--------------------------------------------------------------------------------------------
+    // Config
+    // 
--------------------------------------------------------------------------------------------
+
+    private final RichSinkFunction<RowData> producer;
+    private final Integer batchMaxRowNums;

Review comment:
       Use a primitive `int` to make sure it cannot be null.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * The wrapper of the {@link RichSinkFunction}. It will buffer the data and 
emit when the buffer is
+ * full or timer is triggered or checkpointing.
+ */
+public class BufferedUpsertKafkaSinkFunction extends RichSinkFunction<RowData>
+        implements CheckpointedFunction, CheckpointListener {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(BufferedUpsertKafkaSinkFunction.class);
+
+    // 
--------------------------------------------------------------------------------------------
+    // Config
+    // 
--------------------------------------------------------------------------------------------
+
+    private final RichSinkFunction<RowData> producer;
+    private final Integer batchMaxRowNums;
+    private final Duration batchIntervalMs;

Review comment:
       `batchIntervalMs` -> `batchInterval`

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * The wrapper of the {@link RichSinkFunction}. It will buffer the data and 
emit when the buffer is
+ * full or timer is triggered or checkpointing.
+ */
+public class BufferedUpsertKafkaSinkFunction extends RichSinkFunction<RowData>
+        implements CheckpointedFunction, CheckpointListener {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(BufferedUpsertKafkaSinkFunction.class);
+
+    // 
--------------------------------------------------------------------------------------------
+    // Config
+    // 
--------------------------------------------------------------------------------------------
+
+    private final RichSinkFunction<RowData> producer;
+    private final Integer batchMaxRowNums;
+    private final Duration batchIntervalMs;
+    private final DataType physicalDataType;
+    private final int[] keyProjection;
+    private boolean closed;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Writer and buffer
+    // 
--------------------------------------------------------------------------------------------
+
+    private int batchCount = 0;
+    private transient Map<RowData, Tuple2<RowData, Long>> reduceBuffer;
+    private transient WrappedContext wrappedContext;
+    private transient Function<RowData, RowData> keyExtractor;
+    private transient Function<RowData, RowData> valueCopier;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Timer attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+
+    public static KafkaDynamicSink.SinkFunctionProviderCreator 
createBufferedSinkFunction(
+            DataType dataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        return (producer, parallelism) ->
+                SinkFunctionProvider.of(
+                        new BufferedUpsertKafkaSinkFunction(
+                                producer,
+                                dataType,
+                                keyProjection,
+                                batchMaxRowNums,
+                                batchIntervalMs),
+                        parallelism);
+    }
+
+    private BufferedUpsertKafkaSinkFunction(
+            RichSinkFunction<RowData> producer,
+            DataType physicalDataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        this.producer = producer;
+        this.physicalDataType = physicalDataType;
+        this.keyProjection = keyProjection;
+        this.batchMaxRowNums = batchMaxRowNums;
+        this.batchIntervalMs = batchIntervalMs;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        // init variable
+        reduceBuffer = new HashMap<>();
+        wrappedContext = new WrappedContext();
+        closed = false;
+
+        // create keyExtractor and value copier
+        List<LogicalType> fields = 
physicalDataType.getLogicalType().getChildren();
+        final RowData.FieldGetter[] keyFieldGetters =
+                Arrays.stream(keyProjection)
+                        .mapToObj(
+                                targetField ->
+                                        RowData.createFieldGetter(
+                                                fields.get(targetField), 
targetField))
+                        .toArray(RowData.FieldGetter[]::new);
+        this.keyExtractor = rowData -> createProjectedRow(rowData, 
RowKind.INSERT, keyFieldGetters);
+        this.valueCopier =
+                rowData -> {
+                    GenericRowData copiedRowData =
+                            new GenericRowData(rowData.getRowKind(), 
rowData.getArity());
+                    for (int i = 0; i < rowData.getArity(); i++) {
+                        copiedRowData.setField(i, ((GenericRowData) 
rowData).getField(i));
+                    }
+                    return copiedRowData;
+                };
+
+        // register timer
+        this.scheduler =
+                Executors.newScheduledThreadPool(
+                        1, new 
ExecutorThreadFactory("upsert-kafka-sink-function"));
+        this.scheduledFuture =
+                this.scheduler.scheduleWithFixedDelay(
+                        () -> {
+                            synchronized 
(BufferedUpsertKafkaSinkFunction.this) {
+                                if (!closed) {
+                                    try {
+                                        flush();
+                                    } catch (Exception e) {
+                                        flushException = e;
+                                    }
+                                }
+                            }
+                        },
+                        batchIntervalMs.toMillis(),
+                        batchIntervalMs.toMillis(),
+                        TimeUnit.MILLISECONDS);
+
+        producer.open(parameters);
+    }
+
+    @Override
+    public void setRuntimeContext(RuntimeContext t) {
+        producer.setRuntimeContext(t);
+    }
+
+    @Override
+    public RuntimeContext getRuntimeContext() {
+        return producer.getRuntimeContext();
+    }
+
+    @Override
+    public void invoke(RowData value, Context context) throws Exception {
+        wrappedContext.setContext(context);
+        addToBuffer(value, context.timestamp());
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        if (producer instanceof CheckpointListener) {
+            ((CheckpointListener) 
producer).notifyCheckpointComplete(checkpointId);
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+        flush();
+        if (producer instanceof CheckpointedFunction) {
+            ((CheckpointedFunction) producer).snapshotState(context);
+        }
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        if (!closed) {
+            closed = true;
+
+            if (this.scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                this.scheduler.shutdown();
+            }
+
+            if (batchCount > 0) {
+                try {
+                    flush();
+                } catch (Exception e) {
+                    LOG.warn("Writing records to kafka failed.", e);
+                    throw new RuntimeException("Writing records to kafka 
failed.", e);
+                }
+            }
+
+            producer.close();
+        }
+        super.close();
+        checkFlushException();
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        if (producer instanceof CheckpointedFunction) {
+            ((CheckpointedFunction) producer).initializeState(context);
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private synchronized void addToBuffer(RowData row, Long timestamp) throws 
Exception {
+        if (batchCount >= batchMaxRowNums) {
+            flush();
+        }
+        writeRecord(row, timestamp);

Review comment:
       We can combine `addToBuffer` and `writeRecord` into one method, and 
please perform the flush check after `writeRecord`, to make sure the buffer is 
flushed as soon as it reaches the batch max row number; see the sketch below.
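   A minimal sketch of the combined method (only a suggestion that reuses the 
helpers from the quoted code, not the final implementation):
   ```java
   private synchronized void addToBuffer(RowData row, Long timestamp) throws Exception {
       // write the record first ...
       RowData key = keyExtractor.apply(row);
       RowData value = valueCopier.apply(row);
       reduceBuffer.put(key, new Tuple2<>(changeFlag(value), timestamp));
       batchCount++;
       // ... then check, so the buffer is flushed as soon as it reaches the max row number
       if (batchCount >= batchMaxRowNums) {
           flush();
       }
   }
   ```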

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * The wrapper of the {@link RichSinkFunction}. It will buffer the data and 
emit when the buffer is
+ * full or timer is triggered or checkpointing.
+ */
+public class BufferedUpsertKafkaSinkFunction extends RichSinkFunction<RowData>
+        implements CheckpointedFunction, CheckpointListener {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(BufferedUpsertKafkaSinkFunction.class);
+
+    // 
--------------------------------------------------------------------------------------------
+    // Config
+    // 
--------------------------------------------------------------------------------------------
+
+    private final RichSinkFunction<RowData> producer;
+    private final Integer batchMaxRowNums;
+    private final Duration batchIntervalMs;
+    private final DataType physicalDataType;
+    private final int[] keyProjection;
+    private boolean closed;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Writer and buffer
+    // 
--------------------------------------------------------------------------------------------
+
+    private int batchCount = 0;
+    private transient Map<RowData, Tuple2<RowData, Long>> reduceBuffer;
+    private transient WrappedContext wrappedContext;
+    private transient Function<RowData, RowData> keyExtractor;
+    private transient Function<RowData, RowData> valueCopier;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Timer attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+
+    public static KafkaDynamicSink.SinkFunctionProviderCreator 
createBufferedSinkFunction(
+            DataType dataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        return (producer, parallelism) ->
+                SinkFunctionProvider.of(
+                        new BufferedUpsertKafkaSinkFunction(
+                                producer,
+                                dataType,
+                                keyProjection,
+                                batchMaxRowNums,
+                                batchIntervalMs),
+                        parallelism);
+    }
+
+    private BufferedUpsertKafkaSinkFunction(
+            RichSinkFunction<RowData> producer,
+            DataType physicalDataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        this.producer = producer;
+        this.physicalDataType = physicalDataType;
+        this.keyProjection = keyProjection;
+        this.batchMaxRowNums = batchMaxRowNums;
+        this.batchIntervalMs = batchIntervalMs;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        // init variable
+        reduceBuffer = new HashMap<>();
+        wrappedContext = new WrappedContext();
+        closed = false;
+
+        // create keyExtractor and value copier
+        List<LogicalType> fields = 
physicalDataType.getLogicalType().getChildren();
+        final RowData.FieldGetter[] keyFieldGetters =
+                Arrays.stream(keyProjection)
+                        .mapToObj(
+                                targetField ->
+                                        RowData.createFieldGetter(
+                                                fields.get(targetField), 
targetField))
+                        .toArray(RowData.FieldGetter[]::new);
+        this.keyExtractor = rowData -> createProjectedRow(rowData, 
RowKind.INSERT, keyFieldGetters);
+        this.valueCopier =
+                rowData -> {
+                    GenericRowData copiedRowData =
+                            new GenericRowData(rowData.getRowKind(), 
rowData.getArity());
+                    for (int i = 0; i < rowData.getArity(); i++) {
+                        copiedRowData.setField(i, ((GenericRowData) 
rowData).getField(i));
+                    }
+                    return copiedRowData;
+                };
+
+        // register timer
+        this.scheduler =
+                Executors.newScheduledThreadPool(
+                        1, new 
ExecutorThreadFactory("upsert-kafka-sink-function"));
+        this.scheduledFuture =
+                this.scheduler.scheduleWithFixedDelay(
+                        () -> {
+                            synchronized 
(BufferedUpsertKafkaSinkFunction.this) {
+                                if (!closed) {
+                                    try {
+                                        flush();
+                                    } catch (Exception e) {
+                                        flushException = e;
+                                    }
+                                }
+                            }
+                        },
+                        batchIntervalMs.toMillis(),
+                        batchIntervalMs.toMillis(),
+                        TimeUnit.MILLISECONDS);
+
+        producer.open(parameters);
+    }
+
+    @Override
+    public void setRuntimeContext(RuntimeContext t) {
+        producer.setRuntimeContext(t);
+    }
+
+    @Override
+    public RuntimeContext getRuntimeContext() {
+        return producer.getRuntimeContext();
+    }
+
+    @Override
+    public void invoke(RowData value, Context context) throws Exception {
+        wrappedContext.setContext(context);
+        addToBuffer(value, context.timestamp());
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {

Review comment:
       Should also override `notifyCheckpointAborted` and forward it to the 
wrapped producer.
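   For example (a sketch that simply mirrors `notifyCheckpointComplete`):
   ```java
   @Override
   public void notifyCheckpointAborted(long checkpointId) throws Exception {
       if (producer instanceof CheckpointListener) {
           ((CheckpointListener) producer).notifyCheckpointAborted(checkpointId);
       }
   }
   ```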

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * The wrapper of the {@link RichSinkFunction}. It will buffer the data and 
emit when the buffer is
+ * full or timer is triggered or checkpointing.
+ */
+public class BufferedUpsertKafkaSinkFunction extends RichSinkFunction<RowData>
+        implements CheckpointedFunction, CheckpointListener {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(BufferedUpsertKafkaSinkFunction.class);
+
+    // 
--------------------------------------------------------------------------------------------
+    // Config
+    // 
--------------------------------------------------------------------------------------------
+
+    private final RichSinkFunction<RowData> producer;
+    private final Integer batchMaxRowNums;
+    private final Duration batchIntervalMs;
+    private final DataType physicalDataType;
+    private final int[] keyProjection;
+    private boolean closed;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Writer and buffer
+    // 
--------------------------------------------------------------------------------------------
+
+    private int batchCount = 0;
+    private transient Map<RowData, Tuple2<RowData, Long>> reduceBuffer;
+    private transient WrappedContext wrappedContext;
+    private transient Function<RowData, RowData> keyExtractor;
+    private transient Function<RowData, RowData> valueCopier;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Timer attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+
+    public static KafkaDynamicSink.SinkFunctionProviderCreator 
createBufferedSinkFunction(
+            DataType dataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        return (producer, parallelism) ->
+                SinkFunctionProvider.of(
+                        new BufferedUpsertKafkaSinkFunction(
+                                producer,
+                                dataType,
+                                keyProjection,
+                                batchMaxRowNums,
+                                batchIntervalMs),
+                        parallelism);
+    }
+
+    private BufferedUpsertKafkaSinkFunction(
+            RichSinkFunction<RowData> producer,
+            DataType physicalDataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        this.producer = producer;
+        this.physicalDataType = physicalDataType;
+        this.keyProjection = keyProjection;
+        this.batchMaxRowNums = batchMaxRowNums;
+        this.batchIntervalMs = batchIntervalMs;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        // init variable
+        reduceBuffer = new HashMap<>();
+        wrappedContext = new WrappedContext();
+        closed = false;
+
+        // create keyExtractor and value copier
+        List<LogicalType> fields = 
physicalDataType.getLogicalType().getChildren();
+        final RowData.FieldGetter[] keyFieldGetters =
+                Arrays.stream(keyProjection)
+                        .mapToObj(
+                                targetField ->
+                                        RowData.createFieldGetter(
+                                                fields.get(targetField), 
targetField))
+                        .toArray(RowData.FieldGetter[]::new);
+        this.keyExtractor = rowData -> createProjectedRow(rowData, 
RowKind.INSERT, keyFieldGetters);
+        this.valueCopier =
+                rowData -> {
+                    GenericRowData copiedRowData =
+                            new GenericRowData(rowData.getRowKind(), 
rowData.getArity());
+                    for (int i = 0; i < rowData.getArity(); i++) {
+                        copiedRowData.setField(i, ((GenericRowData) 
rowData).getField(i));
+                    }
+                    return copiedRowData;
+                };
+
+        // register timer
+        this.scheduler =
+                Executors.newScheduledThreadPool(
+                        1, new 
ExecutorThreadFactory("upsert-kafka-sink-function"));
+        this.scheduledFuture =
+                this.scheduler.scheduleWithFixedDelay(
+                        () -> {
+                            synchronized 
(BufferedUpsertKafkaSinkFunction.this) {
+                                if (!closed) {
+                                    try {
+                                        flush();
+                                    } catch (Exception e) {
+                                        flushException = e;
+                                    }
+                                }
+                            }
+                        },
+                        batchIntervalMs.toMillis(),
+                        batchIntervalMs.toMillis(),
+                        TimeUnit.MILLISECONDS);
+
+        producer.open(parameters);
+    }
+
+    @Override
+    public void setRuntimeContext(RuntimeContext t) {
+        producer.setRuntimeContext(t);
+    }
+
+    @Override
+    public RuntimeContext getRuntimeContext() {
+        return producer.getRuntimeContext();
+    }
+
+    @Override
+    public void invoke(RowData value, Context context) throws Exception {
+        wrappedContext.setContext(context);
+        addToBuffer(value, context.timestamp());
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        if (producer instanceof CheckpointListener) {
+            ((CheckpointListener) 
producer).notifyCheckpointComplete(checkpointId);
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+        flush();
+        if (producer instanceof CheckpointedFunction) {
+            ((CheckpointedFunction) producer).snapshotState(context);
+        }
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        if (!closed) {
+            closed = true;
+
+            if (this.scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                this.scheduler.shutdown();
+            }
+
+            if (batchCount > 0) {
+                try {
+                    flush();
+                } catch (Exception e) {
+                    LOG.warn("Writing records to kafka failed.", e);
+                    throw new RuntimeException("Writing records to kafka 
failed.", e);
+                }
+            }
+
+            producer.close();
+        }
+        super.close();
+        checkFlushException();
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {

Review comment:
       Move this method next to `snapshotState`.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -242,6 +248,7 @@ public int hashCode() {
                 partitioner,
                 semantic,
                 upsertMode,
+                sinkFunctionCreator,

Review comment:
       This breaks the contract between `hashCode` and `equals`, i.e. equal 
objects must have the same hash code; see the illustration below.
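   A generic illustration of the contract (a hypothetical value class, not the 
actual `KafkaDynamicSink` fields): whatever goes into `hashCode` must also be 
compared in `equals`, and vice versa.
   ```java
   import java.time.Duration;
   import java.util.Objects;

   // Hypothetical example: equals and hashCode use exactly the same field set.
   final class BufferOptions {
       private final int batchSize;
       private final Duration flushInterval;

       BufferOptions(int batchSize, Duration flushInterval) {
           this.batchSize = batchSize;
           this.flushInterval = flushInterval;
       }

       @Override
       public boolean equals(Object o) {
           if (this == o) return true;
           if (!(o instanceof BufferOptions)) return false;
           BufferOptions that = (BufferOptions) o;
           return batchSize == that.batchSize
                   && Objects.equals(flushInterval, that.flushInterval);
       }

       @Override
       public int hashCode() {
           // same fields as equals, so equal objects always share a hash code
           return Objects.hash(batchSize, flushInterval);
       }
   }
   ```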

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -155,6 +161,20 @@ public DynamicTableSink createDynamicTableSink(Context 
context) {
 
         Integer parallelism = tableOptions.get(FactoryUtil.SINK_PARALLELISM);
 
+        KafkaDynamicSink.SinkFunctionProviderCreator creator;
+        Duration interval = tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL);
+        Integer batchSize = tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS);
+        if (batchSize > 0 && interval.toMillis() > 0) {

Review comment:
       We should enable buffering when either of them is larger than zero.
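   A sketch of the adjusted condition, using the variables from the quoted 
factory code:
   ```java
   // Buffering is enabled if either option is set, rather than requiring both.
   if (batchSize > 0 || interval.toMillis() > 0) {
       creator =
               BufferedUpsertKafkaSinkFunction.createBufferedSinkFunction(
                       schema.toPhysicalRowDataType(),
                       keyValueProjections.f0,
                       batchSize,
                       interval);
   }
   ```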

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
##########
@@ -155,6 +161,20 @@ public DynamicTableSink createDynamicTableSink(Context 
context) {
 
         Integer parallelism = tableOptions.get(FactoryUtil.SINK_PARALLELISM);
 
+        KafkaDynamicSink.SinkFunctionProviderCreator creator;
+        Duration interval = tableOptions.get(SINK_BUFFER_FLUSH_INTERVAL);
+        Integer batchSize = tableOptions.get(SINK_BUFFER_FLUSH_MAX_ROWS);
+        if (batchSize > 0 && interval.toMillis() > 0) {
+            creator =
+                    BufferedUpsertKafkaSinkFunction.createBufferedSinkFunction(
+                            schema.toPhysicalRowDataType(),
+                            keyValueProjections.f0,
+                            batchSize,
+                            interval);

Review comment:
       Personally, I think all of the runtime function initialization should 
happen in `KafkaDynamicSink#getSinkRuntimeProvider`. A DynamicTableSink should 
only hold logical information, e.g. `batchSize` and `flushInterval`; holding 
the `SinkFunctionProviderCreator` makes `equals` and `hashCode` hard to 
implement.
   
   This would avoid introducing `SinkFunctionProviderCreator` altogether and 
make sure we use the correct, consistent physical DataType. A rough sketch is 
shown below.
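   Rough sketch only (`createProducer`, `keyProjection`, `physicalDataType`, 
`batchSize` and `flushInterval` stand for whatever `KafkaDynamicSink` already 
holds, and the `BufferedUpsertKafkaSinkFunction` constructor would need to 
become accessible; none of this is the real code):
   ```java
   @Override
   public SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
       // existing producer creation stays untouched (hypothetical helper)
       RichSinkFunction<RowData> kafkaProducer = createProducer(context);
       if (upsertMode && (batchSize > 0 || flushInterval.toMillis() > 0)) {
           // wrap the producer here, so the sink itself only stores logical options
           return SinkFunctionProvider.of(
                   new BufferedUpsertKafkaSinkFunction(
                           kafkaProducer,
                           physicalDataType,
                           keyProjection,
                           batchSize,
                           flushInterval),
                   parallelism);
       }
       return SinkFunctionProvider.of(kafkaProducer, parallelism);
   }
   ```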

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * The wrapper of the {@link RichSinkFunction}. It will buffer the data and 
emit when the buffer is
+ * full or timer is triggered or checkpointing.
+ */
+public class BufferedUpsertKafkaSinkFunction extends RichSinkFunction<RowData>
+        implements CheckpointedFunction, CheckpointListener {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(BufferedUpsertKafkaSinkFunction.class);
+
+    // 
--------------------------------------------------------------------------------------------
+    // Config
+    // 
--------------------------------------------------------------------------------------------
+
+    private final RichSinkFunction<RowData> producer;
+    private final Integer batchMaxRowNums;
+    private final Duration batchIntervalMs;
+    private final DataType physicalDataType;
+    private final int[] keyProjection;
+    private boolean closed;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Writer and buffer
+    // 
--------------------------------------------------------------------------------------------
+
+    private int batchCount = 0;
+    private transient Map<RowData, Tuple2<RowData, Long>> reduceBuffer;
+    private transient WrappedContext wrappedContext;
+    private transient Function<RowData, RowData> keyExtractor;
+    private transient Function<RowData, RowData> valueCopier;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Timer attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+
+    public static KafkaDynamicSink.SinkFunctionProviderCreator 
createBufferedSinkFunction(
+            DataType dataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        return (producer, parallelism) ->
+                SinkFunctionProvider.of(
+                        new BufferedUpsertKafkaSinkFunction(
+                                producer,
+                                dataType,
+                                keyProjection,
+                                batchMaxRowNums,
+                                batchIntervalMs),
+                        parallelism);
+    }
+
+    private BufferedUpsertKafkaSinkFunction(
+            RichSinkFunction<RowData> producer,
+            DataType physicalDataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        this.producer = producer;
+        this.physicalDataType = physicalDataType;
+        this.keyProjection = keyProjection;
+        this.batchMaxRowNums = batchMaxRowNums;
+        this.batchIntervalMs = batchIntervalMs;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        // init variable
+        reduceBuffer = new HashMap<>();
+        wrappedContext = new WrappedContext();
+        closed = false;
+
+        // create keyExtractor and value copier
+        List<LogicalType> fields = 
physicalDataType.getLogicalType().getChildren();
+        final RowData.FieldGetter[] keyFieldGetters =
+                Arrays.stream(keyProjection)
+                        .mapToObj(
+                                targetField ->
+                                        RowData.createFieldGetter(
+                                                fields.get(targetField), 
targetField))
+                        .toArray(RowData.FieldGetter[]::new);
+        this.keyExtractor = rowData -> createProjectedRow(rowData, 
RowKind.INSERT, keyFieldGetters);
+        this.valueCopier =
+                rowData -> {
+                    GenericRowData copiedRowData =
+                            new GenericRowData(rowData.getRowKind(), 
rowData.getArity());
+                    for (int i = 0; i < rowData.getArity(); i++) {
+                        copiedRowData.setField(i, ((GenericRowData) 
rowData).getField(i));
+                    }
+                    return copiedRowData;
+                };
+
+        // register timer
+        this.scheduler =
+                Executors.newScheduledThreadPool(
+                        1, new 
ExecutorThreadFactory("upsert-kafka-sink-function"));
+        this.scheduledFuture =
+                this.scheduler.scheduleWithFixedDelay(
+                        () -> {
+                            synchronized 
(BufferedUpsertKafkaSinkFunction.this) {
+                                if (!closed) {
+                                    try {
+                                        flush();
+                                    } catch (Exception e) {
+                                        flushException = e;
+                                    }
+                                }
+                            }
+                        },
+                        batchIntervalMs.toMillis(),
+                        batchIntervalMs.toMillis(),
+                        TimeUnit.MILLISECONDS);
+
+        producer.open(parameters);
+    }
+
+    @Override
+    public void setRuntimeContext(RuntimeContext t) {
+        producer.setRuntimeContext(t);
+    }
+
+    @Override
+    public RuntimeContext getRuntimeContext() {
+        return producer.getRuntimeContext();
+    }
+
+    @Override
+    public void invoke(RowData value, Context context) throws Exception {
+        wrappedContext.setContext(context);
+        addToBuffer(value, context.timestamp());
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        if (producer instanceof CheckpointListener) {
+            ((CheckpointListener) 
producer).notifyCheckpointComplete(checkpointId);
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+        flush();
+        if (producer instanceof CheckpointedFunction) {
+            ((CheckpointedFunction) producer).snapshotState(context);
+        }
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        if (!closed) {
+            closed = true;
+
+            if (this.scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                this.scheduler.shutdown();
+            }
+
+            if (batchCount > 0) {
+                try {
+                    flush();
+                } catch (Exception e) {
+                    LOG.warn("Writing records to kafka failed.", e);
+                    throw new RuntimeException("Writing records to kafka 
failed.", e);
+                }
+            }
+
+            producer.close();
+        }
+        super.close();
+        checkFlushException();
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        if (producer instanceof CheckpointedFunction) {
+            ((CheckpointedFunction) producer).initializeState(context);
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private synchronized void addToBuffer(RowData row, Long timestamp) throws 
Exception {
+        if (batchCount >= batchMaxRowNums) {
+            flush();
+        }
+        writeRecord(row, timestamp);
+    }
+
+    /** Flush the data into the inner sink function and send the data into the 
sink. */
+    private synchronized void writeRecord(RowData row, Long timestamp) {
+        System.out.println(row);
+        RowData key = keyExtractor.apply(row);
+        RowData value = valueCopier.apply(row);
+        reduceBuffer.put(key, new Tuple2<>(changeFlag(value), timestamp));
+        batchCount++;
+    }
+
+    private synchronized void flush() throws Exception {
+        for (Tuple2<RowData, Long> value : reduceBuffer.values()) {
+            wrappedContext.setTimestamp(value.f1);
+            System.out.println(value.f0);

Review comment:
       Remove this leftover debug `System.out.println`.

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunctionTest.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link BufferedUpsertKafkaSinkFunction}. */
+public class BufferedUpsertKafkaSinkFunctionTest {
+
+    private static final ResolvedSchema SCHEMA =
+            ResolvedSchema.of(
+                    Column.physical("id", DataTypes.INT().notNull()),
+                    Column.physical("title", DataTypes.STRING().notNull()),
+                    Column.physical("author", DataTypes.STRING()),
+                    Column.physical("price", DataTypes.DOUBLE()),
+                    Column.physical("qty", DataTypes.INT()),
+                    Column.physical("ts", DataTypes.TIMESTAMP(3)));
+
+    private static final int keyIndices = 0;
+    private static final int TIMESTAMP_INDICES = 5;
+    private static final int BATCH_SIZE = 4;
+    private static final Duration FLUSH_INTERVAL = Duration.ofMillis(60_000);
+
+    public static final RowData[] TEST_DATA = {
+        GenericRowData.ofKind(
+                INSERT,
+                1001,
+                StringData.fromString("Java public for dummies"),
+                StringData.fromString("Tan Ah Teck"),
+                11.11,
+                11,
+                
TimestampData.fromInstant(Instant.parse("2021-03-30T15:00:00Z"))),
+        GenericRowData.ofKind(
+                INSERT,
+                1002,
+                StringData.fromString("More Java for dummies"),
+                StringData.fromString("Tan Ah Teck"),
+                22.22,
+                22,
+                
TimestampData.fromInstant(Instant.parse("2021-03-30T16:00:00Z"))),
+        GenericRowData.ofKind(
+                INSERT,
+                1004,
+                StringData.fromString("A Cup of Java"),
+                StringData.fromString("Kumar"),
+                44.44,
+                44,
+                
TimestampData.fromInstant(Instant.parse("2021-03-30T17:00:00Z"))),
+        GenericRowData.ofKind(
+                UPDATE_AFTER,
+                1004,
+                StringData.fromString("A Teaspoon of Java"),
+                StringData.fromString("Kevin Jones"),
+                55.55,
+                55,
+                
TimestampData.fromInstant(Instant.parse("2021-03-30T18:00:00Z"))),
+        GenericRowData.ofKind(
+                UPDATE_AFTER,
+                1004,
+                StringData.fromString("A Teaspoon of Java 1.4"),
+                StringData.fromString("Kevin Jones"),
+                66.66,
+                66,
+                
TimestampData.fromInstant(Instant.parse("2021-03-30T19:00:00Z"))),
+        GenericRowData.ofKind(
+                UPDATE_AFTER,
+                1004,
+                StringData.fromString("A Teaspoon of Java 1.5"),
+                StringData.fromString("Kevin Jones"),
+                77.77,
+                77,
+                
TimestampData.fromInstant(Instant.parse("2021-03-30T20:00:00Z"))),
+        GenericRowData.ofKind(
+                DELETE,
+                1004,
+                StringData.fromString("A Teaspoon of Java 1.8"),
+                StringData.fromString("Kevin Jones"),
+                null,
+                1010,
+                
TimestampData.fromInstant(Instant.parse("2021-03-30T21:00:00Z")))
+    };
+
+    private static KafkaDynamicSink.SinkFunctionProviderCreator creator =
+            BufferedUpsertKafkaSinkFunction.createBufferedSinkFunction(
+                    SCHEMA.toSinkRowDataType(), new int[] {keyIndices}, 
BATCH_SIZE, FLUSH_INTERVAL);

Review comment:
       Please disable the FLUSH_INTERVAL, otherwise the test is not stable.
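   One way to do that (an assumption, not a prescribed fix) is to make the 
interval effectively infinite, so the background timer can never fire during 
the test and only the BATCH_SIZE-triggered and snapshot-triggered flushes 
remain:
   ```java
   private static final Duration FLUSH_INTERVAL = Duration.ofDays(365);
   ```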

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunction.java
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaSerializationSchema.createProjectedRow;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+
+/**
+ * The wrapper of the {@link RichSinkFunction}. It will buffer the data and 
emit when the buffer is
+ * full or timer is triggered or checkpointing.
+ */
+public class BufferedUpsertKafkaSinkFunction extends RichSinkFunction<RowData>
+        implements CheckpointedFunction, CheckpointListener {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(BufferedUpsertKafkaSinkFunction.class);
+
+    // 
--------------------------------------------------------------------------------------------
+    // Config
+    // 
--------------------------------------------------------------------------------------------
+
+    private final RichSinkFunction<RowData> producer;
+    private final Integer batchMaxRowNums;
+    private final Duration batchIntervalMs;
+    private final DataType physicalDataType;
+    private final int[] keyProjection;
+    private boolean closed;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Writer and buffer
+    // 
--------------------------------------------------------------------------------------------
+
+    private int batchCount = 0;
+    private transient Map<RowData, Tuple2<RowData, Long>> reduceBuffer;
+    private transient WrappedContext wrappedContext;
+    private transient Function<RowData, RowData> keyExtractor;
+    private transient Function<RowData, RowData> valueCopier;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Timer attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+
+    public static KafkaDynamicSink.SinkFunctionProviderCreator 
createBufferedSinkFunction(
+            DataType dataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        return (producer, parallelism) ->
+                SinkFunctionProvider.of(
+                        new BufferedUpsertKafkaSinkFunction(
+                                producer,
+                                dataType,
+                                keyProjection,
+                                batchMaxRowNums,
+                                batchIntervalMs),
+                        parallelism);
+    }
+
+    private BufferedUpsertKafkaSinkFunction(
+            RichSinkFunction<RowData> producer,
+            DataType physicalDataType,
+            int[] keyProjection,
+            Integer batchMaxRowNums,
+            Duration batchIntervalMs) {
+        this.producer = producer;
+        this.physicalDataType = physicalDataType;
+        this.keyProjection = keyProjection;
+        this.batchMaxRowNums = batchMaxRowNums;
+        this.batchIntervalMs = batchIntervalMs;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        // init variable
+        reduceBuffer = new HashMap<>();
+        wrappedContext = new WrappedContext();
+        closed = false;
+
+        // create keyExtractor and value copier
+        List<LogicalType> fields = 
physicalDataType.getLogicalType().getChildren();
+        final RowData.FieldGetter[] keyFieldGetters =
+                Arrays.stream(keyProjection)
+                        .mapToObj(
+                                targetField ->
+                                        RowData.createFieldGetter(
+                                                fields.get(targetField), 
targetField))
+                        .toArray(RowData.FieldGetter[]::new);
+        this.keyExtractor = rowData -> createProjectedRow(rowData, 
RowKind.INSERT, keyFieldGetters);
+        this.valueCopier =
+                rowData -> {
+                    GenericRowData copiedRowData =
+                            new GenericRowData(rowData.getRowKind(), 
rowData.getArity());
+                    for (int i = 0; i < rowData.getArity(); i++) {
+                        copiedRowData.setField(i, ((GenericRowData) 
rowData).getField(i));
+                    }
+                    return copiedRowData;
+                };
+
+        // register timer
+        this.scheduler =
+                Executors.newScheduledThreadPool(
+                        1, new 
ExecutorThreadFactory("upsert-kafka-sink-function"));
+        this.scheduledFuture =
+                this.scheduler.scheduleWithFixedDelay(
+                        () -> {
+                            synchronized 
(BufferedUpsertKafkaSinkFunction.this) {
+                                if (!closed) {
+                                    try {
+                                        flush();
+                                    } catch (Exception e) {
+                                        flushException = e;
+                                    }
+                                }
+                            }
+                        },
+                        batchIntervalMs.toMillis(),
+                        batchIntervalMs.toMillis(),
+                        TimeUnit.MILLISECONDS);
+
+        producer.open(parameters);
+    }
+
+    @Override
+    public void setRuntimeContext(RuntimeContext t) {
+        producer.setRuntimeContext(t);
+    }
+
+    @Override
+    public RuntimeContext getRuntimeContext() {
+        return producer.getRuntimeContext();
+    }
+
+    @Override
+    public void invoke(RowData value, Context context) throws Exception {
+        wrappedContext.setContext(context);
+        addToBuffer(value, context.timestamp());
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        if (producer instanceof CheckpointListener) {
+            ((CheckpointListener) 
producer).notifyCheckpointComplete(checkpointId);
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+        flush();
+        if (producer instanceof CheckpointedFunction) {
+            ((CheckpointedFunction) producer).snapshotState(context);
+        }
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        if (!closed) {
+            closed = true;
+
+            if (this.scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                this.scheduler.shutdown();
+            }
+
+            if (batchCount > 0) {
+                try {
+                    flush();
+                } catch (Exception e) {
+                    LOG.warn("Writing records to kafka failed.", e);
+                    throw new RuntimeException("Writing records to kafka 
failed.", e);
+                }
+            }
+
+            producer.close();
+        }
+        super.close();
+        checkFlushException();
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        if (producer instanceof CheckpointedFunction) {
+            ((CheckpointedFunction) producer).initializeState(context);
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private synchronized void addToBuffer(RowData row, Long timestamp) throws 
Exception {
+        if (batchCount >= batchMaxRowNums) {
+            flush();
+        }
+        writeRecord(row, timestamp);
+    }
+
+    /** Flush the data into the inner sink function and send the data into the 
sink. */
+    private synchronized void writeRecord(RowData row, Long timestamp) {
+        System.out.println(row);
+        RowData key = keyExtractor.apply(row);
+        RowData value = valueCopier.apply(row);
+        reduceBuffer.put(key, new Tuple2<>(changeFlag(value), timestamp));
+        batchCount++;
+    }
+
+    private synchronized void flush() throws Exception {

Review comment:
       Call `checkFlushException` at the beginning of this method.
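   Sketch of the suggested change (the remainder of the loop body is an 
assumption, since the quoted hunk is truncated here):
   ```java
   private synchronized void flush() throws Exception {
       // surface any error raised by the async timer thread before flushing again
       checkFlushException();
       for (Tuple2<RowData, Long> value : reduceBuffer.values()) {
           wrappedContext.setTimestamp(value.f1);
           // assumed: forward the buffered record to the wrapped producer
           producer.invoke(value.f0, wrappedContext);
       }
       reduceBuffer.clear();
       batchCount = 0;
   }
   ```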

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/BufferedUpsertKafkaSinkFunctionTest.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link BufferedUpsertKafkaSinkFunction}. */
+public class BufferedUpsertKafkaSinkFunctionTest {
+
+    private static final ResolvedSchema SCHEMA =
+            ResolvedSchema.of(
+                    Column.physical("id", DataTypes.INT().notNull()),
+                    Column.physical("title", DataTypes.STRING().notNull()),
+                    Column.physical("author", DataTypes.STRING()),
+                    Column.physical("price", DataTypes.DOUBLE()),
+                    Column.physical("qty", DataTypes.INT()),
+                    Column.physical("ts", DataTypes.TIMESTAMP(3)));
+
+    private static final int keyIndices = 0;
+    private static final int TIMESTAMP_INDICES = 5;
+    private static final int BATCH_SIZE = 4;
+    private static final Duration FLUSH_INTERVAL = Duration.ofMillis(60_000);
+
+    public static final RowData[] TEST_DATA = {
+        GenericRowData.ofKind(
+                INSERT,
+                1001,
+                StringData.fromString("Java public for dummies"),
+                StringData.fromString("Tan Ah Teck"),
+                11.11,
+                11,
+                TimestampData.fromInstant(Instant.parse("2021-03-30T15:00:00Z"))),
+        GenericRowData.ofKind(
+                INSERT,
+                1002,
+                StringData.fromString("More Java for dummies"),
+                StringData.fromString("Tan Ah Teck"),
+                22.22,
+                22,
+                TimestampData.fromInstant(Instant.parse("2021-03-30T16:00:00Z"))),
+        GenericRowData.ofKind(
+                INSERT,
+                1004,
+                StringData.fromString("A Cup of Java"),
+                StringData.fromString("Kumar"),
+                44.44,
+                44,
+                TimestampData.fromInstant(Instant.parse("2021-03-30T17:00:00Z"))),
+        GenericRowData.ofKind(
+                UPDATE_AFTER,
+                1004,
+                StringData.fromString("A Teaspoon of Java"),
+                StringData.fromString("Kevin Jones"),
+                55.55,
+                55,
+                TimestampData.fromInstant(Instant.parse("2021-03-30T18:00:00Z"))),
+        GenericRowData.ofKind(
+                UPDATE_AFTER,
+                1004,
+                StringData.fromString("A Teaspoon of Java 1.4"),
+                StringData.fromString("Kevin Jones"),
+                66.66,
+                66,
+                TimestampData.fromInstant(Instant.parse("2021-03-30T19:00:00Z"))),
+        GenericRowData.ofKind(
+                UPDATE_AFTER,
+                1004,
+                StringData.fromString("A Teaspoon of Java 1.5"),
+                StringData.fromString("Kevin Jones"),
+                77.77,
+                77,
+                TimestampData.fromInstant(Instant.parse("2021-03-30T20:00:00Z"))),
+        GenericRowData.ofKind(
+                DELETE,
+                1004,
+                StringData.fromString("A Teaspoon of Java 1.8"),
+                StringData.fromString("Kevin Jones"),
+                null,
+                1010,
+                TimestampData.fromInstant(Instant.parse("2021-03-30T21:00:00Z")))
+    };
+
+    private static KafkaDynamicSink.SinkFunctionProviderCreator creator =
+            BufferedUpsertKafkaSinkFunction.createBufferedSinkFunction(
+                    SCHEMA.toSinkRowDataType(), new int[] {keyIndices}, BATCH_SIZE, FLUSH_INTERVAL);
+
+    @Test
+    public void testWriteData() throws Exception {
+        MockedSinkFunction sinkFunction = new MockedSinkFunction();
+        createBufferedSinkAndWriteData(sinkFunction);
+
+        HashMap<Integer, List<RowData>> expected = new HashMap<>();
+        expected.put(
+                1001,
+                Collections.singletonList(
+                        GenericRowData.ofKind(
+                                UPDATE_AFTER,
+                                1001,
+                                StringData.fromString("Java public for dummies"),
+                                StringData.fromString("Tan Ah Teck"),
+                                11.11,
+                                11,
+                                TimestampData.fromInstant(Instant.parse("2021-03-30T15:00:00Z")))));
+        expected.put(
+                1002,
+                Collections.singletonList(
+                        GenericRowData.ofKind(
+                                UPDATE_AFTER,
+                                1002,
+                                StringData.fromString("More Java for dummies"),
+                                StringData.fromString("Tan Ah Teck"),
+                                22.22,
+                                22,
+                                TimestampData.fromInstant(Instant.parse("2021-03-30T16:00:00Z")))));
+        expected.put(
+                1004,
+                Collections.singletonList(
+                        GenericRowData.ofKind(
+                                UPDATE_AFTER,
+                                1004,
+                                StringData.fromString("A Teaspoon of Java"),
+                                StringData.fromString("Kevin Jones"),
+                                55.55,
+                                55,
+                                TimestampData.fromInstant(Instant.parse("2021-03-30T18:00:00Z")))));
+
+        compareCompactedResult(expected, sinkFunction.rowDataCollectors);

Review comment:
       Why not compare the row list received by the sinkFunction directly, instead of comparing the map?
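
       For illustration, a direct comparison of the collected rows could look like the sketch below. It reuses `sinkFunction.rowDataCollectors` and the values from the expected map above, and it assumes `rowDataCollectors` is the flat list of rows the mocked sink received and that the compacted rows are emitted in a deterministic order (which may not hold if the buffer iterates in hash order):

```java
List<RowData> expectedRows =
        Arrays.asList(
                GenericRowData.ofKind(
                        UPDATE_AFTER,
                        1001,
                        StringData.fromString("Java public for dummies"),
                        StringData.fromString("Tan Ah Teck"),
                        11.11,
                        11,
                        TimestampData.fromInstant(Instant.parse("2021-03-30T15:00:00Z"))),
                GenericRowData.ofKind(
                        UPDATE_AFTER,
                        1002,
                        StringData.fromString("More Java for dummies"),
                        StringData.fromString("Tan Ah Teck"),
                        22.22,
                        22,
                        TimestampData.fromInstant(Instant.parse("2021-03-30T16:00:00Z"))),
                GenericRowData.ofKind(
                        UPDATE_AFTER,
                        1004,
                        StringData.fromString("A Teaspoon of Java"),
                        StringData.fromString("Kevin Jones"),
                        55.55,
                        55,
                        TimestampData.fromInstant(Instant.parse("2021-03-30T18:00:00Z"))));
assertEquals(expectedRows, sinkFunction.rowDataCollectors);
```

       If the emission order is not guaranteed, sorting both lists by key before the assertion would keep the comparison stable.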



