This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a8a05510ed [INLONG-11829][Sort] Optimize MySQL-CDC changelog audit report (#11835)
a8a05510ed is described below

commit a8a05510ed45e60b0a1ee9e9edb89af4df34b4ec
Author: vernedeng <[email protected]>
AuthorDate: Thu Apr 17 14:56:08 2025 +0800

    [INLONG-11829][Sort] Optimize MySQL-CDC changelog audit report (#11835)
    
    * [INLONG-11829][Sort] Optimize MySQL-CDC changelog audit report
---
 .../org/apache/inlong/sort/util/AuditUtils.java    | 11 +++++++++
 .../inlong/sort/base/metric/CdcExactlyMetric.java  | 26 ++++------------------
 .../inlong/sort/base/metric/MetricOption.java      | 20 ++++++++++++++++-
 .../inlong/sort/mysql/MysqlTableFactory.java       |  9 ++++++++
 .../mysql/RowDataDebeziumDeserializeSchema.java    | 13 ++++-------
 5 files changed, 47 insertions(+), 32 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/AuditUtils.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/AuditUtils.java
index daf2c8a58b..7c647c37c5 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/AuditUtils.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/AuditUtils.java
@@ -18,10 +18,13 @@
 package org.apache.inlong.sort.util;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import org.apache.flink.types.RowKind;
 
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -52,4 +55,12 @@ public class AuditUtils {
                 .collect(Collectors.toList());
     }
 
+    public static Map<RowKind, Integer> extractChangelogAuditKeyMap(String changelogAuditKeys) {
+        // Parses "INSERT=1&DELETE=2" style text into RowKind -> audit id; null/blank input yields an empty map.
+        return Splitter.on("&").trimResults().omitEmptyStrings().withKeyValueSeparator("=")
+                .split(changelogAuditKeys == null ? "" : changelogAuditKeys).entrySet().stream()
+                .collect(Collectors.toMap(entry -> RowKind.valueOf(entry.getKey()),
+                        entry -> Integer.parseInt(entry.getValue())));
+    }
+
 }
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/CdcExactlyMetric.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/CdcExactlyMetric.java
index c4c94e152b..83d9e4f7dc 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/CdcExactlyMetric.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/CdcExactlyMetric.java
@@ -20,13 +20,10 @@ package org.apache.inlong.sort.base.metric;
 import org.apache.inlong.audit.AuditReporterImpl;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
 
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
@@ -51,28 +48,13 @@ public class CdcExactlyMetric implements Serializable, SourceMetricsReporter {
         this.labels = option.getLabels();
         this.groupId = labels.get(GROUP_ID);
         this.streamId = labels.get(STREAM_ID);
-        this.auditKeyMap = new HashMap<>();
 
         if (option.getIpPorts().isPresent()) {
             auditReporter = new AuditReporterImpl();
             auditReporter.setAutoFlush(false);
             auditReporter.setAuditProxy(option.getIpPortSet());
-            List<Integer> auditKeys = option.getInlongAuditKeys();
-
-            if (CollectionUtils.isEmpty(auditKeys)) {
-                log.warn("inlong audit keys is empty");
-            } else if (auditKeys.size() == 1) {
-                auditKeyMap.put(RowKind.INSERT, auditKeys.get(0));
-                log.warn("only the insert audit key is set, the update and 
delete audit will be ignored");
-            } else if (auditKeys.size() == 4) {
-                auditKeyMap.put(RowKind.INSERT, auditKeys.get(0));
-                auditKeyMap.put(RowKind.UPDATE_BEFORE, auditKeys.get(1));
-                auditKeyMap.put(RowKind.UPDATE_AFTER, auditKeys.get(2));
-                auditKeyMap.put(RowKind.DELETE, auditKeys.get(3));
-            } else {
-                throw new IllegalArgumentException("audit key size must be 1 
or 4");
-            }
         }
+        auditKeyMap = option.getInlongChangelogAuditKeys();
         log.info("CdcExactlyMetric init, groupId: {}, streamId: {}, audit key: 
{}", groupId, streamId, auditKeyMap);
     }
 
@@ -82,15 +64,15 @@ public class CdcExactlyMetric implements Serializable, SourceMetricsReporter {
         if (data instanceof RowData) {
             RowData rowData = (RowData) data;
             RowKind rowKind = rowData.getRowKind();
-            int key = auditKeyMap.get(rowKind);
+            Integer key = auditKeyMap.get(rowKind);
             outputMetrics(1, size, dataTime, key);
         } else {
             outputMetrics(1, size, dataTime, auditKeyMap.get(RowKind.INSERT));
         }
     }
 
-    public void outputMetrics(long rowCountSize, long rowDataSize, long 
dataTime, int key) {
-        if (auditReporter != null) {
+    public void outputMetrics(long rowCountSize, long rowDataSize, long 
dataTime, Integer key) {
+        if (auditReporter != null && key != null) {
             auditReporter.add(
                     this.currentCheckpointId,
                     key,
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
index eb01d9f165..bcba62b1c0 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.base.metric;
 
 import org.apache.inlong.sort.util.AuditUtils;
 
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
@@ -56,6 +57,7 @@ public class MetricOption implements Serializable {
     private long initDirtyBytes;
     private long readPhase;
     private List<Integer> inlongAuditKeys;
+    private Map<RowKind, Integer> inlongChangelogAuditKeys;
 
     private MetricOption(
             Map<String, String> labels,
@@ -67,6 +69,7 @@ public class MetricOption implements Serializable {
             Long initDirtyBytes,
             Long readPhase,
             List<Integer> inlongAuditKeys,
+            Map<RowKind, Integer> inlongChangelogAuditKeys,
             Set<String> ipPortSet) {
         this.initRecords = initRecords;
         this.initBytes = initBytes;
@@ -78,6 +81,7 @@ public class MetricOption implements Serializable {
         this.inlongAuditKeys = inlongAuditKeys;
         this.ipPortSet = ipPortSet;
         this.registeredMetric = registeredMetric;
+        this.inlongChangelogAuditKeys = inlongChangelogAuditKeys;
     }
 
     public Map<String, String> getLabels() {
@@ -124,6 +128,10 @@ public class MetricOption implements Serializable {
         return inlongAuditKeys;
     }
 
+    public Map<RowKind, Integer> getInlongChangelogAuditKeys() {
+        return inlongChangelogAuditKeys;
+    }
+
     public long getInitDirtyBytes() {
         return initDirtyBytes;
     }
@@ -155,6 +163,7 @@ public class MetricOption implements Serializable {
         private String inlongLabels;
         private String inlongAudit;
         private String inlongAuditKeys;
+        private String inlongChangelogAuditKeys;
         private RegisteredMetric registeredMetric = RegisteredMetric.ALL;
         private long initRecords = 0L;
         private long initBytes = 0L;
@@ -180,6 +189,11 @@ public class MetricOption implements Serializable {
             return this;
         }
 
+        public MetricOption.Builder withChangelogAuditKeys(String 
inlongChangelogAuditKeys) {
+            this.inlongChangelogAuditKeys = inlongChangelogAuditKeys;
+            return this;
+        }
+
         public MetricOption.Builder withRegisterMetric(RegisteredMetric 
registeredMetric) {
             this.registeredMetric = registeredMetric;
             return this;
@@ -231,6 +245,7 @@ public class MetricOption implements Serializable {
 
             List<Integer> inlongAuditKeysList = null;
             Set<String> ipPortSet = null;
+            Map<RowKind, Integer> inlongChangelogAuditKeysMap = null;
 
             if (inlongAudit != null) {
                 Preconditions.checkArgument(labels.containsKey(GROUP_ID) && 
labels.containsKey(STREAM_ID),
@@ -244,10 +259,13 @@ public class MetricOption implements Serializable {
 
                 inlongAuditKeysList = 
AuditUtils.extractAuditKeys(inlongAuditKeys);
                 ipPortSet = AuditUtils.extractAuditIpPorts(inlongAudit);
+                inlongChangelogAuditKeysMap = 
AuditUtils.extractChangelogAuditKeyMap(inlongChangelogAuditKeys);
+
             }
 
             return new MetricOption(labels, inlongAudit, registeredMetric, 
initRecords, initBytes,
-                    initDirtyRecords, initDirtyBytes, initReadPhase, 
inlongAuditKeysList, ipPortSet);
+                    initDirtyRecords, initDirtyBytes, initReadPhase, 
inlongAuditKeysList, inlongChangelogAuditKeysMap,
+                    ipPortSet);
         }
     }
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
index 0dc854dfa2..97ae0977d8 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
@@ -104,9 +104,11 @@ public class MysqlTableFactory implements DynamicTableSourceFactory {
         String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
         String auditHostAndPorts = config.get(INLONG_AUDIT);
         String auditKeys = config.get(AUDIT_KEYS);
+        String changelogKeys = config.get(CHANGELOG_AUDIT_KEYS);
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
                 .withAuditAddress(auditHostAndPorts)
+                .withChangelogAuditKeys(changelogKeys)
                 .withAuditKeys(auditKeys)
                 .build();
 
@@ -177,6 +179,7 @@ public class MysqlTableFactory implements DynamicTableSourceFactory {
         options.add(INLONG_AUDIT);
         options.add(ROW_KINDS_FILTERED);
         options.add(AUDIT_KEYS);
+        options.add(CHANGELOG_AUDIT_KEYS);
         options.add(GH_OST_DDL_CHANGE);
         options.add(GH_OST_TABLE_REGEX);
         options.add(CHUNK_KEY_COLUMN);
@@ -548,4 +551,10 @@ public class MysqlTableFactory implements DynamicTableSourceFactory {
                     .defaultValue("")
                     .withDescription("Audit keys for metrics collecting");
 
+    public static final ConfigOption<String> CHANGELOG_AUDIT_KEYS =
+            ConfigOptions.key("metrics.changelog.audit.key")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription("Audit keys for changelog metrics 
collecting");
+
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
index a29500c475..3c6096699a 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
@@ -137,21 +137,19 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
         Envelope.Operation op = Envelope.operationFor(record);
         Struct value = (Struct) record.value();
         Schema valueSchema = record.valueSchema();
+        if (cdcExactlyMetric != null) {
+            out = createMetricsCollector(record, out);
+        }
+
         if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
             GenericRowData insert = extractAfterRow(value, valueSchema);
             validator.validate(insert, RowKind.INSERT);
             insert.setRowKind(RowKind.INSERT);
-            if (cdcExactlyMetric != null) {
-                out = createMetricsCollector(record, out);
-            }
             emit(record, insert, out);
         } else if (op == Envelope.Operation.DELETE) {
             GenericRowData delete = extractBeforeRow(value, valueSchema);
             validator.validate(delete, RowKind.DELETE);
             delete.setRowKind(RowKind.DELETE);
-            if (cdcExactlyMetric != null) {
-                out = createMetricsCollector(record, out);
-            }
             emit(record, delete, out);
         } else {
             if (changelogMode == DebeziumChangelogMode.ALL) {
@@ -164,9 +162,6 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
             GenericRowData after = extractAfterRow(value, valueSchema);
             validator.validate(after, RowKind.UPDATE_AFTER);
             after.setRowKind(RowKind.UPDATE_AFTER);
-            if (cdcExactlyMetric != null) {
-                out = createMetricsCollector(record, out);
-            }
             emit(record, after, out);
         }
     }

Reply via email to