This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 704e3c0eb [INLONG-7503][Sort] Support multiple audit ids and introduce 
timestamp collector  (#7552)
704e3c0eb is described below

commit 704e3c0eb382c293468fd47e56dbf8c85713690b
Author: Schnapps <[email protected]>
AuthorDate: Fri Mar 10 10:34:24 2023 +0800

    [INLONG-7503][Sort] Support multiple audit ids and introduce timestamp 
collector  (#7552)
---
 inlong-sort/sort-common/pom.xml                    |  6 +++
 .../org/apache/inlong/sort/util/AuditUtils.java    | 54 +++++++++++++++++++++
 inlong-sort/sort-connectors/base/pom.xml           |  7 +++
 .../org/apache/inlong/sort/base/Constants.java     | 17 ++++---
 .../inlong/sort/base/metric/MetricOption.java      | 51 +++++++++++++-------
 .../inlong/sort/base/metric/SourceMetricData.java  | 55 +++++++++++++++++-----
 .../sort/cdc/base/source/IncrementalSource.java    |  2 +-
 .../table/DorisDynamicSchemaOutputFormat.java      |  2 +-
 .../sort/doris/table/DorisDynamicTableFactory.java |  2 +
 .../sort/elasticsearch/ElasticsearchSinkBase.java  |  2 +-
 .../sort/filesystem/FileSystemTableFactory.java    |  2 +
 .../filesystem/stream/AbstractStreamingWriter.java |  2 +-
 .../inlong/sort/hbase/sink/HBaseSinkFunction.java  |  2 +-
 .../hive/filesystem/AbstractStreamingWriter.java   |  2 +-
 .../sort/hive/table/HiveTableInlongFactory.java    |  2 +
 .../sort/iceberg/FlinkDynamicTableFactory.java     |  2 +
 .../sink/multiple/DynamicSchemaHandleOperator.java |  2 +-
 .../sink/multiple/IcebergMultipleStreamWriter.java |  2 +-
 .../sink/multiple/IcebergSingleStreamWriter.java   |  2 +-
 .../jdbc/internal/JdbcBatchingOutputFormat.java    |  2 +-
 .../internal/JdbcMultiBatchingOutputFormat.java    |  2 +-
 .../sort/jdbc/table/JdbcDynamicTableFactory.java   |  2 +
 .../inlong/sort/kafka/FlinkKafkaConsumerBase.java  |  2 +-
 .../inlong/sort/kafka/FlinkKafkaProducer.java      |  2 +-
 .../sort/kafka/table/KafkaDynamicTableFactory.java |  2 +
 .../sort/kudu/sink/AbstractKuduSinkFunction.java   |  2 +-
 .../sort/kudu/table/KuduDynamicTableFactory.java   |  2 +
 .../sort/cdc/mongodb/DebeziumSourceFunction.java   |  2 +-
 .../mongodb/table/MongoDBTableSourceFactory.java   |  2 +
 .../sort/cdc/debezium/DebeziumSourceFunction.java  |  2 +-
 .../inlong/sort/cdc/mysql/source/MySqlSource.java  |  2 +-
 .../mysql/table/MySqlTableInlongSourceFactory.java |  2 +
 .../sort/cdc/base/source/IncrementalSource.java    |  2 +-
 .../oracle/debezium/DebeziumSourceFunction.java    |  2 +-
 .../cdc/oracle/table/OracleTableSourceFactory.java |  2 +
 .../sort/cdc/postgres/DebeziumSourceFunction.java  |  2 +-
 .../cdc/postgres/table/PostgreSQLTableFactory.java |  2 +
 inlong-sort/sort-connectors/pulsar/pom.xml         |  6 +++
 .../inlong/sort/pulsar/FlinkPulsarSource.java      |  9 +++-
 .../table/DynamicPulsarDeserializationSchema.java  | 16 ++-----
 .../pulsar/table/PulsarDynamicTableFactory.java    | 12 +++--
 .../pulsar/table/PulsarDynamicTableSource.java     | 14 ++++--
 .../table/UpsertPulsarDynamicTableFactory.java     |  4 +-
 .../pulsar/withoutadmin/FlinkPulsarSource.java     |  9 +++-
 .../sort/pulsar/withoutadmin/MetricsCollector.java | 55 ++++++++++++++++++++++
 .../sort/redis/sink/AbstractRedisSinkFunction.java |  2 +-
 .../sort/redis/table/RedisDynamicTableFactory.java |  2 +
 .../sqlserver/table/DebeziumSourceFunction.java    |  2 +-
 .../cdc/sqlserver/table/SqlServerTableFactory.java |  2 +
 .../table/sink/StarRocksDynamicSinkFunction.java   |  2 +-
 .../base/collectors/TimestampedCollector.java      | 30 ++++++++++++
 .../inlongmsg/InLongMsgDeserializationSchema.java  | 14 ++++--
 52 files changed, 348 insertions(+), 83 deletions(-)

diff --git a/inlong-sort/sort-common/pom.xml b/inlong-sort/sort-common/pom.xml
index bfb3f20fa..f89c02447 100644
--- a/inlong-sort/sort-common/pom.xml
+++ b/inlong-sort/sort-common/pom.xml
@@ -60,6 +60,12 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-jackson</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>com.google.code.findbugs</groupId>
             <artifactId>jsr305</artifactId>
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/AuditUtils.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/AuditUtils.java
new file mode 100644
index 000000000..30e8c3f8f
--- /dev/null
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/AuditUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.util;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class AuditUtils {
+
+    public static final String DELIMITER = "&";
+
+    private static final String IP_OR_HOST_PORT = 
"^(.*):([0-9]|[1-9]\\d|[1-9]\\d{"
+            + "2}|[1-9]\\d{"
+            + "3}|[1-5]\\d{"
+            + "4}|6[0-4]\\d{"
+            + "3}|65[0-4]\\d{"
+            + "2}|655[0-2]\\d|6553[0-5])$";
+
+    public static HashSet<String> extractAuditIpPorts(String inlongAudit) {
+        HashSet<String> ipPortList = new HashSet<>();
+        String[] ipPorts = inlongAudit.split(DELIMITER);
+        for (String ipPort : ipPorts) {
+            Preconditions.checkArgument(Pattern.matches(IP_OR_HOST_PORT, 
ipPort),
+                    "Error inLong audit format: " + inlongAudit);
+            ipPortList.add(ipPort);
+        }
+        return ipPortList;
+    }
+
+    public static List<Integer> extractAuditKeys(String auditKeys) {
+        return Arrays.stream(auditKeys.split(DELIMITER)).map(Integer::valueOf)
+                .collect(Collectors.toList());
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/base/pom.xml 
b/inlong-sort/sort-connectors/base/pom.xml
index 23a8f597c..3942a5887 100644
--- a/inlong-sort/sort-connectors/base/pom.xml
+++ b/inlong-sort/sort-connectors/base/pom.xml
@@ -48,6 +48,13 @@
             <artifactId>aws-java-sdk-s3</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
     </dependencies>
 
 </project>
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 7af393fe7..ac2a7c3a9 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -76,6 +76,11 @@ public final class Constants {
      * Node id used in inlong metric
      */
     public static final String NODE_ID = "nodeId";
+    // sort received successfully
+    public static final String AUDIT_SORT_INPUT = "7";
+
+    // sort send successfully
+    public static final Integer AUDIT_SORT_OUTPUT = 8;
     /**
      * Database Name used in inlong metric
      */
@@ -117,12 +122,6 @@ public final class Constants {
      */
     public static final String KEY_VALUE_DELIMITER = "=";
 
-    // sort received successfully
-    public static final Integer AUDIT_SORT_INPUT = 7;
-
-    // sort send successfully
-    public static final Integer AUDIT_SORT_OUTPUT = 8;
-
     public static final String INLONG_METRIC_STATE_NAME = 
"inlong-metric-states";
 
     /**
@@ -149,6 +148,12 @@ public final class Constants {
                     .withDescription("Audit proxy host address for reporting 
audit metrics. \n"
                             + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
 
+    public static final ConfigOption<String> AUDIT_KEYS =
+            ConfigOptions.key("metrics.audit.key")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription("Audit keys for metrics collecting");
+
     public static final ConfigOption<Boolean> IGNORE_ALL_CHANGELOG =
             ConfigOptions.key("sink.ignore.changelog")
                     .booleanType()
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
index 91e9f9f89..cd6a6b5b7 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.base.metric;
 
+import java.util.List;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
@@ -26,25 +27,24 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
-import java.util.regex.Pattern;
 import java.util.stream.Stream;
+import org.apache.inlong.sort.util.AuditUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import static org.apache.inlong.sort.base.Constants.AUDIT_SORT_INPUT;
 import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.GROUP_ID;
 import static org.apache.inlong.sort.base.Constants.STREAM_ID;
 
 public class MetricOption implements Serializable {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(MetricOption.class);
+
     private static final long serialVersionUID = 1L;
-    private static final String IP_OR_HOST_PORT = 
"^(.*):([0-9]|[1-9]\\d|[1-9]\\d{"
-            + "2}|[1-9]\\d{"
-            + "3}|[1-5]\\d{"
-            + "4}|6[0-4]\\d{"
-            + "3}|65[0-4]\\d{"
-            + "2}|655[0-2]\\d|6553[0-5])$";
 
     private Map<String, String> labels;
-    private final HashSet<String> ipPortList;
+    private HashSet<String> ipPortList;
     private String ipPorts;
     private RegisteredMetric registeredMetric;
     private long initRecords;
@@ -52,6 +52,7 @@ public class MetricOption implements Serializable {
     private long initDirtyRecords;
     private long initDirtyBytes;
     private long readPhase;
+    private List<Integer> inlongAuditKeys;
 
     private MetricOption(
             String inlongLabels,
@@ -61,7 +62,8 @@ public class MetricOption implements Serializable {
             long initBytes,
             Long initDirtyRecords,
             Long initDirtyBytes,
-            Long readPhase) {
+            Long readPhase,
+            String inlongAuditKeys) {
         
Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels),
                 "Inlong labels must be set for register metric.");
 
@@ -80,17 +82,22 @@ public class MetricOption implements Serializable {
             labels.put(key, value);
         });
 
-        this.ipPortList = new HashSet<>();
         this.ipPorts = inlongAudit;
+
         if (ipPorts != null) {
+
             Preconditions.checkArgument(labels.containsKey(GROUP_ID) && 
labels.containsKey(STREAM_ID),
                     "groupId and streamId must be set when enable inlong audit 
collect.");
-            String[] ipPortStrs = inlongAudit.split(DELIMITER);
-            for (String ipPort : ipPortStrs) {
-                Preconditions.checkArgument(Pattern.matches(IP_OR_HOST_PORT, 
ipPort),
-                        "Error inLong audit format: " + inlongAudit);
-                this.ipPortList.add(ipPort);
+
+            if (inlongAuditKeys == null) {
+                LOG.warn("should set inlongAuditKeys when enable inlong audit 
collect, "
+                        + "fallback to use id {} as audit key", 
AUDIT_SORT_INPUT);
+                inlongAuditKeys = AUDIT_SORT_INPUT;
             }
+
+            this.inlongAuditKeys = 
AuditUtils.extractAuditKeys(inlongAuditKeys);
+            this.ipPortList = AuditUtils.extractAuditIpPorts(ipPorts);
+
         }
 
         if (registeredMetric != null) {
@@ -138,6 +145,10 @@ public class MetricOption implements Serializable {
         this.initDirtyRecords = initDirtyRecords;
     }
 
+    public List<Integer> getInlongAuditKeys() {
+        return inlongAuditKeys;
+    }
+
     public long getInitDirtyBytes() {
         return initDirtyBytes;
     }
@@ -168,6 +179,7 @@ public class MetricOption implements Serializable {
 
         private String inlongLabels;
         private String inlongAudit;
+        private String inlongAuditKeys;
         private RegisteredMetric registeredMetric = RegisteredMetric.ALL;
         private long initRecords = 0L;
         private long initBytes = 0L;
@@ -183,11 +195,16 @@ public class MetricOption implements Serializable {
             return this;
         }
 
-        public MetricOption.Builder withInlongAudit(String inlongAudit) {
+        public MetricOption.Builder withAuditAddress(String inlongAudit) {
             this.inlongAudit = inlongAudit;
             return this;
         }
 
+        public MetricOption.Builder withAuditKeys(String inlongAuditIds) {
+            this.inlongAuditKeys = inlongAuditIds;
+            return this;
+        }
+
         public MetricOption.Builder withRegisterMetric(RegisteredMetric 
registeredMetric) {
             this.registeredMetric = registeredMetric;
             return this;
@@ -223,7 +240,7 @@ public class MetricOption implements Serializable {
                 return null;
             }
             return new MetricOption(inlongLabels, inlongAudit, 
registeredMetric, initRecords, initBytes,
-                    initDirtyRecords, initDirtyBytes, initReadPhase);
+                    initDirtyRecords, initDirtyBytes, initReadPhase, 
inlongAuditKeys);
         }
     }
 }
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index e365103fe..e5ffdf844 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -17,15 +17,17 @@
 
 package org.apache.inlong.sort.base.metric;
 
+import java.util.List;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.inlong.audit.AuditOperator;
-import org.apache.inlong.sort.base.Constants;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER;
@@ -39,6 +41,7 @@ import static 
org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
  */
 public class SourceMetricData implements MetricData {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(SourceMetricData.class);
     private final MetricGroup metricGroup;
     private final Map<String, String> labels;
     private Counter numRecordsIn;
@@ -48,6 +51,7 @@ public class SourceMetricData implements MetricData {
     private Meter numRecordsInPerSecond;
     private Meter numBytesInPerSecond;
     private AuditOperator auditOperator;
+    private List<Integer> auditKeys;
 
     public SourceMetricData(MetricOption option, MetricGroup metricGroup) {
         this.metricGroup = metricGroup;
@@ -71,6 +75,7 @@ public class SourceMetricData implements MetricData {
         if (option.getIpPorts().isPresent()) {
             AuditOperator.getInstance().setAuditProxy(option.getIpPortList());
             this.auditOperator = AuditOperator.getInstance();
+            this.auditKeys = option.getInlongAuditKeys();
         }
     }
 
@@ -193,7 +198,45 @@ public class SourceMetricData implements MetricData {
         outputMetrics(1, size);
     }
 
+    public void outputMetricsWithEstimate(Object data, long dataTime) {
+        long size = data.toString().getBytes(StandardCharsets.UTF_8).length;
+        outputMetrics(1, size, dataTime);
+    }
+
     public void outputMetrics(long rowCountSize, long rowDataSize) {
+        outputDefaultMetrics(rowCountSize, rowDataSize);
+
+        if (auditOperator != null) {
+            for (Integer key : auditKeys) {
+                auditOperator.add(
+                        key,
+                        getGroupId(),
+                        getStreamId(),
+                        System.currentTimeMillis(),
+                        rowCountSize,
+                        rowDataSize);
+            }
+
+        }
+    }
+
+    public void outputMetrics(long rowCountSize, long rowDataSize, long 
dataTime) {
+        outputDefaultMetrics(rowCountSize, rowDataSize);
+
+        if (auditOperator != null) {
+            for (Integer key : auditKeys) {
+                auditOperator.add(
+                        key,
+                        getGroupId(),
+                        getStreamId(),
+                        dataTime,
+                        rowCountSize,
+                        rowDataSize);
+            }
+        }
+    }
+
+    private void outputDefaultMetrics(long rowCountSize, long rowDataSize) {
         if (numRecordsIn != null) {
             this.numRecordsIn.inc(rowCountSize);
         }
@@ -209,16 +252,6 @@ public class SourceMetricData implements MetricData {
         if (numBytesInForMeter != null) {
             this.numBytesInForMeter.inc(rowDataSize);
         }
-
-        if (auditOperator != null) {
-            auditOperator.add(
-                    Constants.AUDIT_SORT_INPUT,
-                    getGroupId(),
-                    getStreamId(),
-                    System.currentTimeMillis(),
-                    rowCountSize,
-                    rowDataSize);
-        }
     }
 
     @Override
diff --git 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
index d74e4ea72..de3bec22e 100644
--- 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
+++ 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
@@ -122,7 +122,7 @@ public class IncrementalSource<T, C extends SourceConfig>
         // create source config for the given subtask (e.g. unique server id)
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(jdbcSourceConfig.getInlongMetric())
-                .withInlongAudit(jdbcSourceConfig.getInlongAudit())
+                .withAuditAddress(jdbcSourceConfig.getInlongAudit())
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
 
diff --git 
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
 
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index ebc8cfca2..2ee979d15 100644
--- 
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ 
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -253,7 +253,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
         }
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withInlongAudit(auditHostAndPorts)
+                .withAuditAddress(auditHostAndPorts)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                 .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git 
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
index a47fd071b..e5e482c29 100644
--- 
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
@@ -55,6 +55,7 @@ import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUER
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT;
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT;
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -217,6 +218,7 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
         options.add(FactoryUtil.SINK_PARALLELISM);
+        options.add(AUDIT_KEYS);
         return options;
     }
 
diff --git 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index 3de33e5cf..e26ebdb10 100644
--- 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -301,7 +301,7 @@ public abstract class ElasticsearchSinkBase<T, C extends 
AutoCloseable> extends
 
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withInlongAudit(auditHostAndPorts)
+                .withAuditAddress(auditHostAndPorts)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                 .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git 
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
index b85f9ac6d..637876abf 100644
--- 
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
+++ 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
@@ -50,6 +50,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static java.time.ZoneId.SHORT_IDS;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
 import static org.apache.inlong.sort.base.Constants.IGNORE_ALL_CHANGELOG;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
@@ -133,6 +134,7 @@ public class FileSystemTableFactory implements 
DynamicTableSourceFactory, Dynami
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
         options.add(IGNORE_ALL_CHANGELOG);
+        options.add(AUDIT_KEYS);
         return options;
     }
 
diff --git 
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
index 1e7195e07..3d8531bd4 100644
--- 
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+++ 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -136,7 +136,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> 
extends AbstractStreamOpe
         super.open();
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withInlongAudit(inlongAudit)
+                .withAuditAddress(inlongAudit)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                 .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git 
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
 
b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
index 76df0cc9e..1d62d8f56 100644
--- 
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+++ 
b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -155,7 +155,7 @@ public class HBaseSinkFunction<T> extends 
RichSinkFunction<T>
             this.runtimeContext = getRuntimeContext();
             MetricOption metricOption = MetricOption.builder()
                     .withInlongLabels(inlongMetric)
-                    .withInlongAudit(inlongAudit)
+                    .withAuditAddress(inlongAudit)
                     .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                     .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                     .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git 
a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
 
b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index 4a0298384..4a359ad7e 100644
--- 
a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++ 
b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -155,7 +155,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> 
extends AbstractStreamOpe
         super.open();
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withInlongAudit(auditHostAndPorts)
+                .withAuditAddress(auditHostAndPorts)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                 .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git 
a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
 
b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
index 054fb4ab4..aa33d3b31 100644
--- 
a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
+++ 
b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/table/HiveTableInlongFactory.java
@@ -51,6 +51,7 @@ import static 
org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOp
 import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION;
 import static 
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_ENABLE;
 import static 
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.hive.HiveOptions.HIVE_DATABASE;
@@ -91,6 +92,7 @@ public class HiveTableInlongFactory implements 
DynamicTableSourceFactory, Dynami
         options.add(HADOOP_CONF_DIR);
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
+        options.add(AUDIT_KEYS);
         return options;
     }
 
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index e872d248b..c6d3ed874 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -50,6 +50,7 @@ import 
org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.IGNORE_ALL_CHANGELOG;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -264,6 +265,7 @@ public class FlinkDynamicTableFactory implements 
DynamicTableSinkFactory, Dynami
         options.add(IGNORE_ALL_CHANGELOG);
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
+        options.add(AUDIT_KEYS);
 
         options.add(SINK_MULTIPLE_ENABLE);
         options.add(SINK_MULTIPLE_FORMAT);
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index b95278223..12914098e 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -141,7 +141,7 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
         // Initialize metric
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withInlongAudit(auditHostAndPorts)
+                .withAuditAddress(auditHostAndPorts)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                 .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 7d2973a3d..b6c5f9851 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -136,7 +136,7 @@ public class IcebergMultipleStreamWriter extends 
IcebergProcessFunction<RecordWi
         this.runtimeContext = getRuntimeContext();
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withInlongAudit(auditHostAndPorts)
+                .withAuditAddress(auditHostAndPorts)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                 .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
index 502c47008..d44927db1 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
@@ -113,7 +113,7 @@ public class IcebergSingleStreamWriter<T> extends 
IcebergProcessFunction<T, Writ
         if (!multipleSink) {
             MetricOption metricOption = MetricOption.builder()
                     .withInlongLabels(inlongMetric)
-                    .withInlongAudit(auditHostAndPorts)
+                    .withAuditAddress(auditHostAndPorts)
                     .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                     .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                     .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index ec3331b4a..07ea97e70 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -155,7 +155,7 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec 
extends JdbcBatchStat
         this.runtimeContext = getRuntimeContext();
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withInlongAudit(auditHostAndPorts)
+                .withAuditAddress(auditHostAndPorts)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                 .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
index bf6964927..98f044c55 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
@@ -179,7 +179,7 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, 
JdbcExec extends JdbcBatc
         this.runtimeContext = getRuntimeContext();
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withInlongAudit(auditHostAndPorts)
+                .withAuditAddress(auditHostAndPorts)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                 .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
index e12128ec8..39a8daaa1 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicTableFactory.java
@@ -46,6 +46,7 @@ import 
org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 import org.apache.inlong.sort.base.util.JdbcUrlUtils;
 
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
@@ -393,6 +394,7 @@ public class JdbcDynamicTableFactory implements 
DynamicTableSourceFactory, Dynam
         optionalOptions.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
         optionalOptions.add(INLONG_METRIC);
         optionalOptions.add(INLONG_AUDIT);
+        optionalOptions.add(AUDIT_KEYS);
         return optionalOptions;
     }
 
diff --git 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
index b41d75762..b3aa4dbbb 100644
--- 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
+++ 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaConsumerBase.java
@@ -829,7 +829,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
     public void run(SourceContext<T> sourceContext) throws Exception {
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withInlongAudit(inlongAudit)
+                .withAuditAddress(inlongAudit)
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
diff --git 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
index 93a84c733..935a54afa 100644
--- 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
+++ 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java
@@ -934,7 +934,7 @@ public class FlinkKafkaProducer<IN>
         }
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withInlongAudit(auditHostAndPorts)
+                .withAuditAddress(auditHostAndPorts)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                 .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index d92d5cfed..f2d48e95a 100644
--- 
a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -98,6 +98,7 @@ import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.get
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -309,6 +310,7 @@ public class KafkaDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
         options.add(KAFKA_IGNORE_ALL_CHANGELOG);
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
+        options.add(AUDIT_KEYS);
         options.add(SINK_MULTIPLE_FORMAT);
         options.add(SINK_MULTIPLE_PARTITION_PATTERN);
         return options;
diff --git 
a/inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/sink/AbstractKuduSinkFunction.java
 
b/inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/sink/AbstractKuduSinkFunction.java
index b41914073..6acb9cd32 100644
--- 
a/inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/sink/AbstractKuduSinkFunction.java
+++ 
b/inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/sink/AbstractKuduSinkFunction.java
@@ -115,7 +115,7 @@ public abstract class AbstractKuduSinkFunction
         this.running = true;
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inLongMetric)
-                .withInlongAudit(auditHostAndPorts)
+                .withAuditAddress(auditHostAndPorts)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                 .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git 
a/inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/table/KuduDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/table/KuduDynamicTableFactory.java
index a33c4183b..642b9b617 100644
--- 
a/inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/table/KuduDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/kudu/src/main/java/org/apache/inlong/sort/kudu/table/KuduDynamicTableFactory.java
@@ -39,6 +39,7 @@ import java.util.Optional;
 import java.util.Set;
 
 import static 
org.apache.flink.shaded.guava18.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.kudu.common.KuduOptions.CONNECTOR_MASTERS;
@@ -177,6 +178,7 @@ public class KuduDynamicTableFactory
 
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
+        options.add(AUDIT_KEYS);
         return options;
     }
 }
diff --git 
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
 
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index 83bc75e49..1a763a16c 100644
--- 
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ 
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -450,7 +450,7 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
                 "sourceIdleTime", (Gauge<Long>) () -> 
debeziumChangeFetcher.getIdleTime());
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withInlongAudit(inlongAudit)
+                .withAuditAddress(inlongAudit)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
                 .withRegisterMetric(RegisteredMetric.ALL)
diff --git 
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
 
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
index 7fe19e27f..9e15493bb 100644
--- 
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
+++ 
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/table/MongoDBTableSourceFactory.java
@@ -43,6 +43,7 @@ import static 
com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOp
 import static 
com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
 import static 
com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.USERNAME;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.base.Constants.SOURCE_MULTIPLE_ENABLE;
@@ -166,6 +167,7 @@ public class MongoDBTableSourceFactory implements 
DynamicTableSourceFactory {
         options.add(SOURCE_MULTIPLE_ENABLE);
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
+        options.add(AUDIT_KEYS);
         return options;
     }
 }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index d99da8389..43ee4f53b 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -450,7 +450,7 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
                 "sourceIdleTime", (Gauge<Long>) () -> 
debeziumChangeFetcher.getIdleTime());
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withInlongAudit(inlongAudit)
+                .withAuditAddress(inlongAudit)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
                 .withRegisterMetric(RegisteredMetric.ALL)
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
index 1aef8af2e..ea79e14b8 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
@@ -144,7 +144,7 @@ public class MySqlSource<T>
                 configFactory.createConfig(readerContext.getIndexOfSubtask());
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(sourceConfig.getInlongMetric())
-                .withInlongAudit(sourceConfig.getInlongAudit())
+                .withAuditAddress(sourceConfig.getInlongAudit())
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         sourceReaderMetrics.registerMetrics(metricOption);
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index 35ee07e63..6aaf06ad7 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -36,6 +36,7 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static 
org.apache.inlong.sort.cdc.base.debezium.table.DebeziumOptions.getDebeziumProperties;
@@ -235,6 +236,7 @@ public class MySqlTableInlongSourceFactory implements 
DynamicTableSourceFactory
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
         options.add(ROW_KINDS_FILTERED);
