This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c9da1bb6a [INLONG-11807][Sort] Support exactly metric report in mysql-cdc case (#11808)
4c9da1bb6a is described below

commit 4c9da1bb6ab7a5a31fdc77bdb406a83c61abf911
Author: vernedeng <[email protected]>
AuthorDate: Mon Mar 24 11:51:45 2025 +0800

    [INLONG-11807][Sort] Support exactly metric report in mysql-cdc case (#11808)
---
 .../inlong/sort/base/metric/CdcExactlyMetric.java  | 120 +++++++++++++++++++++
 .../mysql/RowDataDebeziumDeserializeSchema.java    |  26 ++---
 2 files changed, 133 insertions(+), 13 deletions(-)

diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/CdcExactlyMetric.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/CdcExactlyMetric.java
new file mode 100644
index 0000000000..c4c94e152b
--- /dev/null
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/CdcExactlyMetric.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.metric;
+
+import org.apache.inlong.audit.AuditReporterImpl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
+import static 
org.apache.inlong.common.constant.Constants.DEFAULT_AUDIT_VERSION;
+import static org.apache.inlong.sort.base.Constants.GROUP_ID;
+import static org.apache.inlong.sort.base.Constants.STREAM_ID;
+import static 
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
+
+/**
+ * Source-side audit reporter for CDC records that tags every reported record with the
+ * checkpoint id currently in progress and flushes audit data per checkpoint.
+ *
+ * <p>Audit keys are resolved per {@link RowKind}. A row kind without a configured audit key is
+ * not reported.
+ */
+@Slf4j
+public class CdcExactlyMetric implements Serializable, SourceMetricsReporter {
+
+    private final Map<String, String> labels;
+    /** Audit key to report under, per row kind; empty when auditing is not configured. */
+    private final Map<RowKind, Integer> auditKeyMap;
+    private final String groupId;
+    private final String streamId;
+
+    /** Stays {@code null} when the metric option carries no audit proxy address. */
+    private AuditReporterImpl auditReporter;
+    private Long currentCheckpointId = 0L;
+    private Long lastCheckpointId = 0L;
+
+    /**
+     * Builds the reporter from the given metric option.
+     *
+     * <p>The audit reporter is only created when the option provides audit proxy addresses. In
+     * that case the configured audit keys are mapped to row kinds: a single key is used for
+     * {@link RowKind#INSERT} only, four keys are mapped in order to INSERT, UPDATE_BEFORE,
+     * UPDATE_AFTER and DELETE.
+     *
+     * @param option metric option holding labels, audit proxy addresses and audit keys
+     * @throws IllegalArgumentException if the number of audit keys is neither 0, 1 nor 4
+     */
+    public CdcExactlyMetric(MetricOption option) {
+        this.labels = option.getLabels();
+        this.groupId = labels.get(GROUP_ID);
+        this.streamId = labels.get(STREAM_ID);
+        this.auditKeyMap = new HashMap<>();
+
+        if (option.getIpPorts().isPresent()) {
+            auditReporter = new AuditReporterImpl();
+            // Flushing is driven explicitly through flushAudit().
+            auditReporter.setAutoFlush(false);
+            auditReporter.setAuditProxy(option.getIpPortSet());
+            List<Integer> auditKeys = option.getInlongAuditKeys();
+
+            if (CollectionUtils.isEmpty(auditKeys)) {
+                log.warn("inlong audit keys is empty");
+            } else if (auditKeys.size() == 1) {
+                auditKeyMap.put(RowKind.INSERT, auditKeys.get(0));
+                log.warn("only the insert audit key is set, the update and delete audit will be ignored");
+            } else if (auditKeys.size() == 4) {
+                auditKeyMap.put(RowKind.INSERT, auditKeys.get(0));
+                auditKeyMap.put(RowKind.UPDATE_BEFORE, auditKeys.get(1));
+                auditKeyMap.put(RowKind.UPDATE_AFTER, auditKeys.get(2));
+                auditKeyMap.put(RowKind.DELETE, auditKeys.get(3));
+            } else {
+                throw new IllegalArgumentException("audit key size must be 1 or 4");
+            }
+        }
+        log.info("CdcExactlyMetric init, groupId: {}, streamId: {}, audit key: {}",
+                groupId, streamId, auditKeyMap);
+    }
+
+    /**
+     * Reports one record, using its estimated size, under the audit key of its row kind.
+     *
+     * <p>Data that is not a {@link RowData} is reported under the INSERT audit key. When no audit
+     * key is registered for the row kind the record is skipped.
+     *
+     * @param data the emitted record
+     * @param dataTime the data time passed on to the audit reporter
+     */
+    @Override
+    public void outputMetricsWithEstimate(Object data, long dataTime) {
+        RowKind rowKind = data instanceof RowData ? ((RowData) data).getRowKind() : RowKind.INSERT;
+        // Keep the boxed type: unboxing a missing mapping straight into an int would throw a
+        // NullPointerException for row kinds that were deliberately left without an audit key.
+        Integer key = auditKeyMap.get(rowKind);
+        if (key == null) {
+            return;
+        }
+        outputMetrics(1, getDataSize(data), dataTime, key);
+    }
+
+    /**
+     * Adds a count/size pair to the audit reporter under the current checkpoint id.
+     * Does nothing when no audit reporter was created.
+     *
+     * @param rowCountSize number of rows to report
+     * @param rowDataSize size of the reported rows
+     * @param dataTime the data time passed on to the audit reporter
+     * @param key audit key to report under
+     */
+    public void outputMetrics(long rowCountSize, long rowDataSize, long dataTime, int key) {
+        if (auditReporter != null) {
+            auditReporter.add(
+                    this.currentCheckpointId,
+                    key,
+                    DEFAULT_AUDIT_TAG,
+                    groupId,
+                    streamId,
+                    dataTime,
+                    rowCountSize,
+                    rowDataSize,
+                    DEFAULT_AUDIT_VERSION);
+        }
+    }
+
+    /**
+     * Records the checkpoint id that the next {@link #flushAudit()} call will flush.
+     *
+     * @param checkpointId checkpoint id to flush on the next {@link #flushAudit()}
+     */
+    public void updateLastCheckpointId(Long checkpointId) {
+        lastCheckpointId = checkpointId;
+    }
+
+    /**
+     * Records the checkpoint id that subsequently reported records are tagged with.
+     *
+     * @param checkpointId checkpoint id attached to records reported from now on
+     */
+    public void updateCurrentCheckpointId(Long checkpointId) {
+        currentCheckpointId = checkpointId;
+    }
+
+    /** Flushes the audit data of the last recorded checkpoint id, if a reporter exists. */
+    public void flushAudit() {
+        if (auditReporter != null) {
+            auditReporter.flush(lastCheckpointId);
+        }
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
index 0a1f2013f2..a29500c475 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
@@ -17,9 +17,9 @@
 
 package org.apache.inlong.sort.mysql;
 
+import org.apache.inlong.sort.base.metric.CdcExactlyMetric;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
 
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.AppendMetadataCollector;
@@ -102,7 +102,7 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
 
     /** Changelog Mode to use for encoding changes in Flink internal data 
structure. */
     private final DebeziumChangelogMode changelogMode;
-    private SourceExactlyMetric sourceExactlyMetric;
+    private CdcExactlyMetric cdcExactlyMetric;
     private final MetricOption metricOption;
 
     /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
@@ -141,7 +141,7 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
             GenericRowData insert = extractAfterRow(value, valueSchema);
             validator.validate(insert, RowKind.INSERT);
             insert.setRowKind(RowKind.INSERT);
-            if (sourceExactlyMetric != null) {
+            if (cdcExactlyMetric != null) {
                 out = createMetricsCollector(record, out);
             }
             emit(record, insert, out);
@@ -149,7 +149,7 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
             GenericRowData delete = extractBeforeRow(value, valueSchema);
             validator.validate(delete, RowKind.DELETE);
             delete.setRowKind(RowKind.DELETE);
-            if (sourceExactlyMetric != null) {
+            if (cdcExactlyMetric != null) {
                 out = createMetricsCollector(record, out);
             }
             emit(record, delete, out);
@@ -164,7 +164,7 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
             GenericRowData after = extractAfterRow(value, valueSchema);
             validator.validate(after, RowKind.UPDATE_AFTER);
             after.setRowKind(RowKind.UPDATE_AFTER);
-            if (sourceExactlyMetric != null) {
+            if (cdcExactlyMetric != null) {
                 out = createMetricsCollector(record, out);
             }
             emit(record, after, out);
@@ -178,7 +178,7 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
      * @return metrics collector
      */
     private Collector<RowData> createMetricsCollector(SourceRecord record, 
Collector<RowData> out) {
-        MetricsCollector<RowData> collector = new MetricsCollector<>(out, 
sourceExactlyMetric);
+        MetricsCollector<RowData> collector = new MetricsCollector<>(out, 
cdcExactlyMetric);
         collector.resetTimestamp((Long) ((Struct) 
record.value()).get(FieldName.TIMESTAMP));
         return collector;
     }
@@ -190,7 +190,7 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
      */
     public void initSourceMetricData() {
         if (metricOption != null) {
-            this.sourceExactlyMetric = new SourceExactlyMetric(metricOption);
+            this.cdcExactlyMetric = new CdcExactlyMetric(metricOption);
         }
     }
 
@@ -222,20 +222,20 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
     }
 
     public void flushAudit() {
-        if (sourceExactlyMetric != null) {
-            sourceExactlyMetric.flushAudit();
+        if (cdcExactlyMetric != null) {
+            cdcExactlyMetric.flushAudit();
         }
     }
 
     public void updateCurrentCheckpointId(long checkpointId) {
-        if (sourceExactlyMetric != null) {
-            sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+        if (cdcExactlyMetric != null) {
+            cdcExactlyMetric.updateCurrentCheckpointId(checkpointId);
         }
     }
 
     public void updateLastCheckpointId(long checkpointId) {
-        if (sourceExactlyMetric != null) {
-            sourceExactlyMetric.updateLastCheckpointId(checkpointId);
+        if (cdcExactlyMetric != null) {
+            cdcExactlyMetric.updateLastCheckpointId(checkpointId);
         }
     }
     // 
-------------------------------------------------------------------------------------

Reply via email to