This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 615f59ea4 [INLONG-5459][Sort] Add audit for MySQL extract node (#5539)
615f59ea4 is described below
commit 615f59ea49c7931cad30020fe6170d7543e3d11b
Author: Xin Gong <[email protected]>
AuthorDate: Mon Aug 15 13:11:03 2022 +0800
[INLONG-5459][Sort] Add audit for MySQL extract node (#5539)
---
.../org/apache/inlong/sort/base/Constants.java | 1 -
.../inlong/sort/base/metric/SourceMetricData.java | 31 ++++++++
.../sort/base/util/ValidateMetricOptionUtils.java | 39 ++++++++++
inlong-sort/sort-connectors/mysql-cdc/pom.xml | 11 +++
.../sort/cdc/debezium/DebeziumSourceFunction.java | 25 ++++--
.../apache/inlong/sort/cdc/mysql/MySqlSource.java | 8 +-
.../inlong/sort/cdc/mysql/source/MySqlSource.java | 27 ++++---
.../sort/cdc/mysql/source/MySqlSourceBuilder.java | 5 ++
.../cdc/mysql/source/config/MySqlSourceConfig.java | 9 ++-
.../source/config/MySqlSourceConfigFactory.java | 13 ++--
.../mysql/source/config/MySqlSourceOptions.java | 6 --
.../source/metrics/MySqlSourceReaderMetrics.java | 91 ++++++++++++++++++----
.../mysql/source/reader/MySqlRecordEmitter.java | 9 +--
.../mysql/table/MySqlTableInlongSourceFactory.java | 12 ++-
.../sort/cdc/mysql/table/MySqlTableSource.java | 22 ++++--
15 files changed, 251 insertions(+), 58 deletions(-)
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index b529bf2f0..19b1a90f4 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -75,7 +75,6 @@ public final class Constants {
// sort send successfully
public static final Integer AUDIT_SORT_OUTPUT = 8;
-
public static final ConfigOption<String> INLONG_METRIC =
ConfigOptions.key("inlong.metric")
.stringType()
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index e548784dc..bd82cc041 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -22,6 +22,8 @@ import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.Constants;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
@@ -41,12 +43,19 @@ public class SourceMetricData implements MetricData {
private Counter numBytesIn;
private Meter numRecordsInPerSecond;
private Meter numBytesInPerSecond;
+ private final AuditImp auditImp;
public SourceMetricData(String groupId, String streamId, String nodeId,
MetricGroup metricGroup) {
+ this(groupId, streamId, nodeId, metricGroup, null);
+ }
+
+ public SourceMetricData(String groupId, String streamId, String nodeId,
MetricGroup metricGroup,
+ AuditImp auditImp) {
this.groupId = groupId;
this.streamId = streamId;
this.nodeId = nodeId;
this.metricGroup = metricGroup;
+ this.auditImp = auditImp;
}
/**
@@ -128,4 +137,26 @@ public class SourceMetricData implements MetricData {
public String getNodeId() {
return nodeId;
}
+
+ public void outputMetrics(long rowCountSize, long rowDataSize) {
+ outputMetricForFlink(rowCountSize, rowDataSize);
+ outputMetricForAudit(rowCountSize, rowDataSize);
+ }
+
+ public void outputMetricForAudit(long rowCountSize, long rowDataSize) {
+ if (auditImp != null) {
+ auditImp.add(
+ Constants.AUDIT_SORT_INPUT,
+ getGroupId(),
+ getStreamId(),
+ System.currentTimeMillis(),
+ rowCountSize,
+ rowDataSize);
+ }
+ }
+
+ public void outputMetricForFlink(long rowCountSize, long rowDataSize) {
+ this.numBytesIn.inc(rowDataSize);
+ this.numRecordsIn.inc(rowCountSize);
+ }
}
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java
new file mode 100644
index 000000000..bd58e31ae
--- /dev/null
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/ValidateMetricOptionUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import org.apache.flink.table.api.ValidationException;
+
+/**
+ * validate option tool
+ */
+public class ValidateMetricOptionUtils {
+
+ /**
+ * validate inlong metric when set inlong audit
+ * @param inlongMetric inlong.metric option value
+ * @param inlongAudit inlong.audit option value
+ */
+ public static void validateInlongMetricIfSetInlongAudit(String
inlongMetric, String inlongAudit) {
+ if (inlongAudit != null && inlongMetric == null) {
+ throw new ValidationException("inlong metric is necessary when set
inlong audit");
+ }
+ }
+
+}
diff --git a/inlong-sort/sort-connectors/mysql-cdc/pom.xml
b/inlong-sort/sort-connectors/mysql-cdc/pom.xml
index c29f2dcb9..a4e01dae3 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/mysql-cdc/pom.xml
@@ -47,6 +47,11 @@
<artifactId>sort-connector-base</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>audit-sdk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-format-json</artifactId>
@@ -112,6 +117,12 @@
</filter>
</filters>
<relocations>
+ <relocation>
+
<pattern>org.apache.inlong.sort.base</pattern>
+ <shadedPattern>
+
org.apache.inlong.sort.cdc.mysql.shaded.org.apache.inlong.sort.base
+ </shadedPattern>
+ </relocation>
<relocation>
<pattern>org.apache.kafka</pattern>
<shadedPattern>
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index 882775d47..d5b3c676b 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -47,6 +47,7 @@ import
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.audit.AuditImp;
import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeConsumer;
@@ -66,7 +67,9 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -75,6 +78,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
import static
org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static
org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
@@ -151,7 +155,8 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
/**
* The specific binlog offset to read from when the first startup.
*/
- private final @Nullable DebeziumOffset specificOffset;
+ private final @Nullable
+ DebeziumOffset specificOffset;
/**
* Data for pending but uncommitted offsets.
@@ -221,6 +226,8 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
private String inlongMetric;
+ private String inlongAudit;
+
private SourceMetricData sourceMetricData;
//
---------------------------------------------------------------------------------------
@@ -230,12 +237,14 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
Properties properties,
@Nullable DebeziumOffset specificOffset,
Validator validator,
- String inlongMetric) {
+ String inlongMetric,
+ String inlongAudit) {
this.deserializer = deserializer;
this.properties = properties;
this.specificOffset = specificOffset;
this.validator = validator;
this.inlongMetric = inlongMetric;
+ this.inlongAudit = inlongAudit;
}
@Override
@@ -413,7 +422,12 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
String groupId = inlongMetricArray[0];
String streamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
- sourceMetricData = new SourceMetricData(groupId, streamId, nodeId,
metricGroup);
+ AuditImp auditImp = null;
+ if (inlongAudit != null) {
+ AuditImp.getInstance().setAuditProxy(new
HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
+ auditImp = AuditImp.getInstance();
+ }
+ sourceMetricData = new SourceMetricData(groupId, streamId, nodeId,
metricGroup, auditImp);
sourceMetricData.registerMetricsForNumRecordsIn();
sourceMetricData.registerMetricsForNumBytesIn();
sourceMetricData.registerMetricsForNumBytesInPerSecond();
@@ -458,9 +472,8 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
@Override
public void deserialize(SourceRecord record,
Collector<T> out) throws Exception {
if (sourceMetricData != null) {
- sourceMetricData.getNumRecordsIn().inc(1L);
- sourceMetricData.getNumBytesIn()
-
.inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+ sourceMetricData.outputMetrics(1L,
+
record.value().toString().getBytes(StandardCharsets.UTF_8).length);
}
deserializer.deserialize(record, out);
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
index 4caf6be21..0c1a18c13 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
@@ -71,6 +71,7 @@ public class MySqlSource {
private StartupOptions startupOptions = StartupOptions.initial();
private DebeziumDeserializationSchema<T> deserializer;
private String inlongMetric;
+ private String inlongAudit;
public Builder<T> hostname(String hostname) {
this.hostname = hostname;
@@ -173,6 +174,11 @@ public class MySqlSource {
return this;
}
+ public Builder<T> inlongAudit(String inlongAudit) {
+ this.inlongAudit = inlongAudit;
+ return this;
+ }
+
/**
* builder
*/
@@ -267,7 +273,7 @@ public class MySqlSource {
}
return new DebeziumSourceFunction<>(
- deserializer, props, specificOffset, new
MySqlValidator(props), inlongMetric);
+ deserializer, props, specificOffset, new
MySqlValidator(props), inlongMetric, inlongAudit);
}
}
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
index 3724d2a8e..df48edec2 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSource.java
@@ -36,6 +36,8 @@ import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompl
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.inlong.sort.cdc.mysql.MySqlValidator;
import org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils;
@@ -60,9 +62,12 @@ import org.apache.inlong.sort.cdc.mysql.table.StartupMode;
import org.apache.kafka.connect.source.SourceRecord;
import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.function.Supplier;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
import static
org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.discoverCapturedTables;
import static
org.apache.inlong.sort.cdc.mysql.debezium.DebeziumUtils.openJdbcConnection;
@@ -144,16 +149,20 @@ public class MySqlSource<T>
MySqlSourceConfig sourceConfig =
configFactory.createConfig(readerContext.getIndexOfSubtask());
String inlongMetric = sourceConfig.getInlongMetric();
+ String inlongAudit = sourceConfig.getInlongAudit();
if (StringUtils.isNotEmpty(inlongMetric)) {
- String[] inlongMetricArray = inlongMetric.split("&");
- String groupId = inlongMetricArray[0];
- String streamId = inlongMetricArray[1];
- String nodeId = inlongMetricArray[2];
- sourceReaderMetrics.registerMetricsForNumBytesIn(groupId,
streamId, nodeId, "numBytesIn");
- sourceReaderMetrics.registerMetricsForNumRecordsIn(groupId,
streamId, nodeId, "numRecordsIn");
- sourceReaderMetrics.registerMetricsForNumBytesInPerSecond(groupId,
streamId, nodeId, "numBytesInPerSecond");
-
sourceReaderMetrics.registerMetricsForNumRecordsInPerSecond(groupId, streamId,
nodeId,
- "numRecordsInPerSecond");
+ String[] inlongMetricArray = inlongMetric.split(DELIMITER);
+ sourceReaderMetrics.setInlongGroupId(inlongMetricArray[0]);
+ sourceReaderMetrics.setInlongGroupId(inlongMetricArray[1]);
+ sourceReaderMetrics.setInlongGroupId(inlongMetricArray[2]);
+ if (inlongAudit != null) {
+ AuditImp.getInstance().setAuditProxy(new
HashSet<>(Arrays.asList(inlongAudit.split(DELIMITER))));
+ sourceReaderMetrics.setAuditImp(AuditImp.getInstance());
+ }
+
sourceReaderMetrics.registerMetricsForNumBytesIn(Constants.NUM_BYTES_IN);
+
sourceReaderMetrics.registerMetricsForNumRecordsIn(Constants.NUM_RECORDS_IN);
+
sourceReaderMetrics.registerMetricsForNumBytesInPerSecond(Constants.NUM_BYTES_IN_PER_SECOND);
+
sourceReaderMetrics.registerMetricsForNumRecordsInPerSecond(Constants.NUM_RECORDS_IN_PER_SECOND);
}
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecord>>
elementsQueue =
new FutureCompletingBlockingQueue<>();
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
index 015dc111e..41f1c31e9 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/MySqlSourceBuilder.java
@@ -257,6 +257,11 @@ public class MySqlSourceBuilder<T> {
return this;
}
+ public MySqlSourceBuilder<T> inlongAudit(String inlongAudit) {
+ this.configFactory.inlongAudit(inlongAudit);
+ return this;
+ }
+
/**
* Build the {@link MySqlSource}.
*
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
index 2104e6f0b..d21d04836 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfig.java
@@ -68,6 +68,7 @@ public class MySqlSourceConfig implements Serializable {
private final MySqlConnectorConfig dbzMySqlConfig;
private final String inlongMetric;
+ private final String inlongAudit;
MySqlSourceConfig(
String hostname,
@@ -91,7 +92,8 @@ public class MySqlSourceConfig implements Serializable {
boolean scanNewlyAddedTableEnabled,
Properties dbzProperties,
Properties jdbcProperties,
- String inlongMetric) {
+ String inlongMetric,
+ String inlongAudit) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
@@ -116,6 +118,7 @@ public class MySqlSourceConfig implements Serializable {
this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration);
this.jdbcProperties = jdbcProperties;
this.inlongMetric = inlongMetric;
+ this.inlongAudit = inlongAudit;
}
public String getHostname() {
@@ -218,4 +221,8 @@ public class MySqlSourceConfig implements Serializable {
public String getInlongMetric() {
return inlongMetric;
}
+
+ public String getInlongAudit() {
+ return inlongAudit;
+ }
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
index 2bf6507ac..9eff7b8ee 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceConfigFactory.java
@@ -75,12 +75,18 @@ public class MySqlSourceConfigFactory implements
Serializable {
private Properties dbzProperties;
private String inlongMetric;
+ private String inlongAudit;
public MySqlSourceConfigFactory inlongMetric(String inlongMetric) {
this.inlongMetric = inlongMetric;
return this;
}
+ public MySqlSourceConfigFactory inlongAudit(String inlongAudit) {
+ this.inlongAudit = inlongAudit;
+ return this;
+ }
+
public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
return this;
@@ -341,10 +347,6 @@ public class MySqlSourceConfigFactory implements
Serializable {
jdbcProperties = new Properties();
}
- if (inlongMetric == null) {
- inlongMetric = "";
- }
-
return new MySqlSourceConfig(
hostname,
port,
@@ -367,6 +369,7 @@ public class MySqlSourceConfigFactory implements
Serializable {
scanNewlyAddedTableEnabled,
props,
jdbcProperties,
- inlongMetric);
+ inlongMetric,
+ inlongAudit);
}
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
index ca32dd4a9..8a211cf19 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/config/MySqlSourceOptions.java
@@ -30,12 +30,6 @@ import java.time.Duration;
*/
public class MySqlSourceOptions {
- public static final ConfigOption<String> INLONG_METRIC =
- ConfigOptions.key("inlong.metric")
- .stringType()
- .defaultValue("")
- .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&'
+ NODE ID");
-
public static final ConfigOption<String> HOSTNAME =
ConfigOptions.key("hostname")
.stringType()
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
index fc375b1d0..b301ed572 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
@@ -23,6 +23,8 @@ import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.cdc.mysql.source.reader.MySqlSourceReader;
/**
@@ -59,6 +61,10 @@ public class MySqlSourceReaderMetrics {
private static String STREAM_ID = "streamId";
private static String GROUP_ID = "groupId";
private static String NODE_ID = "nodeId";
+ private String inlongGroupId;
+ private String inlongSteamId;
+ private String nodeId;
+ private AuditImp auditImp;
public MySqlSourceReaderMetrics(MetricGroup metricGroup) {
this.metricGroup = metricGroup;
@@ -70,29 +76,30 @@ public class MySqlSourceReaderMetrics {
metricGroup.gauge("sourceIdleTime", (Gauge<Long>) this::getIdleTime);
}
- public void registerMetricsForNumRecordsIn(String groupId, String
streamId, String nodeId, String metricName) {
+ public void registerMetricsForNumRecordsIn(String metricName) {
numRecordsIn =
- metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID,
streamId).addGroup(NODE_ID, nodeId)
+ metricGroup.addGroup(GROUP_ID,
this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
+ .addGroup(NODE_ID, this.nodeId)
.counter(metricName);
}
- public void registerMetricsForNumBytesIn(String groupId, String streamId,
String nodeId, String metricName) {
+ public void registerMetricsForNumBytesIn(String metricName) {
numBytesIn =
- metricGroup.addGroup(GROUP_ID, groupId).addGroup(STREAM_ID,
streamId).addGroup(NODE_ID, nodeId)
+ metricGroup.addGroup(GROUP_ID,
this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
+ .addGroup(NODE_ID, this.nodeId)
.counter(metricName);
}
- public void registerMetricsForNumRecordsInPerSecond(String groupId, String
streamId, String nodeId,
- String metricName) {
- numRecordsInPerSecond = metricGroup.addGroup(GROUP_ID,
groupId).addGroup(STREAM_ID, streamId).addGroup(NODE_ID,
- nodeId)
- .meter(metricName, new MeterView(this.numRecordsIn,
TIME_SPAN_IN_SECONDS));
+ public void registerMetricsForNumRecordsInPerSecond(String metricName) {
+ numRecordsInPerSecond =
+ metricGroup.addGroup(GROUP_ID,
this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
+ .addGroup(NODE_ID, nodeId)
+ .meter(metricName, new MeterView(this.numRecordsIn,
TIME_SPAN_IN_SECONDS));
}
- public void registerMetricsForNumBytesInPerSecond(String groupId, String
streamId, String nodeId,
- String metricName) {
- numBytesInPerSecond = metricGroup.addGroup(GROUP_ID,
groupId).addGroup(STREAM_ID, streamId)
- .addGroup(NODE_ID, nodeId)
+ public void registerMetricsForNumBytesInPerSecond(String metricName) {
+ numBytesInPerSecond = metricGroup.addGroup(GROUP_ID,
this.inlongGroupId).addGroup(STREAM_ID, this.inlongSteamId)
+ .addGroup(NODE_ID, this.nodeId)
.meter(metricName, new MeterView(this.numBytesIn,
TIME_SPAN_IN_SECONDS));
}
@@ -139,4 +146,62 @@ public class MySqlSourceReaderMetrics {
public Meter getNumBytesInPerSecond() {
return numBytesInPerSecond;
}
+
+ public String getInlongGroupId() {
+ return inlongGroupId;
+ }
+
+ public String getInlongSteamId() {
+ return inlongSteamId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public void setInlongGroupId(String inlongGroupId) {
+ this.inlongGroupId = inlongGroupId;
+ }
+
+ public void setInlongSteamId(String inlongSteamId) {
+ this.inlongSteamId = inlongSteamId;
+ }
+
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public AuditImp getAuditImp() {
+ return auditImp;
+ }
+
+ public void setAuditImp(AuditImp auditImp) {
+ this.auditImp = auditImp;
+ }
+
+ public void outputMetrics(long rowCountSize, long rowDataSize) {
+ outputMetricForFlink(rowCountSize, rowDataSize);
+ outputMetricForAudit(rowCountSize, rowDataSize);
+ }
+
+ public void outputMetricForAudit(long rowCountSize, long rowDataSize) {
+ if (this.auditImp != null) {
+ this.auditImp.add(
+ Constants.AUDIT_SORT_INPUT,
+ getInlongGroupId(),
+ getInlongSteamId(),
+ System.currentTimeMillis(),
+ rowCountSize,
+ rowDataSize);
+ }
+ }
+
+ public void outputMetricForFlink(long rowCountSize, long rowDataSize) {
+ if (this.numBytesIn != null) {
+ numBytesIn.inc(rowDataSize);
+ }
+ if (this.numRecordsIn != null) {
+ this.numRecordsIn.inc(rowCountSize);
+ }
+ }
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index 3cfceb567..d2cc328f9 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -145,13 +145,8 @@ public final class MySqlRecordEmitter<T>
new Collector<T>() {
@Override
public void collect(final T t) {
- if (sourceReaderMetrics.getNumRecordsIn() != null)
{
- sourceReaderMetrics.getNumRecordsIn().inc(1L);
- }
- if (sourceReaderMetrics.getNumBytesIn() != null) {
- sourceReaderMetrics.getNumBytesIn()
-
.inc(t.toString().getBytes(StandardCharsets.UTF_8).length);
- }
+ sourceReaderMetrics.outputMetrics(1L,
+
t.toString().getBytes(StandardCharsets.UTF_8).length);
output.collect(t);
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
index 3ffd679f7..c3c475e92 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableInlongSourceFactory.java
@@ -26,6 +26,7 @@ import
org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.util.ValidateMetricOptionUtils;
import org.apache.inlong.sort.cdc.debezium.table.DebeziumOptions;
import org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions;
import org.apache.inlong.sort.cdc.mysql.source.config.ServerIdRange;
@@ -37,6 +38,8 @@ import java.util.Set;
import java.util.regex.Pattern;
import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static
org.apache.inlong.sort.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.APPEND_MODE;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE;
@@ -46,7 +49,6 @@ import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.DATABASE_NAME;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.HEARTBEAT_INTERVAL;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.HOSTNAME;
-import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.INLONG_METRIC;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.MIGRATE_ALL;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PASSWORD;
import static
org.apache.inlong.sort.cdc.mysql.source.config.MySqlSourceOptions.PORT;
@@ -121,6 +123,8 @@ public class MySqlTableInlongSourceFactory implements
DynamicTableSourceFactory
final ReadableConfig config = helper.getOptions();
final String inlongMetric = config.get(INLONG_METRIC);
+ final String inlongAudit = config.get(INLONG_AUDIT);
+
ValidateMetricOptionUtils.validateInlongMetricIfSetInlongAudit(inlongMetric,
inlongAudit);
final String hostname = config.get(HOSTNAME);
final String username = config.get(USERNAME);
final String password = config.get(PASSWORD);
@@ -186,7 +190,8 @@ public class MySqlTableInlongSourceFactory implements
DynamicTableSourceFactory
JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()),
heartbeatInterval,
migrateAll,
- inlongMetric);
+ inlongMetric,
+ inlongAudit);
}
@Override
@@ -229,6 +234,7 @@ public class MySqlTableInlongSourceFactory implements
DynamicTableSourceFactory
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
options.add(HEARTBEAT_INTERVAL);
options.add(INLONG_METRIC);
+ options.add(INLONG_AUDIT);
return options;
}
@@ -284,7 +290,7 @@ public class MySqlTableInlongSourceFactory implements
DynamicTableSourceFactory
* Checks the given regular expression's syntax is valid.
*
* @param optionName the option name of the regex
- * @param regex The regular expression to be checked
+ * @param regex The regular expression to be checked
* @throws ValidationException If the expression's syntax is invalid
*/
private void validateRegex(String optionName, String regex) {
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
index 7038a0e3a..621f122f6 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
@@ -82,6 +82,7 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
private final Duration heartbeatInterval;
private final boolean migrateAll;
private final String inlongMetric;
+ private final String inlongAudit;
//
--------------------------------------------------------------------------------------------
// Mutable attributes
//
--------------------------------------------------------------------------------------------
@@ -123,7 +124,8 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
StartupOptions startupOptions,
Duration heartbeatInterval,
boolean migrateAll,
- String inlongMetric) {
+ String inlongMetric,
+ String inlongAudit) {
this(
physicalSchema,
port,
@@ -150,7 +152,8 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
new Properties(),
heartbeatInterval,
migrateAll,
- inlongMetric);
+ inlongMetric,
+ inlongAudit);
}
/**
@@ -182,7 +185,8 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
Properties jdbcProperties,
Duration heartbeatInterval,
boolean migrateAll,
- String inlongMetric) {
+ String inlongMetric,
+ String inlongAudit) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -212,6 +216,7 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
this.heartbeatInterval = heartbeatInterval;
this.migrateAll = migrateAll;
this.inlongMetric = inlongMetric;
+ this.inlongAudit = inlongAudit;
}
@Override
@@ -271,6 +276,7 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
.jdbcProperties(jdbcProperties)
.heartbeatInterval(heartbeatInterval)
.inlongMetric(inlongMetric)
+ .inlongAudit(inlongAudit)
.build();
return SourceProvider.of(parallelSource);
} else {
@@ -286,6 +292,7 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
.debeziumProperties(dbzProperties)
.startupOptions(startupOptions)
.inlongMetric(inlongMetric)
+ .inlongAudit(inlongAudit)
.deserializer(deserializer);
Optional.ofNullable(serverId)
.ifPresent(serverId ->
builder.serverId(Integer.parseInt(serverId)));
@@ -358,7 +365,8 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
jdbcProperties,
heartbeatInterval,
migrateAll,
- inlongMetric);
+ inlongMetric,
+ inlongAudit);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@@ -397,7 +405,8 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
&& Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys)
&& Objects.equals(jdbcProperties, that.jdbcProperties)
- && Objects.equals(inlongMetric, that.inlongMetric);
+ && Objects.equals(inlongMetric, that.inlongMetric)
+ && Objects.equals(inlongAudit, that.inlongAudit);
}
@Override
@@ -427,7 +436,8 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
metadataKeys,
scanNewlyAddedTableEnabled,
jdbcProperties,
- inlongMetric);
+ inlongMetric,
+ inlongAudit);
}
@Override