+        options.add(AUDIT_KEYS);
         return options;
     }
 
diff --git 
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
 
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
index 48e711eb9..1e6cd1245 100644
--- 
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
+++ 
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
@@ -124,7 +124,7 @@ public class IncrementalSource<T, C extends SourceConfig>
         // create source config for the given subtask (e.g. unique server id)
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(jdbcSourceConfig.getInlongMetric())
-                .withInlongAudit(jdbcSourceConfig.getInlongAudit())
+                .withAuditAddress(jdbcSourceConfig.getInlongAudit())
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
 
diff --git 
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
 
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
index 0416424a5..7ee86f214 100644
--- 
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
+++ 
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
@@ -452,7 +452,7 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
                 "sourceIdleTime", (Gauge<Long>) () -> 
debeziumChangeFetcher.getIdleTime());
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withInlongAudit(inlongAudit)
+                .withAuditAddress(inlongAudit)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
                 .withRegisterMetric(RegisteredMetric.ALL)
diff --git 
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
 
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
index a91976b7c..99aed8f12 100644
--- 
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
+++ 
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSourceFactory.java
@@ -53,6 +53,7 @@ import static 
com.ververica.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
 import static 
com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions.PORT;
 import static 
com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.base.Constants.SOURCE_MULTIPLE_ENABLE;
@@ -166,6 +167,7 @@ public class OracleTableSourceFactory implements 
DynamicTableSourceFactory {
         options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
         options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
         options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
+        options.add(AUDIT_KEYS);
         return options;
     }
 
