This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4c9da1bb6a [INLONG-11807][Sort] Support exactly metric report in
mysql-cdc case (#11808)
4c9da1bb6a is described below
commit 4c9da1bb6ab7a5a31fdc77bdb406a83c61abf911
Author: vernedeng <[email protected]>
AuthorDate: Mon Mar 24 11:51:45 2025 +0800
[INLONG-11807][Sort] Support exactly metric report in mysql-cdc case
(#11808)
---
.../inlong/sort/base/metric/CdcExactlyMetric.java | 120 +++++++++++++++++++++
.../mysql/RowDataDebeziumDeserializeSchema.java | 26 ++---
2 files changed, 133 insertions(+), 13 deletions(-)
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/CdcExactlyMetric.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/CdcExactlyMetric.java
new file mode 100644
index 0000000000..c4c94e152b
--- /dev/null
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/CdcExactlyMetric.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.metric;
+
+import org.apache.inlong.audit.AuditReporterImpl;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
+import static
org.apache.inlong.common.constant.Constants.DEFAULT_AUDIT_VERSION;
+import static org.apache.inlong.sort.base.Constants.GROUP_ID;
+import static org.apache.inlong.sort.base.Constants.STREAM_ID;
+import static
org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
+
+/**
+ * Audit metric reporter for CDC sources. Each record is attributed to the audit key registered
+ * for its {@link RowKind} and is added to the audit reporter under the current checkpoint id.
+ *
+ * <p>Auto flush is disabled on the underlying reporter, so callers are expected to invoke
+ * {@link #flushAudit()}, which flushes with the id set via {@link #updateLastCheckpointId(Long)}.
+ */
+@Slf4j
+public class CdcExactlyMetric implements Serializable, SourceMetricsReporter {
+
+    private final Map<String, String> labels;
+    /** Audit key per row kind; stays empty when no audit proxy or no audit key is configured. */
+    private final Map<RowKind, Integer> auditKeyMap;
+    private final String groupId;
+    private final String streamId;
+
+    /** Remains {@code null} when the metric option carries no audit proxy address. */
+    private AuditReporterImpl auditReporter;
+    private Long currentCheckpointId = 0L;
+    private Long lastCheckpointId = 0L;
+
+    /**
+     * Builds the reporter from the given option.
+     *
+     * <p>The audit reporter is only created when the option provides proxy addresses. The audit
+     * key list must then be empty, hold one key (INSERT only), or hold four keys in the order
+     * INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE.
+     *
+     * @param option metric option providing labels, audit proxy addresses and audit keys
+     * @throws IllegalArgumentException if the audit key list has a size other than 0, 1 or 4
+     */
+    public CdcExactlyMetric(MetricOption option) {
+        this.labels = option.getLabels();
+        this.groupId = labels.get(GROUP_ID);
+        this.streamId = labels.get(STREAM_ID);
+        this.auditKeyMap = new HashMap<>();
+
+        if (option.getIpPorts().isPresent()) {
+            auditReporter = new AuditReporterImpl();
+            auditReporter.setAutoFlush(false);
+            auditReporter.setAuditProxy(option.getIpPortSet());
+            List<Integer> auditKeys = option.getInlongAuditKeys();
+
+            if (CollectionUtils.isEmpty(auditKeys)) {
+                log.warn("inlong audit keys is empty");
+            } else if (auditKeys.size() == 1) {
+                auditKeyMap.put(RowKind.INSERT, auditKeys.get(0));
+                log.warn("only the insert audit key is set, the update and delete audit will be ignored");
+            } else if (auditKeys.size() == 4) {
+                auditKeyMap.put(RowKind.INSERT, auditKeys.get(0));
+                auditKeyMap.put(RowKind.UPDATE_BEFORE, auditKeys.get(1));
+                auditKeyMap.put(RowKind.UPDATE_AFTER, auditKeys.get(2));
+                auditKeyMap.put(RowKind.DELETE, auditKeys.get(3));
+            } else {
+                throw new IllegalArgumentException("audit key size must be 1 or 4");
+            }
+        }
+        log.info("CdcExactlyMetric init, groupId: {}, streamId: {}, audit key: {}", groupId, streamId, auditKeyMap);
+    }
+
+    /**
+     * Reports one record, using the estimated size of {@code data} as its data size.
+     *
+     * <p>A {@link RowData} is reported under the audit key of its row kind; any other object is
+     * reported under the INSERT key. The record is skipped when no audit reporter exists or no
+     * audit key is registered for the row kind.
+     *
+     * @param data the emitted record
+     * @param dataTime the data time passed on to the audit reporter
+     */
+    @Override
+    public void outputMetricsWithEstimate(Object data, long dataTime) {
+        if (auditReporter == null) {
+            // outputMetrics would drop the record anyway; avoid the lookup and size estimation.
+            return;
+        }
+        RowKind rowKind = RowKind.INSERT;
+        if (data instanceof RowData) {
+            rowKind = ((RowData) data).getRowKind();
+        }
+        // Keep the lookup boxed: unboxing a missing entry straight into an int would throw a
+        // NullPointerException, e.g. for UPDATE/DELETE rows when only the insert key is set.
+        Integer key = auditKeyMap.get(rowKind);
+        if (key == null) {
+            return;
+        }
+        outputMetrics(1, getDataSize(data), dataTime, key);
+    }
+
+    /**
+     * Adds the given counts to the audit reporter under the current checkpoint id. Does nothing
+     * when no audit reporter was created.
+     *
+     * @param rowCountSize number of rows to report
+     * @param rowDataSize data size to report
+     * @param dataTime the data time passed on to the audit reporter
+     * @param key audit key to report under
+     */
+    public void outputMetrics(long rowCountSize, long rowDataSize, long dataTime, int key) {
+        if (auditReporter != null) {
+            auditReporter.add(
+                    this.currentCheckpointId,
+                    key,
+                    DEFAULT_AUDIT_TAG,
+                    groupId,
+                    streamId,
+                    dataTime,
+                    rowCountSize,
+                    rowDataSize,
+                    DEFAULT_AUDIT_VERSION);
+        }
+    }
+
+    /**
+     * Sets the checkpoint id that {@link #flushAudit()} flushes with.
+     *
+     * @param checkpointId the last checkpoint id
+     */
+    public void updateLastCheckpointId(Long checkpointId) {
+        lastCheckpointId = checkpointId;
+    }
+
+    /**
+     * Sets the checkpoint id under which subsequent records are added to the audit reporter.
+     *
+     * @param checkpointId the current checkpoint id
+     */
+    public void updateCurrentCheckpointId(Long checkpointId) {
+        currentCheckpointId = checkpointId;
+    }
+
+    /** Flushes the audit reporter with the last checkpoint id; no-op without an audit reporter. */
+    public void flushAudit() {
+        if (auditReporter != null) {
+            auditReporter.flush(lastCheckpointId);
+        }
+    }
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
index 0a1f2013f2..a29500c475 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
@@ -17,9 +17,9 @@
package org.apache.inlong.sort.mysql;
+import org.apache.inlong.sort.base.metric.CdcExactlyMetric;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.AppendMetadataCollector;
@@ -102,7 +102,7 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
/** Changelog Mode to use for encoding changes in Flink internal data
structure. */
private final DebeziumChangelogMode changelogMode;
- private SourceExactlyMetric sourceExactlyMetric;
+ private CdcExactlyMetric cdcExactlyMetric;
private final MetricOption metricOption;
/** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
@@ -141,7 +141,7 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
GenericRowData insert = extractAfterRow(value, valueSchema);
validator.validate(insert, RowKind.INSERT);
insert.setRowKind(RowKind.INSERT);
- if (sourceExactlyMetric != null) {
+ if (cdcExactlyMetric != null) {
out = createMetricsCollector(record, out);
}
emit(record, insert, out);
@@ -149,7 +149,7 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
GenericRowData delete = extractBeforeRow(value, valueSchema);
validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
- if (sourceExactlyMetric != null) {
+ if (cdcExactlyMetric != null) {
out = createMetricsCollector(record, out);
}
emit(record, delete, out);
@@ -164,7 +164,7 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
GenericRowData after = extractAfterRow(value, valueSchema);
validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
- if (sourceExactlyMetric != null) {
+ if (cdcExactlyMetric != null) {
out = createMetricsCollector(record, out);
}
emit(record, after, out);
@@ -178,7 +178,7 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
* @return metrics collector
*/
private Collector<RowData> createMetricsCollector(SourceRecord record,
Collector<RowData> out) {
- MetricsCollector<RowData> collector = new MetricsCollector<>(out,
sourceExactlyMetric);
+ MetricsCollector<RowData> collector = new MetricsCollector<>(out,
cdcExactlyMetric);
collector.resetTimestamp((Long) ((Struct)
record.value()).get(FieldName.TIMESTAMP));
return collector;
}
@@ -190,7 +190,7 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
*/
public void initSourceMetricData() {
if (metricOption != null) {
- this.sourceExactlyMetric = new SourceExactlyMetric(metricOption);
+ this.cdcExactlyMetric = new CdcExactlyMetric(metricOption);
}
}
@@ -222,20 +222,20 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
}
public void flushAudit() {
- if (sourceExactlyMetric != null) {
- sourceExactlyMetric.flushAudit();
+ if (cdcExactlyMetric != null) {
+ cdcExactlyMetric.flushAudit();
}
}
public void updateCurrentCheckpointId(long checkpointId) {
- if (sourceExactlyMetric != null) {
- sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+ if (cdcExactlyMetric != null) {
+ cdcExactlyMetric.updateCurrentCheckpointId(checkpointId);
}
}
public void updateLastCheckpointId(long checkpointId) {
- if (sourceExactlyMetric != null) {
- sourceExactlyMetric.updateLastCheckpointId(checkpointId);
+ if (cdcExactlyMetric != null) {
+ cdcExactlyMetric.updateLastCheckpointId(checkpointId);
}
}
//
-------------------------------------------------------------------------------------