This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a8a05510ed [INLONG-11829][Sort] Optimize MySQL-CDC changelog audit report (#11835)
a8a05510ed is described below
commit a8a05510ed45e60b0a1ee9e9edb89af4df34b4ec
Author: vernedeng <[email protected]>
AuthorDate: Thu Apr 17 14:56:08 2025 +0800
[INLONG-11829][Sort] Optimize MySQL-CDC changelog audit report (#11835)
* [INLONG-11829][Sort] Optimize MySQL-CDC changelog audit report
---
.../org/apache/inlong/sort/util/AuditUtils.java | 11 +++++++++
.../inlong/sort/base/metric/CdcExactlyMetric.java | 26 ++++------------------
.../inlong/sort/base/metric/MetricOption.java | 20 ++++++++++++++++-
.../inlong/sort/mysql/MysqlTableFactory.java | 9 ++++++++
.../mysql/RowDataDebeziumDeserializeSchema.java | 13 ++++-------
5 files changed, 47 insertions(+), 32 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/AuditUtils.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/AuditUtils.java
index daf2c8a58b..7c647c37c5 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/AuditUtils.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/AuditUtils.java
@@ -18,10 +18,13 @@
package org.apache.inlong.sort.util;
import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import org.apache.flink.types.RowKind;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -52,4 +55,12 @@ public class AuditUtils {
.collect(Collectors.toList());
}
+ public static Map<RowKind, Integer> extractChangelogAuditKeyMap(String
changelogAuditKeys) {
+ return
Splitter.on("&").withKeyValueSeparator("=").split(changelogAuditKeys)
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(entry ->
RowKind.valueOf(entry.getKey()),
+ entry -> Integer.parseInt(entry.getValue())));
+ }
+
}
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/CdcExactlyMetric.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/CdcExactlyMetric.java
index c4c94e152b..83d9e4f7dc 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/CdcExactlyMetric.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/CdcExactlyMetric.java
@@ -20,13 +20,10 @@ package org.apache.inlong.sort.base.metric;
import org.apache.inlong.audit.AuditReporterImpl;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
@@ -51,28 +48,13 @@ public class CdcExactlyMetric implements Serializable, SourceMetricsReporter {
this.labels = option.getLabels();
this.groupId = labels.get(GROUP_ID);
this.streamId = labels.get(STREAM_ID);
- this.auditKeyMap = new HashMap<>();
if (option.getIpPorts().isPresent()) {
auditReporter = new AuditReporterImpl();
auditReporter.setAutoFlush(false);
auditReporter.setAuditProxy(option.getIpPortSet());
- List<Integer> auditKeys = option.getInlongAuditKeys();
-
- if (CollectionUtils.isEmpty(auditKeys)) {
- log.warn("inlong audit keys is empty");
- } else if (auditKeys.size() == 1) {
- auditKeyMap.put(RowKind.INSERT, auditKeys.get(0));
- log.warn("only the insert audit key is set, the update and
delete audit will be ignored");
- } else if (auditKeys.size() == 4) {
- auditKeyMap.put(RowKind.INSERT, auditKeys.get(0));
- auditKeyMap.put(RowKind.UPDATE_BEFORE, auditKeys.get(1));
- auditKeyMap.put(RowKind.UPDATE_AFTER, auditKeys.get(2));
- auditKeyMap.put(RowKind.DELETE, auditKeys.get(3));
- } else {
- throw new IllegalArgumentException("audit key size must be 1
or 4");
- }
}
+ auditKeyMap = option.getInlongChangelogAuditKeys();
log.info("CdcExactlyMetric init, groupId: {}, streamId: {}, audit key:
{}", groupId, streamId, auditKeyMap);
}
@@ -82,15 +64,15 @@ public class CdcExactlyMetric implements Serializable, SourceMetricsReporter {
if (data instanceof RowData) {
RowData rowData = (RowData) data;
RowKind rowKind = rowData.getRowKind();
- int key = auditKeyMap.get(rowKind);
+ Integer key = auditKeyMap.get(rowKind);
outputMetrics(1, size, dataTime, key);
} else {
outputMetrics(1, size, dataTime, auditKeyMap.get(RowKind.INSERT));
}
}
- public void outputMetrics(long rowCountSize, long rowDataSize, long
dataTime, int key) {
- if (auditReporter != null) {
+ public void outputMetrics(long rowCountSize, long rowDataSize, long
dataTime, Integer key) {
+ if (auditReporter != null && key != null) {
auditReporter.add(
this.currentCheckpointId,
key,
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
index eb01d9f165..bcba62b1c0 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.base.metric;
import org.apache.inlong.sort.util.AuditUtils;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
@@ -56,6 +57,7 @@ public class MetricOption implements Serializable {
private long initDirtyBytes;
private long readPhase;
private List<Integer> inlongAuditKeys;
+ private Map<RowKind, Integer> inlongChangelogAuditKeys;
private MetricOption(
Map<String, String> labels,
@@ -67,6 +69,7 @@ public class MetricOption implements Serializable {
Long initDirtyBytes,
Long readPhase,
List<Integer> inlongAuditKeys,
+ Map<RowKind, Integer> inlongChangelogAuditKeys,
Set<String> ipPortSet) {
this.initRecords = initRecords;
this.initBytes = initBytes;
@@ -78,6 +81,7 @@ public class MetricOption implements Serializable {
this.inlongAuditKeys = inlongAuditKeys;
this.ipPortSet = ipPortSet;
this.registeredMetric = registeredMetric;
+ this.inlongChangelogAuditKeys = inlongChangelogAuditKeys;
}
public Map<String, String> getLabels() {
@@ -124,6 +128,10 @@ public class MetricOption implements Serializable {
return inlongAuditKeys;
}
+ public Map<RowKind, Integer> getInlongChangelogAuditKeys() {
+ return inlongChangelogAuditKeys;
+ }
+
public long getInitDirtyBytes() {
return initDirtyBytes;
}
@@ -155,6 +163,7 @@ public class MetricOption implements Serializable {
private String inlongLabels;
private String inlongAudit;
private String inlongAuditKeys;
+ private String inlongChangelogAuditKeys;
private RegisteredMetric registeredMetric = RegisteredMetric.ALL;
private long initRecords = 0L;
private long initBytes = 0L;
@@ -180,6 +189,11 @@ public class MetricOption implements Serializable {
return this;
}
+ public MetricOption.Builder withChangelogAuditKeys(String
inlongChangelogAuditKeys) {
+ this.inlongChangelogAuditKeys = inlongChangelogAuditKeys;
+ return this;
+ }
+
public MetricOption.Builder withRegisterMetric(RegisteredMetric
registeredMetric) {
this.registeredMetric = registeredMetric;
return this;
@@ -231,6 +245,7 @@ public class MetricOption implements Serializable {
List<Integer> inlongAuditKeysList = null;
Set<String> ipPortSet = null;
+ Map<RowKind, Integer> inlongChangelogAuditKeysMap = null;
if (inlongAudit != null) {
Preconditions.checkArgument(labels.containsKey(GROUP_ID) &&
labels.containsKey(STREAM_ID),
@@ -244,10 +259,13 @@ public class MetricOption implements Serializable {
inlongAuditKeysList =
AuditUtils.extractAuditKeys(inlongAuditKeys);
ipPortSet = AuditUtils.extractAuditIpPorts(inlongAudit);
+ inlongChangelogAuditKeysMap =
AuditUtils.extractChangelogAuditKeyMap(inlongChangelogAuditKeys);
+
}
return new MetricOption(labels, inlongAudit, registeredMetric,
initRecords, initBytes,
- initDirtyRecords, initDirtyBytes, initReadPhase,
inlongAuditKeysList, ipPortSet);
+ initDirtyRecords, initDirtyBytes, initReadPhase,
inlongAuditKeysList, inlongChangelogAuditKeysMap,
+ ipPortSet);
}
}
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
index 0dc854dfa2..97ae0977d8 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
@@ -104,9 +104,11 @@ public class MysqlTableFactory implements DynamicTableSourceFactory {
String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
String auditHostAndPorts = config.get(INLONG_AUDIT);
String auditKeys = config.get(AUDIT_KEYS);
+ String changelogKeys = config.get(CHANGELOG_AUDIT_KEYS);
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
.withAuditAddress(auditHostAndPorts)
+ .withChangelogAuditKeys(changelogKeys)
.withAuditKeys(auditKeys)
.build();
@@ -177,6 +179,7 @@ public class MysqlTableFactory implements DynamicTableSourceFactory {
options.add(INLONG_AUDIT);
options.add(ROW_KINDS_FILTERED);
options.add(AUDIT_KEYS);
+ options.add(CHANGELOG_AUDIT_KEYS);
options.add(GH_OST_DDL_CHANGE);
options.add(GH_OST_TABLE_REGEX);
options.add(CHUNK_KEY_COLUMN);
@@ -548,4 +551,10 @@ public class MysqlTableFactory implements DynamicTableSourceFactory {
.defaultValue("")
.withDescription("Audit keys for metrics collecting");
+ public static final ConfigOption<String> CHANGELOG_AUDIT_KEYS =
+ ConfigOptions.key("metrics.changelog.audit.key")
+ .stringType()
+ .defaultValue("")
+ .withDescription("Audit keys for changelog metrics
collecting");
+
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
index a29500c475..3c6096699a 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
@@ -137,21 +137,19 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
+ if (cdcExactlyMetric != null) {
+ out = createMetricsCollector(record, out);
+ }
+
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
GenericRowData insert = extractAfterRow(value, valueSchema);
validator.validate(insert, RowKind.INSERT);
insert.setRowKind(RowKind.INSERT);
- if (cdcExactlyMetric != null) {
- out = createMetricsCollector(record, out);
- }
emit(record, insert, out);
} else if (op == Envelope.Operation.DELETE) {
GenericRowData delete = extractBeforeRow(value, valueSchema);
validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
- if (cdcExactlyMetric != null) {
- out = createMetricsCollector(record, out);
- }
emit(record, delete, out);
} else {
if (changelogMode == DebeziumChangelogMode.ALL) {
@@ -164,9 +162,6 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
GenericRowData after = extractAfterRow(value, valueSchema);
validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
- if (cdcExactlyMetric != null) {
- out = createMetricsCollector(record, out);
- }
emit(record, after, out);
}
}