diff --git 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
index ef3f744de..b1a7a5851 100644
--- 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
+++ 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
@@ -452,7 +452,7 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
                 "sourceIdleTime", (Gauge<Long>) () -> 
debeziumChangeFetcher.getIdleTime());
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withInlongAudit(inlongAudit)
+                .withAuditAddress(inlongAudit)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
                 .withRegisterMetric(RegisteredMetric.ALL)
diff --git 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
index af134c935..3777f9f6e 100644
--- 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
+++ 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java
@@ -30,6 +30,7 @@ import java.util.Set;
 
 import static 
com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
 import static 
com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.base.Constants.SOURCE_MULTIPLE_ENABLE;
@@ -212,6 +213,7 @@ public class PostgreSQLTableFactory implements 
DynamicTableSourceFactory {
         options.add(SOURCE_MULTIPLE_ENABLE);
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
+        options.add(AUDIT_KEYS);
         options.add(APPEND_MODE);
         options.add(ROW_KINDS_FILTERED);
         return options;
diff --git a/inlong-sort/sort-connectors/pulsar/pom.xml 
b/inlong-sort/sort-connectors/pulsar/pom.xml
index c7b493def..5d5e3da54 100644
--- a/inlong-sort/sort-connectors/pulsar/pom.xml
+++ b/inlong-sort/sort-connectors/pulsar/pom.xml
@@ -62,6 +62,12 @@
             <artifactId>audit-sdk</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-format-base</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java
 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java
index 9fb38d238..9c83bbc58 100644
--- 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java
+++ 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/FlinkPulsarSource.java
@@ -244,13 +244,16 @@ public class FlinkPulsarSource<T>
 
     protected String auditHostAndPorts;
 
+    protected String auditKeys;
+
     public FlinkPulsarSource(
             String adminUrl,
             ClientConfigurationData clientConf,
             PulsarDeserializationSchema<T> deserializer,
             Properties properties,
             String inlongMetric,
-            String inlongAudit) {
+            String inlongAudit,
+            String auditKeys) {
         this.adminUrl = checkNotNull(adminUrl);
         this.clientConfigurationData = checkNotNull(clientConf);
         this.deserializer = deserializer;
@@ -276,6 +279,7 @@ public class FlinkPulsarSource<T>
         this.oldStateVersion = 
SourceSinkUtils.getOldStateVersion(caseInsensitiveParams, oldStateVersion);
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = inlongAudit;
+        this.auditKeys = auditKeys;
     }
 
     // ------------------------------------------------------------------------
@@ -446,7 +450,8 @@ public class FlinkPulsarSource<T>
 
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withInlongAudit(auditHostAndPorts)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
diff --git 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index 60212768b..f3c71e029 100644
--- 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -30,7 +30,7 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
-import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector;
+import org.apache.inlong.sort.pulsar.withoutadmin.MetricsCollector;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 
@@ -120,12 +120,8 @@ public class DynamicPulsarDeserializationSchema implements 
PulsarDeserialization
         // shortcut in case no output projection is required,
         // also not for a cartesian product with the keys
         if (keyDeserialization == null && !hasMetadata) {
-            valueDeserialization.deserialize(message.getData(), new 
CallbackCollector<>(inputRow -> {
-                if (sourceMetricData != null) {
-                    sourceMetricData.outputMetricsWithEstimate(inputRow);
-                }
-                collector.collect(inputRow);
-            }));
+            valueDeserialization.deserialize(message.getData(),
+                    new MetricsCollector<>(collector, sourceMetricData));
             return;
         }
         BufferingCollector keyCollector = new BufferingCollector();
@@ -143,10 +139,8 @@ public class DynamicPulsarDeserializationSchema implements 
PulsarDeserialization
             // collect tombstone messages in upsert mode by hand
             outputCollector.collect(null);
         } else {
-            valueDeserialization.deserialize(message.getData(), new 
CallbackCollector<>(inputRow -> {
-                sourceMetricData.outputMetricsWithEstimate(inputRow);
-                outputCollector.collect(inputRow);
-            }));
+            valueDeserialization.deserialize(message.getData(), new 
MetricsCollector<>(
+                    outputCollector, sourceMetricData));
         }
 
         keyCollector.buffer.clear();
