This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b9c9688fc5 [INLONG-10159][Sort] Kafka connector support audit ID 
(#10180)
b9c9688fc5 is described below

commit b9c9688fc5bba2a4d94baa33eefc1f3c99889a4b
Author: XiaoYou201 <[email protected]>
AuthorDate: Thu May 16 13:58:27 2024 +0800

    [INLONG-10159][Sort] Kafka connector support audit ID (#10180)
---
 .../table/DynamicKafkaDeserializationSchema.java   | 289 ++++++++++
 .../DynamicKafkaRecordSerializationSchema.java     | 200 +++++++
 .../kafka/table/KafkaConnectorOptionsUtil.java     | 298 +++++-----
 .../inlong/sort/kafka/table/KafkaDynamicSink.java  | 496 ++++++++++++++++
 .../sort/kafka/table/KafkaDynamicSource.java       | 637 +++++++++++++++++++++
 .../sort/kafka/table/KafkaDynamicTableFactory.java |  80 +--
 .../sort/kafka/{ => table}/KafkaOptions.java       |   7 +-
 .../sort/kafka/table/ReducingUpsertSink.java       | 101 ++++
 .../sort/kafka/table/ReducingUpsertWriter.java     | 200 +++++++
 .../table/UpsertKafkaDynamicTableFactory.java      |  47 +-
 licenses/inlong-sort-connectors/LICENSE            |  11 +-
 11 files changed, 2151 insertions(+), 215 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
new file mode 100644
index 0000000000..7863321c9e
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.DeserializationException;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/** A specific {@link KafkaDeserializationSchema} for {@link KafkaDynamicSource}.
+ * <p>
+ * Copied from org.apache.flink:flink-connector-kafka:1.15.4
+ * */
+class DynamicKafkaDeserializationSchema implements 
KafkaDeserializationSchema<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final @Nullable DeserializationSchema<RowData> keyDeserialization;
+
+    private final DeserializationSchema<RowData> valueDeserialization;
+
+    private final boolean hasMetadata;
+
+    private final BufferingCollector keyCollector;
+
+    private final OutputProjectionCollector outputCollector;
+
+    private final TypeInformation<RowData> producedTypeInfo;
+
+    private final boolean upsertMode;
+
+    private final MetricOption metricOption;
+
+    private SourceMetricData sourceMetricData;
+    DynamicKafkaDeserializationSchema(
+            int physicalArity,
+            @Nullable DeserializationSchema<RowData> keyDeserialization,
+            int[] keyProjection,
+            DeserializationSchema<RowData> valueDeserialization,
+            int[] valueProjection,
+            boolean hasMetadata,
+            MetadataConverter[] metadataConverters,
+            TypeInformation<RowData> producedTypeInfo,
+            boolean upsertMode,
+            MetricOption metricOption) {
+        if (upsertMode) {
+            Preconditions.checkArgument(
+                    keyDeserialization != null && keyProjection.length > 0,
+                    "Key must be set in upsert mode for deserialization 
schema.");
+        }
+        this.keyDeserialization = keyDeserialization;
+        this.valueDeserialization = valueDeserialization;
+        this.hasMetadata = hasMetadata;
+        this.keyCollector = new BufferingCollector();
+        this.outputCollector =
+                new OutputProjectionCollector(
+                        physicalArity,
+                        keyProjection,
+                        valueProjection,
+                        metadataConverters,
+                        upsertMode);
+        this.producedTypeInfo = producedTypeInfo;
+        this.upsertMode = upsertMode;
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public void open(DeserializationSchema.InitializationContext context) 
throws Exception {
+        if (keyDeserialization != null) {
+            keyDeserialization.open(context);
+        }
+        valueDeserialization.open(context);
+        if (metricOption != null) {
+            sourceMetricData = new SourceMetricData(metricOption);
+        }
+    }
+
+    @Override
+    public boolean isEndOfStream(RowData nextElement) {
+        return false;
+    }
+
+    @Override
+    public RowData deserialize(ConsumerRecord<byte[], byte[]> record) throws 
Exception {
+        throw new IllegalStateException("A collector is required for 
deserializing.");
+    }
+
+    @Override
+    public void deserialize(ConsumerRecord<byte[], byte[]> record, 
Collector<RowData> collector)
+            throws Exception {
+        // shortcut: with no key format and no metadata there is nothing to project,
+        // and no key rows to combine (cartesian product) with the value rows
+        if (keyDeserialization == null && !hasMetadata) {
+            valueDeserialization.deserialize(record.value(),
+                    sourceMetricData == null ? collector : new 
MetricsCollector<>(collector, sourceMetricData));
+            return;
+        }
+
+        // buffer key(s)
+        if (keyDeserialization != null) {
+            keyDeserialization.deserialize(record.key(), keyCollector);
+        }
+
+        // project output while emitting values
+        outputCollector.inputRecord = record;
+        outputCollector.physicalKeyRows = keyCollector.buffer;
+        outputCollector.outputCollector =
+                sourceMetricData == null ? collector : new 
MetricsCollector<>(collector, sourceMetricData);
+        if (record.value() == null && upsertMode) {
+            // collect tombstone messages in upsert mode by hand
+            outputCollector.collect(null);
+        } else {
+            valueDeserialization.deserialize(record.value(), outputCollector);
+        }
+        keyCollector.buffer.clear();
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return producedTypeInfo;
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    interface MetadataConverter extends Serializable {
+
+        Object read(ConsumerRecord<?, ?> record);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private static final class BufferingCollector implements 
Collector<RowData>, Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        private final List<RowData> buffer = new ArrayList<>();
+
+        @Override
+        public void collect(RowData record) {
+            buffer.add(record);
+        }
+
+        @Override
+        public void close() {
+            // nothing to do
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Emits a row with key, value, and metadata fields.
+     *
+     * <p>The collector is able to handle the following kinds of keys:
+     *
+     * <ul>
+     *   <li>No key is used.
+     *   <li>A key is used.
+     *   <li>The deserialization schema emits multiple keys.
+     *   <li>Keys and values have overlapping fields.
+     *   <li>Keys are used and value is null.
+     * </ul>
+     */
+    private static final class OutputProjectionCollector
+            implements
+                Collector<RowData>,
+                Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        private final int physicalArity;
+
+        private final int[] keyProjection;
+
+        private final int[] valueProjection;
+
+        private final MetadataConverter[] metadataConverters;
+
+        private final boolean upsertMode;
+
+        private transient ConsumerRecord<?, ?> inputRecord;
+
+        private transient List<RowData> physicalKeyRows;
+
+        private transient Collector<RowData> outputCollector;
+
+        OutputProjectionCollector(
+                int physicalArity,
+                int[] keyProjection,
+                int[] valueProjection,
+                MetadataConverter[] metadataConverters,
+                boolean upsertMode) {
+            this.physicalArity = physicalArity;
+            this.keyProjection = keyProjection;
+            this.valueProjection = valueProjection;
+            this.metadataConverters = metadataConverters;
+            this.upsertMode = upsertMode;
+        }
+
+        @Override
+        public void collect(RowData physicalValueRow) {
+            // no key defined
+            if (keyProjection.length == 0) {
+                emitRow(null, (GenericRowData) physicalValueRow);
+                return;
+            }
+
+            // otherwise emit a value for each key
+            for (RowData physicalKeyRow : physicalKeyRows) {
+                emitRow((GenericRowData) physicalKeyRow, (GenericRowData) 
physicalValueRow);
+            }
+        }
+
+        @Override
+        public void close() {
+            // nothing to do
+        }
+
+        private void emitRow(
+                @Nullable GenericRowData physicalKeyRow,
+                @Nullable GenericRowData physicalValueRow) {
+            final RowKind rowKind;
+            if (physicalValueRow == null) {
+                if (upsertMode) {
+                    rowKind = RowKind.DELETE;
+                } else {
+                    throw new DeserializationException(
+                            "Invalid null value received in non-upsert mode. 
Could not to set row kind for output record.");
+                }
+            } else {
+                rowKind = physicalValueRow.getRowKind();
+            }
+
+            final int metadataArity = metadataConverters.length;
+            final GenericRowData producedRow =
+                    new GenericRowData(rowKind, physicalArity + metadataArity);
+
+            for (int keyPos = 0; keyPos < keyProjection.length; keyPos++) {
+                assert physicalKeyRow != null;
+                producedRow.setField(keyProjection[keyPos], 
physicalKeyRow.getField(keyPos));
+            }
+
+            if (physicalValueRow != null) {
+                for (int valuePos = 0; valuePos < valueProjection.length; 
valuePos++) {
+                    producedRow.setField(
+                            valueProjection[valuePos], 
physicalValueRow.getField(valuePos));
+                }
+            }
+
+            for (int metadataPos = 0; metadataPos < metadataArity; 
metadataPos++) {
+                producedRow.setField(
+                        physicalArity + metadataPos,
+                        metadataConverters[metadataPos].read(inputRecord));
+            }
+            outputCollector.collect(producedRow);
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaRecordSerializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaRecordSerializationSchema.java
new file mode 100644
index 0000000000..f1966dd33d
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaRecordSerializationSchema.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
+
+/** SerializationSchema used by {@link KafkaDynamicSink} to configure a {@code KafkaSink}.
+ * <p>
+ * Copied from org.apache.flink:flink-connector-kafka:1.15.4
+ * */
+class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationSchema<RowData> {
+
+    private final String topic;
+    private final FlinkKafkaPartitioner<RowData> partitioner;
+    @Nullable
+    private final SerializationSchema<RowData> keySerialization;
+    private final SerializationSchema<RowData> valueSerialization;
+    private final RowData.FieldGetter[] keyFieldGetters;
+    private final RowData.FieldGetter[] valueFieldGetters;
+    private final boolean hasMetadata;
+    private final int[] metadataPositions;
+    private final boolean upsertMode;
+    private final MetricOption metricOption;
+    private SinkMetricData sinkMetricData;
+
+    DynamicKafkaRecordSerializationSchema(
+            String topic,
+            @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+            @Nullable SerializationSchema<RowData> keySerialization,
+            SerializationSchema<RowData> valueSerialization,
+            RowData.FieldGetter[] keyFieldGetters,
+            RowData.FieldGetter[] valueFieldGetters,
+            boolean hasMetadata,
+            int[] metadataPositions,
+            boolean upsertMode,
+            MetricOption metricOption) {
+        if (upsertMode) {
+            Preconditions.checkArgument(
+                    keySerialization != null && keyFieldGetters.length > 0,
+                    "Key must be set in upsert mode for serialization 
schema.");
+        }
+        this.topic = checkNotNull(topic);
+        this.partitioner = partitioner;
+        this.keySerialization = keySerialization;
+        this.valueSerialization = checkNotNull(valueSerialization);
+        this.keyFieldGetters = keyFieldGetters;
+        this.valueFieldGetters = valueFieldGetters;
+        this.hasMetadata = hasMetadata;
+        this.metadataPositions = metadataPositions;
+        this.upsertMode = upsertMode;
+        this.metricOption = metricOption;
+
+    }
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serialize(
+            RowData consumedRow, KafkaSinkContext context, Long timestamp) {
+        // shortcut in case no input projection is required
+        if (keySerialization == null && !hasMetadata) {
+            final byte[] valueSerialized = 
valueSerialization.serialize(consumedRow);
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                    topic,
+                    extractPartition(
+                            consumedRow,
+                            null,
+                            valueSerialized,
+                            context.getPartitionsForTopic(topic)),
+                    null,
+                    valueSerialized);
+            if (sinkMetricData != null) {
+                sinkMetricData.invoke(1, getDataSize(record));
+            }
+            return record;
+        }
+        final byte[] keySerialized;
+        if (keySerialization == null) {
+            keySerialized = null;
+        } else {
+            final RowData keyRow = createProjectedRow(consumedRow, 
RowKind.INSERT, keyFieldGetters);
+            keySerialized = keySerialization.serialize(keyRow);
+        }
+
+        final byte[] valueSerialized;
+        final RowKind kind = consumedRow.getRowKind();
+        if (upsertMode) {
+            if (kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE) {
+                // transform the message as the tombstone message
+                valueSerialized = null;
+            } else {
+                // force the row kind to INSERT to comply with the INSERT-ONLY format
+                final RowData valueRow =
+                        
DynamicKafkaRecordSerializationSchema.createProjectedRow(
+                                consumedRow, kind, valueFieldGetters);
+                valueRow.setRowKind(RowKind.INSERT);
+                valueSerialized = valueSerialization.serialize(valueRow);
+            }
+        } else {
+            final RowData valueRow =
+                    DynamicKafkaRecordSerializationSchema.createProjectedRow(
+                            consumedRow, kind, valueFieldGetters);
+            valueSerialized = valueSerialization.serialize(valueRow);
+        }
+
+        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                topic,
+                extractPartition(
+                        consumedRow,
+                        keySerialized,
+                        valueSerialized,
+                        context.getPartitionsForTopic(topic)),
+                readMetadata(consumedRow, 
KafkaDynamicSink.WritableMetadata.TIMESTAMP),
+                keySerialized,
+                valueSerialized,
+                readMetadata(consumedRow, 
KafkaDynamicSink.WritableMetadata.HEADERS));
+        if (sinkMetricData != null) {
+            sinkMetricData.invoke(1, getDataSize(record));
+        }
+        return record;
+    }
+
+    @Override
+    public void open(
+            SerializationSchema.InitializationContext context, 
KafkaSinkContext sinkContext)
+            throws Exception {
+        if (keySerialization != null) {
+            keySerialization.open(context);
+        }
+        if (partitioner != null) {
+            partitioner.open(
+                    sinkContext.getParallelInstanceId(),
+                    sinkContext.getNumberOfParallelInstances());
+        }
+        valueSerialization.open(context);
+
+        if (metricOption != null) {
+            this.sinkMetricData = new SinkMetricData(metricOption, 
context.getMetricGroup());
+        }
+    }
+
+    private Integer extractPartition(
+            RowData consumedRow,
+            @Nullable byte[] keySerialized,
+            byte[] valueSerialized,
+            int[] partitions) {
+        if (partitioner != null) {
+            return partitioner.partition(
+                    consumedRow, keySerialized, valueSerialized, topic, 
partitions);
+        }
+        return null;
+    }
+
+    static RowData createProjectedRow(
+            RowData consumedRow, RowKind kind, RowData.FieldGetter[] 
fieldGetters) {
+        final int arity = fieldGetters.length;
+        final GenericRowData genericRowData = new GenericRowData(kind, arity);
+        for (int fieldPos = 0; fieldPos < arity; fieldPos++) {
+            genericRowData.setField(fieldPos, 
fieldGetters[fieldPos].getFieldOrNull(consumedRow));
+        }
+        return genericRowData;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> T readMetadata(RowData consumedRow, 
KafkaDynamicSink.WritableMetadata metadata) {
+        final int pos = metadataPositions[metadata.ordinal()];
+        if (pos < 0) {
+            return null;
+        }
+        return (T) metadata.converter.read(consumedRow, pos);
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
index 06cf40ea49..27e9cdb062 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.data.RowData;
@@ -50,22 +49,13 @@ import java.util.Properties;
 import java.util.regex.Pattern;
 import java.util.stream.IntStream;
 
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARTITIONER;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.*;
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 
-/** Utilities for {@link KafkaConnectorOptions}. */
+/** Utilities for {@code KafkaConnectorOptions}.
+ * <p>
+ * Copied from org.apache.flink:flink-connector-kafka:1.15.4
+ * */
 @Internal
 class KafkaConnectorOptionsUtil {
 
@@ -76,14 +66,14 @@ class KafkaConnectorOptionsUtil {
     // Option enumerations
     // 
--------------------------------------------------------------------------------------------
 
-    // Prefix for Kafka specific properties.
-    public static final String PROPERTIES_PREFIX = "properties.";
-
     // Sink partitioner.
     public static final String SINK_PARTITIONER_VALUE_DEFAULT = "default";
     public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
     public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = 
"round-robin";
 
+    // Prefix for Kafka specific properties.
+    public static final String PROPERTIES_PREFIX = "properties.";
+
     // Other keywords.
     private static final String PARTITION = "partition";
     private static final String OFFSET = "offset";
@@ -152,7 +142,7 @@ class KafkaConnectorOptionsUtil {
                                                         "'%s' is required in 
'%s' startup mode"
                                                                 + " but 
missing.",
                                                         
SCAN_STARTUP_TIMESTAMP_MILLIS.key(),
-                                                        
KafkaConnectorOptions.ScanStartupMode.TIMESTAMP));
+                                                        
ScanStartupMode.TIMESTAMP));
                                     }
 
                                     break;
@@ -165,7 +155,7 @@ class KafkaConnectorOptionsUtil {
                                                         "'%s' is required in 
'%s' startup mode"
                                                                 + " but 
missing.",
                                                         
SCAN_STARTUP_SPECIFIC_OFFSETS.key(),
-                                                        
KafkaConnectorOptions.ScanStartupMode.SPECIFIC_OFFSETS));
+                                                        
ScanStartupMode.SPECIFIC_OFFSETS));
                                     }
                                     if (!isSingleTopic(tableOptions)) {
                                         throw new ValidationException(
@@ -216,56 +206,6 @@ class KafkaConnectorOptionsUtil {
         return tableOptions.getOptional(TOPIC).map(t -> t.size() == 
1).orElse(false);
     }
 
-    /**
-     * Parses SpecificOffsets String to Map.
-     *
-     * <p>SpecificOffsets String format was given as following:
-     *
-     * <pre>
-     *     scan.startup.specific-offsets = 
partition:0,offset:42;partition:1,offset:300
-     * </pre>
-     *
-     * @return SpecificOffsets with Map format, key is partition, and value is 
offset
-     */
-    public static Map<Integer, Long> parseSpecificOffsets(
-            String specificOffsetsStr, String optionKey) {
-        final Map<Integer, Long> offsetMap = new HashMap<>();
-        final String[] pairs = specificOffsetsStr.split(";");
-        final String validationExceptionMessage =
-                String.format(
-                        "Invalid properties '%s' should follow the format "
-                                + 
"'partition:0,offset:42;partition:1,offset:300', but is '%s'.",
-                        optionKey, specificOffsetsStr);
-
-        if (pairs.length == 0) {
-            throw new ValidationException(validationExceptionMessage);
-        }
-
-        for (String pair : pairs) {
-            if (null == pair || pair.length() == 0 || !pair.contains(",")) {
-                throw new ValidationException(validationExceptionMessage);
-            }
-
-            final String[] kv = pair.split(",");
-            if (kv.length != 2
-                    || !kv[0].startsWith(PARTITION + ':')
-                    || !kv[1].startsWith(OFFSET + ':')) {
-                throw new ValidationException(validationExceptionMessage);
-            }
-
-            String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1);
-            String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1);
-            try {
-                final Integer partition = Integer.valueOf(partitionValue);
-                final Long offset = Long.valueOf(offsetValue);
-                offsetMap.put(partition, offset);
-            } catch (NumberFormatException e) {
-                throw new ValidationException(validationExceptionMessage, e);
-            }
-        }
-        return offsetMap;
-    }
-
     public static StartupOptions getStartupOptions(ReadableConfig 
tableOptions) {
         final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
         final StartupMode startupMode =
@@ -306,9 +246,9 @@ class KafkaConnectorOptionsUtil {
 
     /**
      * Returns the {@link StartupMode} of Kafka Consumer by passed-in 
table-specific {@link
-     * KafkaConnectorOptions.ScanStartupMode}.
+     * ScanStartupMode}.
      */
-    private static StartupMode 
fromOption(KafkaConnectorOptions.ScanStartupMode scanStartupMode) {
+    private static StartupMode fromOption(ScanStartupMode scanStartupMode) {
         switch (scanStartupMode) {
             case EARLIEST_OFFSET:
                 return StartupMode.EARLIEST;
@@ -327,12 +267,124 @@ class KafkaConnectorOptionsUtil {
         }
     }
 
-    static void validateDeliveryGuarantee(ReadableConfig tableOptions) {
-        if (tableOptions.get(DELIVERY_GUARANTEE) == 
DeliveryGuarantee.EXACTLY_ONCE
-                && 
!tableOptions.getOptional(TRANSACTIONAL_ID_PREFIX).isPresent()) {
+    /**
+     * Collects the Kafka client properties from the table options.
+     *
+     * <p>Every option whose key starts with {@code PROPERTIES_PREFIX} is copied
+     * into the result with that prefix stripped from the key; all other options
+     * are ignored.
+     *
+     * @param tableOptions raw table options
+     * @return the extracted client properties, empty if no key carries the prefix
+     */
+    public static Properties getKafkaProperties(Map<String, String> tableOptions) {
+        final Properties kafkaProperties = new Properties();
+        final int prefixLength = PROPERTIES_PREFIX.length();
+        for (Map.Entry<String, String> option : tableOptions.entrySet()) {
+            final String optionKey = option.getKey();
+            if (optionKey.startsWith(PROPERTIES_PREFIX)) {
+                kafkaProperties.put(
+                        optionKey.substring(prefixLength), option.getValue());
+            }
+        }
+        return kafkaProperties;
+    }
+
+    /**
+     * Resolves the sink partitioner configured through {@code SINK_PARTITIONER}.
+     *
+     * <p>The fixed partitioner value yields a {@link FlinkFixedPartitioner}; the
+     * default and round-robin values (and an unset option) yield an empty result;
+     * any other value is treated as the full class name of a custom partitioner.
+     */
+    public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
+            ReadableConfig tableOptions, ClassLoader classLoader) {
+        final Optional<String> configured = tableOptions.getOptional(SINK_PARTITIONER);
+        if (!configured.isPresent()) {
+            return Optional.empty();
+        }
+        final String partitioner = configured.get();
+        if (SINK_PARTITIONER_VALUE_FIXED.equals(partitioner)) {
+            return Optional.of(new FlinkFixedPartitioner<>());
+        }
+        if (SINK_PARTITIONER_VALUE_DEFAULT.equals(partitioner)
+                || SINK_PARTITIONER_VALUE_ROUND_ROBIN.equals(partitioner)) {
+            return Optional.empty();
+        }
+        // Anything else falls back to the full class name of the partitioner.
+        return Optional.of(initializePartitioner(partitioner, classLoader));
+    }
+
+    /**
+     * Converts a specific-offsets option string into a partition-to-offset map.
+     *
+     * <p>The expected format is a {@code ;}-separated list of
+     * {@code partition:<int>,offset:<long>} pairs, for example:
+     *
+     * <pre>
+     *     scan.startup.specific-offsets = partition:0,offset:42;partition:1,offset:300
+     * </pre>
+     *
+     * @param specificOffsetsStr the option value to parse
+     * @param optionKey the option key, used only in the error message
+     * @return map whose key is the partition and whose value is the offset
+     * @throws ValidationException if the string does not follow the format
+     */
+    public static Map<Integer, Long> parseSpecificOffsets(
+            String specificOffsetsStr, String optionKey) {
+        final Map<Integer, Long> result = new HashMap<>();
+        final String[] entries = specificOffsetsStr.split(";");
+        final String errorMessage =
+                String.format(
+                        "Invalid properties '%s' should follow the format "
+                                + "'partition:0,offset:42;partition:1,offset:300', but is '%s'.",
+                        optionKey, specificOffsetsStr);
+
+        if (entries.length == 0) {
+            throw new ValidationException(errorMessage);
+        }
+
+        for (String entry : entries) {
+            final boolean hasSeparator = entry != null && entry.contains(",");
+            if (!hasSeparator) {
+                throw new ValidationException(errorMessage);
+            }
+
+            final String[] parts = entry.split(",");
+            final boolean wellFormed =
+                    parts.length == 2
+                            && parts[0].startsWith(PARTITION + ':')
+                            && parts[1].startsWith(OFFSET + ':');
+            if (!wellFormed) {
+                throw new ValidationException(errorMessage);
+            }
+
+            final String partitionText = parts[0].substring(parts[0].indexOf(":") + 1);
+            final String offsetText = parts[1].substring(parts[1].indexOf(":") + 1);
+            try {
+                result.put(Integer.valueOf(partitionText), Long.valueOf(offsetText));
+            } catch (NumberFormatException e) {
+                throw new ValidationException(errorMessage, e);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Tells whether at least one table option key begins with
+     * {@code PROPERTIES_PREFIX}, i.e. whether any Kafka client property is set.
+     */
+    private static boolean hasKafkaClientProperties(Map<String, String> tableOptions) {
+        for (String optionKey : tableOptions.keySet()) {
+            if (optionKey.startsWith(PROPERTIES_PREFIX)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Loads and instantiates the sink partitioner with the given class name. */
+    private static <T> FlinkKafkaPartitioner<T> initializePartitioner(
+            String name, ClassLoader classLoader) {
+        try {
+            Class<?> clazz = Class.forName(name, true, classLoader);
+            if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
+                throw new ValidationException(
+                        String.format(
+                                "Sink partitioner class '%s' should extend 
from the required class %s",
+                                name, FlinkKafkaPartitioner.class.getName()));
+            }
+            @SuppressWarnings("unchecked")
+            final FlinkKafkaPartitioner<T> kafkaPartitioner =
+                    InstantiationUtil.instantiate(name, 
FlinkKafkaPartitioner.class, classLoader);
+
+            return kafkaPartitioner;
+        } catch (ClassNotFoundException | FlinkException e) {
             throw new ValidationException(
-                    TRANSACTIONAL_ID_PREFIX.key()
-                            + " must be specified when using 
DeliveryGuarantee.EXACTLY_ONCE.");
+                    String.format("Could not find and instantiate partitioner 
class '%s'", name),
+                    e);
         }
     }
 
@@ -340,8 +392,8 @@ class KafkaConnectorOptionsUtil {
      * Creates an array of indices that determine which physical fields of the 
table schema to
      * include in the key format and the order that those fields have in the 
key format.
      *
-     * <p>See {@link KafkaConnectorOptions#KEY_FORMAT}, {@link 
KafkaConnectorOptions#KEY_FIELDS},
-     * and {@link KafkaConnectorOptions#KEY_FIELDS_PREFIX} for more 
information.
+     * <p>See {@code KafkaConnectorOptions#KEY_FORMAT}, {@code KafkaConnectorOptions#KEY_FIELDS},
+     * and {@code KafkaConnectorOptions#KEY_FIELDS_PREFIX} for more information.
      */
     public static int[] createKeyFormatProjection(
             ReadableConfig options, DataType physicalDataType) {
@@ -406,8 +458,8 @@ class KafkaConnectorOptionsUtil {
      * Creates an array of indices that determine which physical fields of the 
table schema to
      * include in the value format.
      *
-     * <p>See {@link KafkaConnectorOptions#VALUE_FORMAT}, {@link
-     * KafkaConnectorOptions#VALUE_FIELDS_INCLUDE}, and {@link
+     * <p>See {@code KafkaConnectorOptions#VALUE_FORMAT}, {@code
+     * KafkaConnectorOptions#VALUE_FIELDS_INCLUDE}, and {@code
      * KafkaConnectorOptions#KEY_FIELDS_PREFIX} for more information.
      */
     public static int[] createValueFormatProjection(
@@ -420,19 +472,19 @@ class KafkaConnectorOptionsUtil {
 
         final String keyPrefix = 
options.getOptional(KEY_FIELDS_PREFIX).orElse("");
 
-        final KafkaConnectorOptions.ValueFieldsStrategy strategy = 
options.get(VALUE_FIELDS_INCLUDE);
-        if (strategy == KafkaConnectorOptions.ValueFieldsStrategy.ALL) {
+        final ValueFieldsStrategy strategy = options.get(VALUE_FIELDS_INCLUDE);
+        if (strategy == ValueFieldsStrategy.ALL) {
             if (keyPrefix.length() > 0) {
                 throw new ValidationException(
                         String.format(
                                 "A key prefix is not allowed when option '%s' 
is set to '%s'. "
                                         + "Set it to '%s' instead to avoid 
field overlaps.",
                                 VALUE_FIELDS_INCLUDE.key(),
-                                KafkaConnectorOptions.ValueFieldsStrategy.ALL,
-                                
KafkaConnectorOptions.ValueFieldsStrategy.EXCEPT_KEY));
+                                ValueFieldsStrategy.ALL,
+                                ValueFieldsStrategy.EXCEPT_KEY));
             }
             return physicalFields.toArray();
-        } else if (strategy == 
KafkaConnectorOptions.ValueFieldsStrategy.EXCEPT_KEY) {
+        } else if (strategy == ValueFieldsStrategy.EXCEPT_KEY) {
             final int[] keyProjection = createKeyFormatProjection(options, 
physicalDataType);
             return physicalFields
                     .filter(pos -> IntStream.of(keyProjection).noneMatch(k -> 
k == pos))
@@ -497,82 +549,20 @@ class KafkaConnectorOptionsUtil {
         }
     }
 
-    /**
-     * The partitioner can be either "fixed", "round-robin" or a customized 
partitioner full class
-     * name.
-     */
-    public static Optional<FlinkKafkaPartitioner<RowData>> 
getFlinkKafkaPartitioner(
-            ReadableConfig tableOptions, ClassLoader classLoader) {
-        return tableOptions
-                .getOptional(SINK_PARTITIONER)
-                .flatMap(
-                        (String partitioner) -> {
-                            switch (partitioner) {
-                                case SINK_PARTITIONER_VALUE_FIXED:
-                                    return Optional.of(new 
FlinkFixedPartitioner<>());
-                                case SINK_PARTITIONER_VALUE_DEFAULT:
-                                case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
-                                    return Optional.empty();
-                                // Default fallback to full class name of the 
partitioner.
-                                default:
-                                    return Optional.of(
-                                            initializePartitioner(partitioner, 
classLoader));
-                            }
-                        });
-    }
-
-    /** Returns a class value with the given class name. */
-    private static <T> FlinkKafkaPartitioner<T> initializePartitioner(
-            String name, ClassLoader classLoader) {
-        try {
-            Class<?> clazz = Class.forName(name, true, classLoader);
-            if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
-                throw new ValidationException(
-                        String.format(
-                                "Sink partitioner class '%s' should extend 
from the required class %s",
-                                name, FlinkKafkaPartitioner.class.getName()));
-            }
-            @SuppressWarnings("unchecked")
-            final FlinkKafkaPartitioner<T> kafkaPartitioner =
-                    InstantiationUtil.instantiate(name, 
FlinkKafkaPartitioner.class, classLoader);
-
-            return kafkaPartitioner;
-        } catch (ClassNotFoundException | FlinkException e) {
+    static void validateDeliveryGuarantee(ReadableConfig tableOptions) {
+        if (tableOptions.get(DELIVERY_GUARANTEE) == 
DeliveryGuarantee.EXACTLY_ONCE
+                && 
!tableOptions.getOptional(TRANSACTIONAL_ID_PREFIX).isPresent()) {
             throw new ValidationException(
-                    String.format("Could not find and instantiate partitioner 
class '%s'", name),
-                    e);
-        }
-    }
-
-    public static Properties getKafkaProperties(Map<String, String> 
tableOptions) {
-        final Properties kafkaProperties = new Properties();
-
-        if (hasKafkaClientProperties(tableOptions)) {
-            tableOptions.keySet().stream()
-                    .filter(key -> key.startsWith(PROPERTIES_PREFIX))
-                    .forEach(
-                            key -> {
-                                final String value = tableOptions.get(key);
-                                final String subKey = 
key.substring((PROPERTIES_PREFIX).length());
-                                kafkaProperties.put(subKey, value);
-                            });
+                    TRANSACTIONAL_ID_PREFIX.key()
+                            + " must be specified when using 
DeliveryGuarantee.EXACTLY_ONCE.");
         }
-        return kafkaProperties;
-    }
-
-    /**
-     * Decides if the table options contains Kafka client properties that 
start with prefix
-     * 'properties'.
-     */
-    private static boolean hasKafkaClientProperties(Map<String, String> 
tableOptions) {
-        return tableOptions.keySet().stream().anyMatch(k -> 
k.startsWith(PROPERTIES_PREFIX));
     }
 
     // 
--------------------------------------------------------------------------------------------
     // Inner classes
     // 
--------------------------------------------------------------------------------------------
 
-    /** Kafka startup options. */
+    /** Kafka startup options. * */
     public static class StartupOptions {
 
         public StartupMode startupMode;
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSink.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSink.java
new file mode 100644
index 0000000000..96892d747a
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSink.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.header.Header;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A version-agnostic Kafka {@link DynamicTableSink}.
+ *
+ * <p>Copied from org.apache.flink:flink-connector-kafka:1.15.4.
+ */
+@Internal
+public class KafkaDynamicSink implements DynamicTableSink, 
SupportsWritingMetadata {
+
+    private static final String UPSERT_KAFKA_TRANSFORMATION = "upsert-kafka";
+
+    // 
--------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Metadata that is appended at the end of a physical sink row. */
+    protected List<String> metadataKeys;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Format attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Data type of the rows consumed by this sink. */
+    protected DataType consumedDataType;
+
+    /** Data type to configure the formats. */
+    protected final DataType physicalDataType;
+
+    /** Optional format for encoding keys to Kafka. */
+    protected final @Nullable EncodingFormat<SerializationSchema<RowData>> 
keyEncodingFormat;
+
+    /** Format for encoding values to Kafka. */
+    protected final EncodingFormat<SerializationSchema<RowData>> 
valueEncodingFormat;
+
+    /** Indices that determine the key fields and the source position in the 
consumed row. */
+    protected final int[] keyProjection;
+
+    /** Indices that determine the value fields and the source position in the 
consumed row. */
+    protected final int[] valueProjection;
+
+    /** Prefix that needs to be removed from fields when constructing the 
physical data type. */
+    protected final @Nullable String keyPrefix;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Kafka-specific attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** The defined delivery guarantee. */
+    private final DeliveryGuarantee deliveryGuarantee;
+
+    /**
+     * If the {@link #deliveryGuarantee} is {@link 
DeliveryGuarantee#EXACTLY_ONCE} the value is the
+     * prefix for all ids of opened Kafka transactions.
+     */
+    @Nullable
+    private final String transactionalIdPrefix;
+
+    /** The Kafka topic to write to. */
+    protected final String topic;
+
+    /** Properties for the Kafka producer. */
+    protected final Properties properties;
+
+    /** Partitioner to select Kafka partition for each item. */
+    protected final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
+
+    /**
+     * Flag to determine sink mode. In upsert mode sink transforms the 
delete/update-before message
+     * to tombstone message.
+     */
+    protected final boolean upsertMode;
+
+    /** Sink buffer flush config which only supported in upsert mode now. */
+    protected final SinkBufferFlushMode flushMode;
+
+    /** Parallelism of the physical Kafka producer. */
+    protected final @Nullable Integer parallelism;
+
+    private final MetricOption metricOption;
+
+    /**
+     * Creates the sink and validates its arguments.
+     *
+     * <p>The data types, the value encoding format, both projections, the topic,
+     * the properties, the delivery guarantee and the flush mode are checked with
+     * {@code checkNotNull}. The key encoding format, key prefix, partitioner,
+     * parallelism and transactional id prefix may be null; {@code metricOption}
+     * is stored without a check.
+     *
+     * @throws IllegalArgumentException if buffer flushing is enabled while
+     *     {@code upsertMode} is false
+     */
+    public KafkaDynamicSink(
+            DataType consumedDataType,
+            DataType physicalDataType,
+            @Nullable EncodingFormat<SerializationSchema<RowData>> 
keyEncodingFormat,
+            EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+            int[] keyProjection,
+            int[] valueProjection,
+            @Nullable String keyPrefix,
+            String topic,
+            Properties properties,
+            @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+            DeliveryGuarantee deliveryGuarantee,
+            boolean upsertMode,
+            SinkBufferFlushMode flushMode,
+            @Nullable Integer parallelism,
+            @Nullable String transactionalIdPrefix,
+            MetricOption metricOption) {
+        // Format attributes
+        this.consumedDataType =
+                checkNotNull(consumedDataType, "Consumed data type must not be 
null.");
+        this.physicalDataType =
+                checkNotNull(physicalDataType, "Physical data type must not be 
null.");
+        this.keyEncodingFormat = keyEncodingFormat;
+        this.valueEncodingFormat =
+                checkNotNull(valueEncodingFormat, "Value encoding format must 
not be null.");
+        this.keyProjection = checkNotNull(keyProjection, "Key projection must 
not be null.");
+        this.valueProjection = checkNotNull(valueProjection, "Value projection 
must not be null.");
+        this.keyPrefix = keyPrefix;
+        this.transactionalIdPrefix = transactionalIdPrefix;
+        // Mutable attributes
+        // No metadata keys until applyWritableMetadata(...) supplies them.
+        this.metadataKeys = Collections.emptyList();
+        // Kafka-specific attributes
+        this.topic = checkNotNull(topic, "Topic must not be null.");
+        this.properties = checkNotNull(properties, "Properties must not be 
null.");
+        this.partitioner = partitioner;
+        this.deliveryGuarantee =
+                checkNotNull(deliveryGuarantee, "DeliveryGuarantee must not be 
null.");
+        this.upsertMode = upsertMode;
+        this.flushMode = checkNotNull(flushMode);
+        if (flushMode.isEnabled() && !upsertMode) {
+            throw new IllegalArgumentException(
+                    "Sink buffer flush is only supported in upsert-kafka.");
+        }
+        this.parallelism = parallelism;
+        this.metricOption = metricOption;
+    }
+
+    /**
+     * Returns the changelog mode of the value encoding format; the mode
+     * requested by the caller is not consulted.
+     */
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        return valueEncodingFormat.getChangelogMode();
+    }
+
+    /**
+     * Builds the runtime provider that writes the consumed rows to Kafka.
+     *
+     * <p>A {@link KafkaSink} is always created from the configured topic,
+     * properties, delivery guarantee and serialization schemas. When buffer
+     * flushing is enabled in upsert mode, that sink is wrapped in a
+     * {@code ReducingUpsertSink} and exposed through a
+     * {@link DataStreamSinkProvider}; otherwise it is exposed directly through
+     * {@link SinkV2Provider}.
+     *
+     * @param context planner-provided context used to create the encoders and
+     *     type information
+     * @return the provider for the Kafka sink
+     * @throws NullPointerException if the producer properties do not contain
+     *     the bootstrap servers entry
+     */
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        final SerializationSchema<RowData> keySerialization =
+                createSerialization(context, keyEncodingFormat, keyProjection, keyPrefix);
+
+        final SerializationSchema<RowData> valueSerialization =
+                createSerialization(context, valueEncodingFormat, valueProjection, null);
+
+        final KafkaSinkBuilder<RowData> sinkBuilder = KafkaSink.builder();
+        final List<LogicalType> physicalChildren =
+                physicalDataType.getLogicalType().getChildren();
+        if (transactionalIdPrefix != null) {
+            sinkBuilder.setTransactionalIdPrefix(transactionalIdPrefix);
+        }
+        // Fail with a descriptive message instead of a bare NPE from toString()
+        // when the bootstrap servers are missing from the producer properties.
+        final Object bootstrapServers =
+                checkNotNull(
+                        properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
+                        "Kafka producer property '"
+                                + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
+                                + "' must not be null.");
+        final KafkaSink<RowData> kafkaSink =
+                sinkBuilder
+                        .setDeliverGuarantee(deliveryGuarantee)
+                        .setBootstrapServers(bootstrapServers.toString())
+                        .setKafkaProducerConfig(properties)
+                        .setRecordSerializer(
+                                new DynamicKafkaRecordSerializationSchema(
+                                        topic,
+                                        partitioner,
+                                        keySerialization,
+                                        valueSerialization,
+                                        getFieldGetters(physicalChildren, keyProjection),
+                                        getFieldGetters(physicalChildren, valueProjection),
+                                        hasMetadata(),
+                                        getMetadataPositions(physicalChildren),
+                                        upsertMode,
+                                        metricOption))
+                        .build();
+        if (flushMode.isEnabled() && upsertMode) {
+            return new DataStreamSinkProvider() {
+
+                @Override
+                public DataStreamSink<?> consumeDataStream(
+                        ProviderContext providerContext, DataStream<RowData> dataStream) {
+                    final boolean objectReuse =
+                            dataStream.getExecutionEnvironment().getConfig().isObjectReuseEnabled();
+                    // With object reuse, buffered rows must be copied. The serializer
+                    // is resolved once here and only its copy method is handed to the
+                    // sink, so it is not rebuilt for every record and the function no
+                    // longer captures the planner context or the data stream.
+                    final ReducingUpsertSink<?> sink =
+                            new ReducingUpsertSink<>(
+                                    kafkaSink,
+                                    physicalDataType,
+                                    keyProjection,
+                                    flushMode,
+                                    objectReuse
+                                            ? createRowDataTypeSerializer(
+                                                    context,
+                                                    dataStream.getExecutionConfig())::copy
+                                            : rowData -> rowData,
+                                    metricOption);
+                    final DataStreamSink<RowData> end = dataStream.sinkTo(sink);
+                    providerContext.generateUid(UPSERT_KAFKA_TRANSFORMATION).ifPresent(end::uid);
+                    if (parallelism != null) {
+                        end.setParallelism(parallelism);
+                    }
+                    return end;
+                }
+            };
+        }
+        return SinkV2Provider.of(kafkaSink, parallelism);
+    }
+
+    /**
+     * Lists every writable metadata key with its data type, in the declaration
+     * order of {@link WritableMetadata}.
+     */
+    @Override
+    public Map<String, DataType> listWritableMetadata() {
+        final Map<String, DataType> keyToType = new LinkedHashMap<>();
+        for (WritableMetadata metadata : WritableMetadata.values()) {
+            keyToType.put(metadata.key, metadata.dataType);
+        }
+        return keyToType;
+    }
+
+    /**
+     * Stores the given metadata keys and consumed data type on this sink,
+     * replacing the values set by the constructor.
+     */
+    @Override
+    public void applyWritableMetadata(List<String> metadataKeys, DataType 
consumedDataType) {
+        this.metadataKeys = metadataKeys;
+        this.consumedDataType = consumedDataType;
+    }
+
+    /**
+     * Creates a new sink from this sink's constructor arguments and carries
+     * over the currently applied metadata keys.
+     */
+    @Override
+    public DynamicTableSink copy() {
+        final KafkaDynamicSink copy =
+                new KafkaDynamicSink(
+                        consumedDataType,
+                        physicalDataType,
+                        keyEncodingFormat,
+                        valueEncodingFormat,
+                        keyProjection,
+                        valueProjection,
+                        keyPrefix,
+                        topic,
+                        properties,
+                        partitioner,
+                        deliveryGuarantee,
+                        upsertMode,
+                        flushMode,
+                        parallelism,
+                        transactionalIdPrefix,
+                        metricOption);
+        // The constructor resets metadataKeys to an empty list, so restore them.
+        copy.metadataKeys = metadataKeys;
+        return copy;
+    }
+
+    /** Returns a fixed, human-readable description of this sink. */
+    @Override
+    public String asSummaryString() {
+        return "Kafka table sink";
+    }
+
+    /**
+     * Two sinks are equal when they are of the same class and all of their
+     * format, projection and Kafka-specific attributes are equal. The
+     * projections are compared element-wise.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        final KafkaDynamicSink other = (KafkaDynamicSink) o;
+
+        // Metadata and format attributes.
+        final boolean sameFormat =
+                Objects.equals(metadataKeys, other.metadataKeys)
+                        && Objects.equals(consumedDataType, other.consumedDataType)
+                        && Objects.equals(physicalDataType, other.physicalDataType)
+                        && Objects.equals(keyEncodingFormat, other.keyEncodingFormat)
+                        && Objects.equals(valueEncodingFormat, other.valueEncodingFormat)
+                        && Arrays.equals(keyProjection, other.keyProjection)
+                        && Arrays.equals(valueProjection, other.valueProjection)
+                        && Objects.equals(keyPrefix, other.keyPrefix);
+        if (!sameFormat) {
+            return false;
+        }
+
+        // Kafka-specific attributes.
+        return Objects.equals(topic, other.topic)
+                && Objects.equals(properties, other.properties)
+                && Objects.equals(partitioner, other.partitioner)
+                && deliveryGuarantee == other.deliveryGuarantee
+                && upsertMode == other.upsertMode
+                && Objects.equals(flushMode, other.flushMode)
+                && Objects.equals(transactionalIdPrefix, other.transactionalIdPrefix)
+                && Objects.equals(parallelism, other.parallelism)
+                && Objects.equals(metricOption, other.metricOption);
+    }
+
+    /**
+     * Computes a hash code over the same attributes that {@link #equals(Object)}
+     * compares.
+     *
+     * <p>The {@code int[]} projections are hashed by content with
+     * {@link Arrays#hashCode(int[])}. Passing the arrays straight to
+     * {@link Objects#hash(Object...)} would hash them by identity, so two sinks
+     * that are equal (their projections are compared with
+     * {@code Arrays.equals}) could produce different hash codes.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                metadataKeys,
+                consumedDataType,
+                physicalDataType,
+                keyEncodingFormat,
+                valueEncodingFormat,
+                Arrays.hashCode(keyProjection),
+                Arrays.hashCode(valueProjection),
+                keyPrefix,
+                topic,
+                properties,
+                partitioner,
+                deliveryGuarantee,
+                upsertMode,
+                flushMode,
+                transactionalIdPrefix,
+                parallelism,
+                metricOption);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Creates a serializer for rows of {@link #consumedDataType}, using the type
+     * information supplied by the sink context and the given execution config.
+     */
+    private TypeSerializer<RowData> createRowDataTypeSerializer(
+            Context context, ExecutionConfig executionConfig) {
+        final TypeInformation<RowData> typeInformation =
+                context.createTypeInformation(consumedDataType);
+        return typeInformation.createSerializer(executionConfig);
+    }
+
+    /**
+     * Computes, for every {@link WritableMetadata} constant in declaration
+     * order, the position of its column in the consumed row: the number of
+     * physical fields plus the index of its key in {@link #metadataKeys}, or
+     * {@code -1} when the key is not among the applied metadata keys.
+     */
+    private int[] getMetadataPositions(List<LogicalType> physicalChildren) {
+        final WritableMetadata[] candidates = WritableMetadata.values();
+        final int[] positions = new int[candidates.length];
+        for (int i = 0; i < candidates.length; i++) {
+            final int keyIndex = metadataKeys.indexOf(candidates[i].key);
+            if (keyIndex < 0) {
+                positions[i] = -1;
+            } else {
+                positions[i] = physicalChildren.size() + keyIndex;
+            }
+        }
+        return positions;
+    }
+
+    /** Tells whether any metadata keys have been applied to this sink. */
+    private boolean hasMetadata() {
+        return !metadataKeys.isEmpty();
+    }
+
+    /**
+     * Creates one field getter per projected index, in projection order. Each
+     * getter reads the field at that index using the logical type found at the
+     * same index of {@code physicalChildren}.
+     */
+    private RowData.FieldGetter[] getFieldGetters(
+            List<LogicalType> physicalChildren, int[] keyProjection) {
+        final RowData.FieldGetter[] getters =
+                new RowData.FieldGetter[keyProjection.length];
+        for (int i = 0; i < keyProjection.length; i++) {
+            final int fieldIndex = keyProjection[i];
+            getters[i] =
+                    RowData.createFieldGetter(
+                            physicalChildren.get(fieldIndex), fieldIndex);
+        }
+        return getters;
+    }
+
+    /**
+     * Creates the runtime encoder for the given format, or returns {@code null}
+     * when no format is given.
+     *
+     * <p>The encoder is created for the physical data type projected onto
+     * {@code projection}; when a prefix is given it is stripped from the
+     * projected row's field names first.
+     */
+    private @Nullable SerializationSchema<RowData> createSerialization(
+            Context context,
+            @Nullable EncodingFormat<SerializationSchema<RowData>> format,
+            int[] projection,
+            @Nullable String prefix) {
+        if (format == null) {
+            return null;
+        }
+        final DataType projectedType =
+                Projection.of(projection).project(this.physicalDataType);
+        final DataType formatDataType =
+                prefix == null
+                        ? projectedType
+                        : DataTypeUtils.stripRowPrefix(projectedType, prefix);
+        return format.createRuntimeEncoder(context, formatDataType);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Metadata handling
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Metadata columns that this sink can write. Each constant carries its
+     * metadata key, its data type, and the converter that reads the value out
+     * of the consumed row.
+     */
+    enum WritableMetadata {
+
+        /** Record headers, read from a map column into a list of {@link Header}s. */
+        HEADERS(
+                "headers",
+                // key and value of the map are nullable to make handling easier in queries
+                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable())
+                        .nullable(),
+                new MetadataConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(RowData row, int pos) {
+                        if (row.isNullAt(pos)) {
+                            return null;
+                        }
+                        final MapData headerMap = row.getMap(pos);
+                        final ArrayData names = headerMap.keyArray();
+                        final ArrayData payloads = headerMap.valueArray();
+                        final List<Header> result = new ArrayList<>();
+                        for (int idx = 0; idx < names.size(); idx++) {
+                            // Entries with a null key or a null value are dropped.
+                            if (names.isNullAt(idx) || payloads.isNullAt(idx)) {
+                                continue;
+                            }
+                            final String name = names.getString(idx).toString();
+                            final byte[] payload = payloads.getBinary(idx);
+                            result.add(new KafkaHeader(name, payload));
+                        }
+                        return result;
+                    }
+                }),
+
+        /** Record timestamp, read from a timestamp column as epoch milliseconds. */
+        TIMESTAMP(
+                "timestamp",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+                new MetadataConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(RowData row, int pos) {
+                        if (!row.isNullAt(pos)) {
+                            return row.getTimestamp(pos, 3).getMillisecond();
+                        }
+                        return null;
+                    }
+                });
+
+        /** Metadata key under which the column is exposed. */
+        final String key;
+
+        /** Data type of the metadata column. */
+        final DataType dataType;
+
+        /** Reads the metadata value from a consumed row. */
+        final MetadataConverter converter;
+
+        WritableMetadata(String key, DataType dataType, MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
+    }
+
+    /** Serializable reader that extracts one metadata value from a consumed row. */
+    interface MetadataConverter extends Serializable {
+
+        /**
+         * Reads the metadata value stored at position {@code pos} of
+         * {@code consumedRow}.
+         */
+        Object read(RowData consumedRow, int pos);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Simple {@link Header} implementation holding a key and a value. The value
+     * array is stored and returned as-is, without copying.
+     */
+    private static class KafkaHeader implements Header {
+
+        private final String key;
+
+        private final byte[] value;
+
+        KafkaHeader(String key, byte[] value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        @Override
+        public String key() {
+            return key;
+        }
+
+        @Override
+        public byte[] value() {
+            return value;
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
new file mode 100644
index 0000000000..652f971785
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -0,0 +1,637 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import 
org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.kafka.source.KafkaSource;
+import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
+import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** A version-agnostic Kafka {@link ScanTableSource}.
+ * <p>
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
+ * */
+@Internal
+public class KafkaDynamicSource
+        implements
+            ScanTableSource,
+            SupportsReadingMetadata,
+            SupportsWatermarkPushDown {
+
+    private static final String KAFKA_TRANSFORMATION = "kafka";
+
+    // 
--------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
+    /** Watermark strategy that is used to generate per-partition watermark. */
+    protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Format attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    private static final String VALUE_METADATA_PREFIX = "value.";
+
+    /** Data type to configure the formats. */
+    protected final DataType physicalDataType;
+
+    /** Optional format for decoding keys from Kafka. */
+    protected final @Nullable DecodingFormat<DeserializationSchema<RowData>> 
keyDecodingFormat;
+
+    /** Format for decoding values from Kafka. */
+    protected final DecodingFormat<DeserializationSchema<RowData>> 
valueDecodingFormat;
+
+    /** Indices that determine the key fields and the target position in the 
produced row. */
+    protected final int[] keyProjection;
+
+    /** Indices that determine the value fields and the target position in the 
produced row. */
+    protected final int[] valueProjection;
+
+    /** Prefix that needs to be removed from fields when constructing the 
physical data type. */
+    protected final @Nullable String keyPrefix;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Kafka-specific attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** The Kafka topics to consume. */
+    protected final List<String> topics;
+
+    /** The Kafka topic pattern to consume. */
+    protected final Pattern topicPattern;
+
+    /** Properties for the Kafka consumer. */
+    protected final Properties properties;
+
+    /**
+     * The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}).
+     */
+    protected final StartupMode startupMode;
+
+    /**
+     * Specific startup offsets; only relevant when startup mode is {@link
+     * StartupMode#SPECIFIC_OFFSETS}.
+     */
+    protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
+    /**
+     * The start timestamp to locate partition offsets; only relevant when 
startup mode is {@link
+     * StartupMode#TIMESTAMP}.
+     */
+    protected final long startupTimestampMillis;
+
+    /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. */
+    protected final boolean upsertMode;
+
+    protected final String tableIdentifier;
+
+    private final MetricOption metricOption;
+
+    /**
+     * Creates the source.
+     *
+     * <p>{@code physicalDataType}, {@code valueDecodingFormat}, both projections,
+     * {@code properties}, {@code startupMode} and {@code specificStartupOffsets}
+     * are null-checked. Exactly one of {@code topics} and {@code topicPattern}
+     * must be non-null. The produced data type starts out as the physical data
+     * type, with no metadata keys and no watermark strategy.
+     */
+    public KafkaDynamicSource(
+            DataType physicalDataType,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> 
keyDecodingFormat,
+            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+            int[] keyProjection,
+            int[] valueProjection,
+            @Nullable String keyPrefix,
+            @Nullable List<String> topics,
+            @Nullable Pattern topicPattern,
+            Properties properties,
+            StartupMode startupMode,
+            Map<KafkaTopicPartition, Long> specificStartupOffsets,
+            long startupTimestampMillis,
+            boolean upsertMode,
+            String tableIdentifier,
+            MetricOption metricOption) {
+        // Format attributes
+        this.physicalDataType =
+                Preconditions.checkNotNull(
+                        physicalDataType, "Physical data type must not be 
null.");
+        this.keyDecodingFormat = keyDecodingFormat;
+        this.valueDecodingFormat =
+                Preconditions.checkNotNull(
+                        valueDecodingFormat, "Value decoding format must not 
be null.");
+        this.keyProjection =
+                Preconditions.checkNotNull(keyProjection, "Key projection must 
not be null.");
+        this.valueProjection =
+                Preconditions.checkNotNull(valueProjection, "Value projection 
must not be null.");
+        this.keyPrefix = keyPrefix;
+        // Mutable attributes
+        this.producedDataType = physicalDataType;
+        this.metadataKeys = Collections.emptyList();
+        this.watermarkStrategy = null;
+        // Kafka-specific attributes
+        // exactly one of topic list / topic pattern has to be provided
+        Preconditions.checkArgument(
+                (topics != null && topicPattern == null)
+                        || (topics == null && topicPattern != null),
+                "Either Topic or Topic Pattern must be set for source.");
+        this.topics = topics;
+        this.topicPattern = topicPattern;
+        this.properties = Preconditions.checkNotNull(properties, "Properties 
must not be null.");
+        this.startupMode =
+                Preconditions.checkNotNull(startupMode, "Startup mode must not 
be null.");
+        this.specificStartupOffsets =
+                Preconditions.checkNotNull(
+                        specificStartupOffsets, "Specific offsets must not be 
null.");
+        this.startupTimestampMillis = startupTimestampMillis;
+        this.upsertMode = upsertMode;
+        this.tableIdentifier = tableIdentifier;
+        this.metricOption = metricOption;
+    }
+
+    /** Returns the changelog mode reported by the value decoding format. */
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return valueDecodingFormat.getChangelogMode();
+    }
+
+    /**
+     * Builds the runtime provider that reads from Kafka.
+     *
+     * <p>Key and value decoders are created from the configured formats (the
+     * key decoder is {@code null} when no key format is set) and wrapped into a
+     * {@link KafkaSource}.
+     *
+     * @param context planner context used to create decoders and type information
+     * @return a {@link DataStreamScanProvider} backed by the Kafka source
+     */
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+        final DeserializationSchema<RowData> keyDeserialization =
+                createDeserialization(
+                        context, keyDecodingFormat, keyProjection, keyPrefix);
+
+        final DeserializationSchema<RowData> valueDeserialization =
+                createDeserialization(
+                        context, valueDecodingFormat, valueProjection, null);
+
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+
+        final KafkaSource<RowData> kafkaSource =
+                createKafkaSource(
+                        keyDeserialization, valueDeserialization, producedTypeInfo);
+
+        return new DataStreamScanProvider() {
+
+            @Override
+            public DataStream<RowData> produceDataStream(
+                    ProviderContext providerContext,
+                    StreamExecutionEnvironment execEnv) {
+                // Resolve the fallback locally instead of writing it back to the
+                // enclosing source: the field takes part in equals(), hashCode()
+                // and copy(), so producing a stream must not mutate it.
+                final WatermarkStrategy<RowData> effectiveStrategy;
+                if (watermarkStrategy != null) {
+                    effectiveStrategy = watermarkStrategy;
+                } else {
+                    effectiveStrategy = WatermarkStrategy.noWatermarks();
+                }
+                final DataStreamSource<RowData> sourceStream =
+                        execEnv.fromSource(
+                                kafkaSource,
+                                effectiveStrategy,
+                                "KafkaSource-" + tableIdentifier);
+                providerContext
+                        .generateUid(KAFKA_TRANSFORMATION)
+                        .ifPresent(sourceStream::uid);
+                return sourceStream;
+            }
+
+            @Override
+            public boolean isBounded() {
+                return kafkaSource.getBoundedness() == Boundedness.BOUNDED;
+            }
+        };
+    }
+
+    /**
+     * Lists every metadata column this source can expose.
+     *
+     * <p>Value-format metadata comes first, each key prefixed with
+     * {@code "value."}; connector metadata follows and never overrides a key
+     * that is already present.
+     */
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        // convention: PHYSICAL + FORMAT METADATA + CONNECTOR METADATA,
+        // with format metadata taking precedence
+        final Map<String, DataType> readable = new LinkedHashMap<>();
+
+        // value format metadata, prefixed
+        final Map<String, DataType> formatMetadata =
+                valueDecodingFormat.listReadableMetadata();
+        for (Map.Entry<String, DataType> entry : formatMetadata.entrySet()) {
+            readable.put(VALUE_METADATA_PREFIX + entry.getKey(), entry.getValue());
+        }
+
+        // connector metadata, in declaration order
+        for (ReadableMetadata metadata : ReadableMetadata.values()) {
+            readable.putIfAbsent(metadata.key, metadata.dataType);
+        }
+
+        return readable;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType 
producedDataType) {
+        // separate connector and format metadata
+        final List<String> formatMetadataKeys =
+                metadataKeys.stream()
+                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
+                        .collect(Collectors.toList());
+        final List<String> connectorMetadataKeys = new 
ArrayList<>(metadataKeys);
+        connectorMetadataKeys.removeAll(formatMetadataKeys);
+
+        // push down format metadata
+        final Map<String, DataType> formatMetadata = 
valueDecodingFormat.listReadableMetadata();
+        if (formatMetadata.size() > 0) {
+            final List<String> requestedFormatMetadataKeys =
+                    formatMetadataKeys.stream()
+                            .map(k -> 
k.substring(VALUE_METADATA_PREFIX.length()))
+                            .collect(Collectors.toList());
+            
valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        }
+
+        this.metadataKeys = connectorMetadataKeys;
+        this.producedDataType = producedDataType;
+    }
+
+    /** Always returns {@code false}: metadata projection is not enabled for this source. */
+    @Override
+    public boolean supportsMetadataProjection() {
+        return false;
+    }
+
+    /** Stores the pushed-down watermark strategy for use when the stream is produced. */
+    @Override
+    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+        this.watermarkStrategy = watermarkStrategy;
+    }
+
+    /**
+     * Creates a new source from the same constructor arguments and carries over
+     * the mutable state (produced data type, metadata keys, watermark strategy).
+     */
+    @Override
+    public DynamicTableSource copy() {
+        final KafkaDynamicSource duplicate = new KafkaDynamicSource(
+                physicalDataType,
+                keyDecodingFormat,
+                valueDecodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                topics,
+                topicPattern,
+                properties,
+                startupMode,
+                specificStartupOffsets,
+                startupTimestampMillis,
+                upsertMode,
+                tableIdentifier,
+                metricOption);
+        // the constructor resets these to their defaults, so copy them explicitly
+        duplicate.producedDataType = this.producedDataType;
+        duplicate.metadataKeys = this.metadataKeys;
+        duplicate.watermarkStrategy = this.watermarkStrategy;
+        return duplicate;
+    }
+
+    /** Returns the fixed summary label {@code "Kafka table source"}. */
+    @Override
+    public String asSummaryString() {
+        return "Kafka table source";
+    }
+
+    /**
+     * Field-by-field equality. Projection arrays are compared by content and
+     * the topic pattern by its string form.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        final KafkaDynamicSource other = (KafkaDynamicSource) o;
+
+        // mutable attributes
+        if (!Objects.equals(producedDataType, other.producedDataType)
+                || !Objects.equals(metadataKeys, other.metadataKeys)
+                || !Objects.equals(watermarkStrategy, other.watermarkStrategy)) {
+            return false;
+        }
+
+        // format attributes
+        if (!Objects.equals(physicalDataType, other.physicalDataType)
+                || !Objects.equals(keyDecodingFormat, other.keyDecodingFormat)
+                || !Objects.equals(valueDecodingFormat, other.valueDecodingFormat)
+                || !Arrays.equals(keyProjection, other.keyProjection)
+                || !Arrays.equals(valueProjection, other.valueProjection)
+                || !Objects.equals(keyPrefix, other.keyPrefix)) {
+            return false;
+        }
+
+        // Kafka-specific attributes
+        final String ownPattern = String.valueOf(topicPattern);
+        final String otherPattern = String.valueOf(other.topicPattern);
+        return Objects.equals(topics, other.topics)
+                && Objects.equals(ownPattern, otherPattern)
+                && Objects.equals(properties, other.properties)
+                && startupMode == other.startupMode
+                && Objects.equals(specificStartupOffsets, other.specificStartupOffsets)
+                && startupTimestampMillis == other.startupTimestampMillis
+                && upsertMode == other.upsertMode
+                && Objects.equals(tableIdentifier, other.tableIdentifier)
+                && Objects.equals(metricOption, other.metricOption);
+    }
+
+    /**
+     * Hash code consistent with {@link #equals(Object)}.
+     *
+     * <p>The projection arrays are hashed by content and the topic pattern by
+     * its string form, mirroring how {@code equals} compares them. Passing the
+     * raw {@code int[]} or {@code Pattern} to {@link Objects#hash} would use
+     * identity hashes, so two equal sources could end up with different hash
+     * codes.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                producedDataType,
+                metadataKeys,
+                physicalDataType,
+                keyDecodingFormat,
+                valueDecodingFormat,
+                Arrays.hashCode(keyProjection),
+                Arrays.hashCode(valueProjection),
+                keyPrefix,
+                topics,
+                String.valueOf(topicPattern),
+                properties,
+                startupMode,
+                specificStartupOffsets,
+                startupTimestampMillis,
+                upsertMode,
+                tableIdentifier,
+                watermarkStrategy,
+                metricOption);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Assembles the {@link KafkaSource}: subscription (topic list or pattern),
+     * starting offsets derived from the startup mode, consumer properties and
+     * the record deserializer.
+     */
+    protected KafkaSource<RowData> createKafkaSource(
+            DeserializationSchema<RowData> keyDeserialization,
+            DeserializationSchema<RowData> valueDeserialization,
+            TypeInformation<RowData> producedTypeInfo) {
+
+        final KafkaDeserializationSchema<RowData> recordDeserializer =
+                createKafkaDeserializationSchema(
+                        keyDeserialization, valueDeserialization, producedTypeInfo);
+
+        final KafkaSourceBuilder<RowData> builder = KafkaSource.builder();
+
+        // subscribe either to the explicit topic list or to the pattern
+        if (topics == null) {
+            builder.setTopicPattern(topicPattern);
+        } else {
+            builder.setTopics(topics);
+        }
+
+        switch (startupMode) {
+            case EARLIEST:
+                builder.setStartingOffsets(OffsetsInitializer.earliest());
+                break;
+            case LATEST:
+                builder.setStartingOffsets(OffsetsInitializer.latest());
+                break;
+            case GROUP_OFFSETS: {
+                // NONE is used when the properties carry no reset strategy
+                final String resetName =
+                        properties.getProperty(
+                                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                                OffsetResetStrategy.NONE.name());
+                final OffsetResetStrategy resetStrategy = getResetStrategy(resetName);
+                builder.setStartingOffsets(
+                        OffsetsInitializer.committedOffsets(resetStrategy));
+                break;
+            }
+            case SPECIFIC_OFFSETS: {
+                // translate Flink's partition type into Kafka's
+                final Map<TopicPartition, Long> startOffsets = new HashMap<>();
+                for (Map.Entry<KafkaTopicPartition, Long> entry
+                        : specificStartupOffsets.entrySet()) {
+                    final KafkaTopicPartition partition = entry.getKey();
+                    startOffsets.put(
+                            new TopicPartition(
+                                    partition.getTopic(), partition.getPartition()),
+                            entry.getValue());
+                }
+                builder.setStartingOffsets(OffsetsInitializer.offsets(startOffsets));
+                break;
+            }
+            case TIMESTAMP:
+                builder.setStartingOffsets(
+                        OffsetsInitializer.timestamp(startupTimestampMillis));
+                break;
+        }
+
+        builder.setProperties(properties);
+        builder.setDeserializer(
+                KafkaRecordDeserializationSchema.of(recordDeserializer));
+        return builder.build();
+    }
+
+    private OffsetResetStrategy getResetStrategy(String offsetResetConfig) {
+        return Arrays.stream(OffsetResetStrategy.values())
+                .filter(ors -> 
ors.name().equals(offsetResetConfig.toUpperCase(Locale.ROOT)))
+                .findAny()
+                .orElseThrow(
+                        () -> new IllegalArgumentException(
+                                String.format(
+                                        "%s can not be set to %s. Valid 
values: [%s]",
+                                        
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                                        offsetResetConfig,
+                                        
Arrays.stream(OffsetResetStrategy.values())
+                                                .map(Enum::name)
+                                                .map(String::toLowerCase)
+                                                
.collect(Collectors.joining(",")))));
+    }
+
+    /**
+     * Creates the {@link DynamicKafkaDeserializationSchema} that merges key
+     * fields, value fields and connector metadata into the produced row.
+     *
+     * @throws IllegalStateException if a requested metadata key does not match
+     *     any {@link ReadableMetadata} constant
+     */
+    private KafkaDeserializationSchema<RowData> createKafkaDeserializationSchema(
+            DeserializationSchema<RowData> keyDeserialization,
+            DeserializationSchema<RowData> valueDeserialization,
+            TypeInformation<RowData> producedTypeInfo) {
+        // one converter per requested connector metadata key, in request order
+        final MetadataConverter[] metadataConverters =
+                metadataKeys.stream()
+                        .map(
+                                k -> Stream.of(ReadableMetadata.values())
+                                        .filter(rm -> rm.key.equals(k))
+                                        .findFirst()
+                                        .orElseThrow(
+                                                () -> new IllegalStateException(
+                                                        "Unknown connector metadata key: "
+                                                                + k)))
+                        .map(m -> m.converter)
+                        .toArray(MetadataConverter[]::new);
+
+        // check if connector metadata is used at all
+        final boolean hasMetadata = !metadataKeys.isEmpty();
+
+        // produced fields minus the connector metadata columns at the end
+        final int adjustedPhysicalArity =
+                DataType.getFieldDataTypes(producedDataType).size()
+                        - metadataKeys.size();
+
+        // append the positions after the key and value fields, up to the
+        // adjusted arity, to the value projection (value format metadata)
+        final int[] adjustedValueProjection =
+                IntStream.concat(
+                        IntStream.of(valueProjection),
+                        IntStream.range(
+                                keyProjection.length + valueProjection.length,
+                                adjustedPhysicalArity))
+                        .toArray();
+        return new DynamicKafkaDeserializationSchema(
+                adjustedPhysicalArity,
+                keyDeserialization,
+                keyProjection,
+                valueDeserialization,
+                adjustedValueProjection,
+                hasMetadata,
+                metadataConverters,
+                producedTypeInfo,
+                upsertMode,
+                metricOption);
+    }
+
+    private @Nullable DeserializationSchema<RowData> createDeserialization(
+            Context context,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
+            int[] projection,
+            @Nullable String prefix) {
+        if (format == null) {
+            return null;
+        }
+        DataType physicalFormatDataType = 
Projection.of(projection).project(this.physicalDataType);
+        if (prefix != null) {
+            physicalFormatDataType = 
DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+        }
+        return format.createRuntimeDecoder(context, physicalFormatDataType);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Metadata handling
+    // 
--------------------------------------------------------------------------------------------
+
+    /**
+     * Connector metadata columns that can be read from a Kafka record. Each
+     * constant pairs the metadata key with its SQL data type and a converter
+     * that extracts the value from a {@link ConsumerRecord}.
+     */
+    enum ReadableMetadata {
+
+        // topic name of the record
+        TOPIC(
+                "topic",
+                DataTypes.STRING().notNull(),
+                new MetadataConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return StringData.fromString(record.topic());
+                    }
+                }),
+
+        // partition number of the record
+        PARTITION(
+                "partition",
+                DataTypes.INT().notNull(),
+                new MetadataConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return record.partition();
+                    }
+                }),
+
+        // record headers as a map from header key to raw bytes
+        HEADERS(
+                "headers",
+                // key and value of the map are nullable to make handling 
easier in queries
+                DataTypes.MAP(DataTypes.STRING().nullable(), 
DataTypes.BYTES().nullable())
+                        .notNull(),
+                new MetadataConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        final Map<StringData, byte[]> map = new HashMap<>();
+                        for (Header header : record.headers()) {
+                            map.put(StringData.fromString(header.key()), 
header.value());
+                        }
+                        return new GenericMapData(map);
+                    }
+                }),
+
+        // leader epoch, null when the record carries none
+        LEADER_EPOCH(
+                "leader-epoch",
+                DataTypes.INT().nullable(),
+                new MetadataConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return record.leaderEpoch().orElse(null);
+                    }
+                }),
+
+        // offset of the record
+        OFFSET(
+                "offset",
+                DataTypes.BIGINT().notNull(),
+                new MetadataConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return record.offset();
+                    }
+                }),
+
+        // record timestamp, converted from epoch milliseconds
+        TIMESTAMP(
+                "timestamp",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+                new MetadataConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return 
TimestampData.fromEpochMillis(record.timestamp());
+                    }
+                }),
+
+        // string form of the record's timestamp type
+        TIMESTAMP_TYPE(
+                "timestamp-type",
+                DataTypes.STRING().notNull(),
+                new MetadataConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return 
StringData.fromString(record.timestampType().toString());
+                    }
+                });
+
+        // metadata key used to match requested metadata columns
+        final String key;
+
+        // SQL data type of the metadata column
+        final DataType dataType;
+
+        // extracts the metadata value from a consumed record
+        final MetadataConverter converter;
+
+        ReadableMetadata(String key, DataType dataType, MetadataConverter 
converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index c87735ffc8..3a320b7f88 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.sort.kafka.table;
 
 import org.apache.inlong.sort.base.Constants;
-import org.apache.inlong.sort.kafka.KafkaOptions;
+import org.apache.inlong.sort.base.metric.MetricOption;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -32,8 +32,6 @@ import 
org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink;
-import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
 import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -67,40 +65,16 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARTITIONER;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.*;
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
-import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
-import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.StartupOptions;
-import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject;
-import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection;
-import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection;
-import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getFlinkKafkaPartitioner;
-import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties;
-import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern;
-import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics;
-import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getStartupOptions;
-import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.validateTableSinkOptions;
-import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.validateTableSourceOptions;
+import static org.apache.inlong.sort.base.Constants.*;
+import static org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.*;
 
 /**
- * Factory for creating configured instances of {@link KafkaDynamicSource} and 
{@link
+ * Factory for creating configured instances of {@link KafkaDynamicSource} and {@link
  * KafkaDynamicSink}.
+ * <p>
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
  */
 @Internal
 public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, 
DynamicTableSinkFactory {
@@ -181,6 +155,7 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
 
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
+
         final TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
 
         final Optional<DecodingFormat<DeserializationSchema<RowData>>> 
keyDecodingFormat =
@@ -222,6 +197,15 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
 
         final String keyPrefix = 
tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
 
+        String inlongMetric = 
tableOptions.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+        String auditKeys = tableOptions.get(AUDIT_KEYS);
+
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
         return createKafkaTableSource(
                 physicalDataType,
                 keyDecodingFormat.orElse(null),
@@ -235,7 +219,8 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
                 startupOptions.startupMode,
                 startupOptions.specificOffsets,
                 startupOptions.startupTimestampMillis,
-                context.getObjectIdentifier().asSummaryString());
+                context.getObjectIdentifier().asSummaryString(),
+                metricOption);
     }
 
     @Override
@@ -250,7 +235,7 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
         final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat 
=
                 getValueEncodingFormat(helper);
 
-        helper.validateExcept(KafkaConnectorOptionsUtil.PROPERTIES_PREFIX);
+        helper.validateExcept(PROPERTIES_PREFIX);
 
         final ReadableConfig tableOptions = helper.getOptions();
 
@@ -275,6 +260,16 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
 
         final Integer parallelism = 
tableOptions.getOptional(SINK_PARALLELISM).orElse(null);
 
+        String inlongMetric = 
tableOptions.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+        String auditKeys = tableOptions.get(AUDIT_KEYS);
+
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
+
         return createKafkaTableSink(
                 physicalDataType,
                 keyEncodingFormat.orElse(null),
@@ -287,7 +282,8 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
                 getFlinkKafkaPartitioner(tableOptions, 
context.getClassLoader()).orElse(null),
                 deliveryGuarantee,
                 parallelism,
-                tableOptions.get(TRANSACTIONAL_ID_PREFIX));
+                tableOptions.get(TRANSACTIONAL_ID_PREFIX),
+                metricOption);
     }
 
     private static Optional<DecodingFormat<DeserializationSchema<RowData>>> 
getKeyDecodingFormat(
@@ -390,7 +386,8 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
             StartupMode startupMode,
             Map<KafkaTopicPartition, Long> specificStartupOffsets,
             long startupTimestampMillis,
-            String tableIdentifier) {
+            String tableIdentifier,
+            MetricOption metricOption) {
         return new KafkaDynamicSource(
                 physicalDataType,
                 keyDecodingFormat,
@@ -405,7 +402,8 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
                 specificStartupOffsets,
                 startupTimestampMillis,
                 false,
-                tableIdentifier);
+                tableIdentifier,
+                metricOption);
     }
 
     protected KafkaDynamicSink createKafkaTableSink(
@@ -420,7 +418,8 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
             FlinkKafkaPartitioner<RowData> partitioner,
             DeliveryGuarantee deliveryGuarantee,
             Integer parallelism,
-            @Nullable String transactionalIdPrefix) {
+            @Nullable String transactionalIdPrefix,
+            MetricOption metricOption) {
         return new KafkaDynamicSink(
                 physicalDataType,
                 physicalDataType,
@@ -436,7 +435,8 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
                 false,
                 SinkBufferFlushMode.DISABLED,
                 parallelism,
-                transactionalIdPrefix);
+                transactionalIdPrefix,
+                metricOption);
     }
 
 }
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaOptions.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaOptions.java
similarity index 92%
rename from 
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaOptions.java
rename to 
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaOptions.java
index 6962eb6948..2fe0b41baa 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaOptions.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaOptions.java
@@ -15,12 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.kafka;
+package org.apache.inlong.sort.kafka.table;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
-/** Option utils for Kafka table source sink. */
+/** Option utils for Kafka table source sink.
+ * <p>
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
+ * */
 public class KafkaOptions {
 
     private KafkaOptions() {
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertSink.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertSink.java
new file mode 100644
index 0000000000..e00e219fe8
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertSink.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.function.SerializableFunction;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A wrapper of a {@link Sink}. It will buffer the data emitted by the wrapper 
{@link SinkWriter}
+ * and only emit it when the buffer is full or a timer is triggered or a 
checkpoint happens.
+ *
+ * <p>The sink provides eventual consistency guarantees without the need of a 
two-phase protocol
+ * because the updates are idempotent therefore duplicates have no effect.
+ * <p>
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
+ */
+class ReducingUpsertSink<WriterState> implements StatefulSink<RowData, 
WriterState> {
+
+    private final StatefulSink<RowData, WriterState> wrappedSink;
+    private final DataType physicalDataType;
+    private final int[] keyProjection;
+    private final SinkBufferFlushMode bufferFlushMode;
+    private final SerializableFunction<RowData, RowData> valueCopyFunction;
+    private final MetricOption metricOption;
+
+    ReducingUpsertSink(
+            StatefulSink<RowData, WriterState> wrappedSink,
+            DataType physicalDataType,
+            int[] keyProjection,
+            SinkBufferFlushMode bufferFlushMode,
+            SerializableFunction<RowData, RowData> valueCopyFunction,
+            MetricOption metricOption) {
+        this.wrappedSink = wrappedSink;
+        this.physicalDataType = physicalDataType;
+        this.keyProjection = keyProjection;
+        this.bufferFlushMode = bufferFlushMode;
+        this.valueCopyFunction = valueCopyFunction;
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public StatefulSinkWriter<RowData, WriterState> createWriter(InitContext 
context)
+            throws IOException {
+        final StatefulSinkWriter<RowData, WriterState> wrapperWriter =
+                wrappedSink.createWriter(context);
+        return new ReducingUpsertWriter<>(
+                wrapperWriter,
+                physicalDataType,
+                keyProjection,
+                bufferFlushMode,
+                context.getProcessingTimeService(),
+                valueCopyFunction,
+                metricOption);
+    }
+
+    @Override
+    public StatefulSinkWriter<RowData, WriterState> restoreWriter(
+            InitContext context, Collection<WriterState> recoveredState) 
throws IOException {
+        final StatefulSinkWriter<RowData, WriterState> wrapperWriter =
+                wrappedSink.restoreWriter(context, recoveredState);
+        return new ReducingUpsertWriter<>(
+                wrapperWriter,
+                physicalDataType,
+                keyProjection,
+                bufferFlushMode,
+                context.getProcessingTimeService(),
+                valueCopyFunction,
+                metricOption);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<WriterState> getWriterStateSerializer() {
+        return wrappedSink.getWriterStateSerializer();
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertWriter.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertWriter.java
new file mode 100644
index 0000000000..b118e3cde5
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertWriter.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.kafka.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static 
org.apache.inlong.sort.kafka.table.DynamicKafkaRecordSerializationSchema.createProjectedRow;
+
+/**
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
+ * @param <WriterState> the state type of the wrapped writer
+ */
+class ReducingUpsertWriter<WriterState>
+        implements
+            StatefulSink.StatefulSinkWriter<RowData, WriterState> {
+
+    private final StatefulSink.StatefulSinkWriter<RowData, WriterState> 
wrappedWriter;
+    private final WrappedContext wrappedContext = new WrappedContext();
+    private final int batchMaxRowNums;
+    private final Function<RowData, RowData> valueCopyFunction;
+    private final Map<RowData, Tuple2<RowData, Long>> reduceBuffer = new 
HashMap<>();
+    private final Function<RowData, RowData> keyExtractor;
+    private final ProcessingTimeService timeService;
+    private final long batchIntervalMs;
+
+    private boolean closed = false;
+    private long lastFlush = System.currentTimeMillis();
+
+    private SourceMetricData sourceMetricData;
+
+    ReducingUpsertWriter(
+            StatefulSink.StatefulSinkWriter<RowData, WriterState> 
wrappedWriter,
+            DataType physicalDataType,
+            int[] keyProjection,
+            SinkBufferFlushMode bufferFlushMode,
+            ProcessingTimeService timeService,
+            Function<RowData, RowData> valueCopyFunction,
+            MetricOption metricOption) {
+        checkArgument(bufferFlushMode != null && bufferFlushMode.isEnabled());
+        this.wrappedWriter = checkNotNull(wrappedWriter);
+        this.timeService = checkNotNull(timeService);
+        this.batchMaxRowNums = bufferFlushMode.getBatchSize();
+        this.batchIntervalMs = bufferFlushMode.getBatchIntervalMs();
+        registerFlush();
+        List<LogicalType> fields = 
physicalDataType.getLogicalType().getChildren();
+        final RowData.FieldGetter[] keyFieldGetters =
+                Arrays.stream(keyProjection)
+                        .mapToObj(
+                                targetField -> RowData.createFieldGetter(
+                                        fields.get(targetField), targetField))
+                        .toArray(RowData.FieldGetter[]::new);
+        this.keyExtractor = rowData -> createProjectedRow(rowData, 
RowKind.INSERT, keyFieldGetters);
+        this.valueCopyFunction = valueCopyFunction;
+        if (metricOption != null) {
+            this.sourceMetricData = new SourceMetricData(metricOption);
+        }
+    }
+
+    @Override
+    public void write(RowData element, Context context) throws IOException, 
InterruptedException {
+        wrappedContext.setContext(context);
+        addToBuffer(element, context.timestamp());
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
+        flush();
+    }
+
+    @Override
+    public List<WriterState> snapshotState(long checkpointId) throws 
IOException {
+        return wrappedWriter.snapshotState(checkpointId);
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (!closed) {
+            closed = true;
+            wrappedWriter.close();
+        }
+    }
+
+    private void addToBuffer(RowData row, Long timestamp) throws IOException, 
InterruptedException {
+        RowData key = keyExtractor.apply(row);
+        RowData value = valueCopyFunction.apply(row);
+        reduceBuffer.put(key, new Tuple2<>(changeFlag(value), timestamp));
+
+        if (reduceBuffer.size() >= batchMaxRowNums) {
+            flush();
+        }
+    }
+
+    private void registerFlush() {
+        if (closed) {
+            return;
+        }
+        timeService.registerTimer(
+                lastFlush + batchIntervalMs,
+                (t) -> {
+                    if (t >= lastFlush + batchIntervalMs) {
+                        flush();
+                    }
+                    registerFlush();
+                });
+    }
+
+    private RowData changeFlag(RowData value) {
+        switch (value.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                value.setRowKind(UPDATE_AFTER);
+                break;
+            case UPDATE_BEFORE:
+            case DELETE:
+                value.setRowKind(DELETE);
+        }
+        return value;
+    }
+
+    private void flush() throws IOException, InterruptedException {
+        for (Tuple2<RowData, Long> value : reduceBuffer.values()) {
+            wrappedContext.setTimestamp(value.f1);
+            wrappedWriter.write(value.f0, wrappedContext);
+            if (sourceMetricData != null) {
+                sourceMetricData.outputMetricsWithEstimate(value.f0);
+            }
+        }
+        lastFlush = System.currentTimeMillis();
+        reduceBuffer.clear();
+    }
+
+    /**
+     * Wrapper of {@link Context}.
+     *
+     * <p>When a record arrives, the {@link ReducingUpsertWriter} updates the 
current {@link
+     * Context} and memorizes the timestamp with the record. When flushing, 
the {@link
+     * ReducingUpsertWriter} will emit the records in the buffer with their 
memorized timestamps.
+     */
+    private static class WrappedContext implements Context {
+
+        private long timestamp;
+        private Context context;
+
+        @Override
+        public long currentWatermark() {
+            checkNotNull(context, "context must be set before retrieving it.");
+            return context.currentWatermark();
+        }
+
+        @Override
+        public Long timestamp() {
+            checkNotNull(timestamp, "timestamp must to be set before 
retrieving it.");
+            return timestamp;
+        }
+
+        public void setTimestamp(long timestamp) {
+            this.timestamp = timestamp;
+        }
+
+        public void setContext(Context context) {
+            this.context = context;
+        }
+    }
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
index 2f61265c32..6a8f5e20c9 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.sort.kafka.table;
 
 import org.apache.inlong.sort.base.Constants;
-import org.apache.inlong.sort.kafka.KafkaOptions;
+import org.apache.inlong.sort.base.metric.MetricOption;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -29,8 +29,6 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
-import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink;
-import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
 import org.apache.flink.streaming.connectors.kafka.table.SinkBufferFlushMode;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -58,17 +56,8 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
-import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.*;
+import static org.apache.inlong.sort.base.Constants.*;
 import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
 import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject;
 import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection;
@@ -77,7 +66,10 @@ import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getKa
 import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern;
 import static 
org.apache.inlong.sort.kafka.table.KafkaConnectorOptionsUtil.getSourceTopics;
 
-/** Upsert-Kafka factory. */
+/** Upsert-Kafka factory.
+ * <p>
+ * Copy from org.apache.flink:flink-connector-kafka:1.15.4
+ * */
 public class UpsertKafkaDynamicTableFactory implements 
DynamicTableSourceFactory, DynamicTableSinkFactory {
 
     public static final String IDENTIFIER = "upsert-kafka-inlong";
@@ -138,6 +130,16 @@ public class UpsertKafkaDynamicTableFactory implements 
DynamicTableSourceFactory
         // always use earliest to keep data integrity
         StartupMode earliest = StartupMode.EARLIEST;
 
+        String inlongMetric = 
tableOptions.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+        String auditKeys = tableOptions.get(AUDIT_KEYS);
+
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
+
         return new KafkaDynamicSource(
                 context.getPhysicalRowDataType(),
                 keyDecodingFormat,
@@ -152,7 +154,8 @@ public class UpsertKafkaDynamicTableFactory implements 
DynamicTableSourceFactory
                 Collections.emptyMap(),
                 0,
                 true,
-                context.getObjectIdentifier().asSummaryString());
+                context.getObjectIdentifier().asSummaryString(),
+                metricOption);
     }
 
     @Override
@@ -188,6 +191,15 @@ public class UpsertKafkaDynamicTableFactory implements 
DynamicTableSourceFactory
         SinkBufferFlushMode flushMode =
                 new SinkBufferFlushMode(batchSize, batchInterval.toMillis());
 
+        String inlongMetric = 
tableOptions.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+        String auditKeys = tableOptions.get(AUDIT_KEYS);
+
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
         // use {@link 
org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
         // it will use hash partition if key is set else in round-robin 
behaviour.
         return new KafkaDynamicSink(
@@ -205,7 +217,8 @@ public class UpsertKafkaDynamicTableFactory implements 
DynamicTableSourceFactory
                 true,
                 flushMode,
                 parallelism,
-                tableOptions.get(TRANSACTIONAL_ID_PREFIX));
+                tableOptions.get(TRANSACTIONAL_ID_PREFIX),
+                metricOption);
     }
 
     private Tuple2<int[], int[]> 
createKeyValueProjections(ResolvedCatalogTable catalogTable) {
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index c980b63fbd..3546604c06 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -815,9 +815,16 @@
  Source  : flink-connector-hbase-2.2 1.15.4 (Please note that the software 
have been modified.)
  License : https://github.com/apache/flink/blob/master/LICENSE
 
-1.3.20 
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
-       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+1.3.20 
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaRecordSerializationSchema.java
        
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaConnectorOptionsUtil.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSink.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaOptions.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertSink.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/ReducingUpsertWriter.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
   Source  : org.apache.flink:flink-connector-kafka:1.15.4 (Please note that 
the software have been modified.)
   License : https://github.com/apache/flink/blob/master/LICENSE
 

Reply via email to