diff --git 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
index bd5be42b2..c6d6a19e4 100644
--- 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableFactory.java
@@ -76,6 +76,7 @@ import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
 import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.validateTableSourceOptions;
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 
@@ -275,6 +276,8 @@ public class PulsarDynamicTableFactory
 
         String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
 
+        String auditKeys = tableOptions.get(AUDIT_KEYS);
+
         return createPulsarTableSource(
                 physicalDataType,
                 keyDecodingFormat.orElse(null),
@@ -289,7 +292,7 @@ public class PulsarDynamicTableFactory
                 properties,
                 startupOptions,
                 inlongMetric,
-                auditHostAndPorts);
+                auditHostAndPorts, auditKeys);
     }
 
     @Override
@@ -329,6 +332,7 @@ public class PulsarDynamicTableFactory
         options.add(PROPERTIES);
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
+        options.add(AUDIT_KEYS);
 
         return options;
     }
@@ -361,7 +365,8 @@ public class PulsarDynamicTableFactory
             Properties properties,
             PulsarTableOptions.StartupOptions startupOptions,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            String auditKeys) {
         return new PulsarDynamicTableSource(
                 physicalDataType,
                 keyDecodingFormat,
@@ -377,6 +382,7 @@ public class PulsarDynamicTableFactory
                 startupOptions,
                 false,
                 inlongMetric,
-                auditHostAndPorts);
+                auditHostAndPorts,
+                auditKeys);
     }
 }
diff --git 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
index a0eeb961d..e9465fcb2 100644
--- 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
+++ 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarDynamicTableSource.java
@@ -141,6 +141,7 @@ public class PulsarDynamicTableSource implements 
ScanTableSource, SupportsReadin
 
     protected String inlongMetric;
 
+    protected String auditKeys;
     protected String auditHostAndPorts;
 
     public PulsarDynamicTableSource(
@@ -158,7 +159,8 @@ public class PulsarDynamicTableSource implements 
ScanTableSource, SupportsReadin
             PulsarTableOptions.StartupOptions startupOptions,
             boolean upsertMode,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            String auditKeys) {
         this.producedDataType = physicalDataType;
         setTopicInfo(properties, topics, topicPattern);
 
@@ -187,6 +189,7 @@ public class PulsarDynamicTableSource implements 
ScanTableSource, SupportsReadin
         this.upsertMode = upsertMode;
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
+        this.auditKeys = auditKeys;
     }
 
     private void setTopicInfo(Properties properties, List<String> topics, 
String topicPattern) {
@@ -277,7 +280,8 @@ public class PulsarDynamicTableSource implements 
ScanTableSource, SupportsReadin
                         deserializationSchema,
                         properties,
                         inlongMetric,
-                        auditHostAndPorts);
+                        auditHostAndPorts,
+                        auditKeys);
 
         if (watermarkStrategy != null) {
             source.assignTimestampsAndWatermarks(watermarkStrategy);
@@ -312,7 +316,8 @@ public class PulsarDynamicTableSource implements 
ScanTableSource, SupportsReadin
                 deserializationSchema,
                 properties,
                 inlongMetric,
-                auditHostAndPorts);
+                auditHostAndPorts,
+                auditKeys);
 
         if (watermarkStrategy != null) {
             source.assignTimestampsAndWatermarks(watermarkStrategy);
@@ -349,7 +354,8 @@ public class PulsarDynamicTableSource implements 
ScanTableSource, SupportsReadin
                 startupOptions,
                 false,
                 inlongMetric,
-                auditHostAndPorts);
+                auditHostAndPorts,
+                auditKeys);
         copy.producedDataType = producedDataType;
         copy.metadataKeys = metadataKeys;
         copy.watermarkStrategy = watermarkStrategy;
diff --git 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
index ef68c44ed..502a10a62 100644
--- 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/UpsertPulsarDynamicTableFactory.java
@@ -66,6 +66,7 @@ import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOpti
 import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.createValueFormatProjection;
 import static 
org.apache.flink.streaming.connectors.pulsar.table.PulsarTableOptions.getPulsarProperties;
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 
@@ -188,6 +189,7 @@ public class UpsertPulsarDynamicTableFactory implements 
DynamicTableSourceFactor
         String topicPattern = tableOptions.get(TOPIC_PATTERN);
         String inlongMetric = 
tableOptions.getOptional(INLONG_METRIC).orElse(null);
         String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
+        String auditKeys = tableOptions.get(AUDIT_KEYS);
 
         return new PulsarDynamicTableSource(
                 schema.toPhysicalRowDataType(),
@@ -204,7 +206,7 @@ public class UpsertPulsarDynamicTableFactory implements 
DynamicTableSourceFactor
                 startupOptions,
                 true,
                 inlongMetric,
-                auditHostAndPorts);
+                auditHostAndPorts, auditKeys);
     }
 
     @Override
diff --git 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
index 8150fb894..2a84ec041 100644
--- 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
+++ 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/FlinkPulsarSource.java
@@ -243,6 +243,8 @@ public class FlinkPulsarSource<T>
      */
     private String inlongAudit;
 
+    private String inlongAuditKeys;
+
     private SourceMetricData sourceMetricData;
 
     private transient ListState<MetricState> metricStateListState;
@@ -253,7 +255,8 @@ public class FlinkPulsarSource<T>
             PulsarDeserializationSchema<T> deserializer,
             Properties properties,
             String inlongMetric,
-            String inlongAudit) {
+            String inlongAudit,
+            String inlongAuditKeys) {
         this.inlongAudit = inlongAudit;
         this.inlongMetric = inlongMetric;
         this.serverUrl = checkNotNull(serverUrl);
@@ -272,6 +275,7 @@ public class FlinkPulsarSource<T>
                 SourceSinkUtils.getCommitMaxRetries(caseInsensitiveParams);
         this.useMetrics =
                 SourceSinkUtils.getUseMetrics(caseInsensitiveParams);
+        this.inlongAuditKeys = inlongAuditKeys;
 
         
CachedPulsarClient.setCacheSize(SourceSinkUtils.getClientCacheSize(caseInsensitiveParams));
 
@@ -431,7 +435,8 @@ public class FlinkPulsarSource<T>
 
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withInlongAudit(inlongAudit)
+                .withAuditAddress(inlongAudit)
+                .withAuditKeys(inlongAuditKeys)
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
diff --git 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/MetricsCollector.java
 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/MetricsCollector.java
new file mode 100644
index 000000000..947337dbe
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/withoutadmin/MetricsCollector.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.pulsar.withoutadmin;
+
+import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.formats.base.collectors.TimestampedCollector;
+
+/**
+ * sending metrics each time a record is collected.
+ * @param <T>
+ */
+public class MetricsCollector<T> implements TimestampedCollector<T> {
+
+    private Collector<T> collector;
+
+    private long timestampMillis;
+
+    SourceMetricData metricData;
+
+    public MetricsCollector(Collector<T> collector,
+            SourceMetricData sourceMetricData) {
+        this.metricData = sourceMetricData;
+        this.collector = collector;
+    }
+
+    public void resetTimestamp(long timestampMillis) {
+        this.timestampMillis = timestampMillis;
+    }
+
+    @Override
+    public void collect(T record) {
+        metricData.outputMetricsWithEstimate(record, timestampMillis);
+        collector.collect(record);
+    }
+
+    @Override
+    public void close() {
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java
index 97ae9b9f2..5f0c61eb4 100644
--- 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java
@@ -171,7 +171,7 @@ public abstract class AbstractRedisSinkFunction<OUT>
         }
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inLongMetric)
-                .withInlongAudit(auditHostAndPorts)
+                .withAuditAddress(auditHostAndPorts)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                 .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
index 8edf979e9..0f2a56e0f 100644
--- 
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.redis.table;
 
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static 
org.apache.inlong.sort.redis.common.config.RedisOptions.DATA_TYPE;
@@ -210,6 +211,7 @@ public class RedisDynamicTableFactory implements 
DynamicTableSourceFactory, Dyna
         options.add(RedisOptions.SENTINELS_INFO);
         options.add(RedisOptions.SOCKET_TIMEOUT);
         options.add(RedisOptions.TIMEOUT);
+        options.add(AUDIT_KEYS);
         return options;
     }
 
diff --git 
a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
 
b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
index 07e026c5d..c207f5775 100644
--- 
a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
+++ 
b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/DebeziumSourceFunction.java
@@ -429,7 +429,7 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
                 "sourceIdleTime", (Gauge<Long>) () -> 
debeziumChangeFetcher.getIdleTime());
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
-                .withInlongAudit(auditHostAndPorts)
+                .withAuditAddress(auditHostAndPorts)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_IN) : 0L)
                 .withRegisterMetric(RegisteredMetric.ALL)
diff --git 
a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
 
b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
index b0454811e..03747d0f6 100644
--- 
a/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
+++ 
b/inlong-sort/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/cdc/sqlserver/table/SqlServerTableFactory.java
@@ -34,6 +34,7 @@ import java.util.Set;
 import static 
com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX;
 import static 
com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
 import static 
com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 
@@ -161,6 +162,7 @@ public class SqlServerTableFactory implements 
DynamicTableSourceFactory {
         options.add(SCAN_STARTUP_MODE);
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
+        options.add(AUDIT_KEYS);
         return options;
     }
 
diff --git 
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
index 3b70c35bb..08335e0d3 100644
--- 
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
+++ 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
@@ -158,7 +158,7 @@ public class StarRocksDynamicSinkFunction<T> extends 
RichSinkFunction<T> impleme
         sinkManager.startAsyncFlushing();
 
         MetricOption metricOption = 
MetricOption.builder().withInlongLabels(inlongMetric)
-                .withInlongAudit(auditHostAndPorts)
+                .withAuditAddress(auditHostAndPorts)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                 .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
diff --git 
a/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/collectors/TimestampedCollector.java
 
b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/collectors/TimestampedCollector.java
new file mode 100644
index 000000000..fa8ad8b09
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/collectors/TimestampedCollector.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.base.collectors;
+
+import org.apache.flink.util.Collector;
+
+/**
+ * Collector that support timestamp collection.
+ * @param <T>
+ */
+public interface TimestampedCollector<T> extends Collector<T> {
+
+    void resetTimestamp(long timestampMillis);
+
+}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
index 6b22837c9..9789488ba 100644
--- 
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDeserializationSchema.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Collector;
 import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.sort.formats.base.collectors.TimestampedCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,7 +71,7 @@ public class InLongMsgDeserializationSchema implements 
DeserializationSchema<Row
     }
 
     @Override
-    public RowData deserialize(byte[] bytes) throws IOException {
+    public RowData deserialize(byte[] bytes) {
         throw new RuntimeException(
                 "Please invoke DeserializationSchema#deserialize(byte[], 
Collector<RowData>) instead.");
     }
@@ -106,10 +107,14 @@ public class InLongMsgDeserializationSchema implements 
DeserializationSchema<Row
                     continue;
                 }
 
+                if (out instanceof TimestampedCollector) {
+                    ((TimestampedCollector<RowData>) 
out).resetTimestamp(head.getTime().getTime());
+                }
+
                 List<RowData> list = new ArrayList<>();
                 ListCollector<RowData> collector = new ListCollector<>(list);
                 deserializationSchema.deserialize(bodyBytes, collector);
-                list.stream().forEach(rowdata -> emitRow(head, 
(GenericRowData) rowdata, out));
+                list.forEach(rowdata -> emitRow(head, (GenericRowData) 
rowdata, out));
             }
         }
 
@@ -153,12 +158,15 @@ public class InLongMsgDeserializationSchema implements 
DeserializationSchema<Row
 
     /** add metadata column */
     private void emitRow(InLongMsgHead head, GenericRowData physicalRow, 
Collector<RowData> out) {
+
         if (metadataConverters.length == 0) {
             out.collect(physicalRow);
             return;
         }
-        final int physicalArity = physicalRow.getArity();
+
         final int metadataArity = metadataConverters.length;
+        final int physicalArity = physicalRow.getArity();
+
         final GenericRowData producedRow =
                 new GenericRowData(physicalRow.getRowKind(), physicalArity + 
metadataArity);
         for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {

Reply via email to