This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new a41888bab [INLONG-5478][Manager] Add MySQL connector inlong metric
feature for no primary key table (#5487)
a41888bab is described below
commit a41888bab7e1caad2dc2860e8651458c2c6cf544
Author: Xin Gong <[email protected]>
AuthorDate: Thu Aug 11 17:23:05 2022 +0800
[INLONG-5478][Manager] Add MySQL connector inlong metric feature for no
primary key table (#5487)
---
inlong-sort/sort-connectors/mysql-cdc/pom.xml | 6 ++
.../sort/cdc/debezium/DebeziumSourceFunction.java | 75 ++++++++++++++++------
.../apache/inlong/sort/cdc/mysql/MySqlSource.java | 8 ++-
.../sort/cdc/mysql/table/MySqlTableSource.java | 1 +
4 files changed, 70 insertions(+), 20 deletions(-)
diff --git a/inlong-sort/sort-connectors/mysql-cdc/pom.xml
b/inlong-sort/sort-connectors/mysql-cdc/pom.xml
index fd4edb09e..c29f2dcb9 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/mysql-cdc/pom.xml
@@ -42,6 +42,11 @@
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-format-json</artifactId>
@@ -65,6 +70,7 @@
<configuration>
<artifactSet>
<includes>
+ <include>org.apache.inlong:*</include>
<include>io.debezium:debezium-api</include>
<include>io.debezium:debezium-embedded</include>
<include>io.debezium:debezium-core</include>
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index a7991252a..882775d47 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -25,6 +25,7 @@ import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.heartbeat.Heartbeat;
import org.apache.commons.collections.map.LinkedMap;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.CheckpointListener;
@@ -43,8 +44,11 @@ import
org.apache.flink.runtime.state.FunctionSnapshotContext;
import
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeConsumer;
import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeFetcher;
import org.apache.inlong.sort.cdc.debezium.internal.DebeziumOffset;
@@ -54,6 +58,7 @@ import
org.apache.inlong.sort.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
import org.apache.inlong.sort.cdc.debezium.internal.FlinkOffsetBackingStore;
import org.apache.inlong.sort.cdc.debezium.internal.Handover;
import org.apache.inlong.sort.cdc.debezium.internal.SchemaRecord;
+import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -214,17 +219,23 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
*/
private transient Handover handover;
+ private String inlongMetric;
+
+ private SourceMetricData sourceMetricData;
+
//
---------------------------------------------------------------------------------------
public DebeziumSourceFunction(
DebeziumDeserializationSchema<T> deserializer,
Properties properties,
@Nullable DebeziumOffset specificOffset,
- Validator validator) {
+ Validator validator,
+ String inlongMetric) {
this.deserializer = deserializer;
this.properties = properties;
this.specificOffset = specificOffset;
this.validator = validator;
+ this.inlongMetric = inlongMetric;
}
@Override
@@ -381,6 +392,34 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
+ // initialize metrics
+ // make RuntimeContext#getMetricGroup compatible between Flink 1.13
and Flink 1.14
+ final Method getMetricGroupMethod =
+ getRuntimeContext().getClass().getMethod("getMetricGroup");
+ getMetricGroupMethod.setAccessible(true);
+ final MetricGroup metricGroup =
+ (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext());
+
+ metricGroup.gauge(
+ "currentFetchEventTimeLag",
+ (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());
+ metricGroup.gauge(
+ "currentEmitEventTimeLag",
+ (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
+ metricGroup.gauge(
+ "sourceIdleTime", (Gauge<Long>) () ->
debeziumChangeFetcher.getIdleTime());
+ if (StringUtils.isNotEmpty(this.inlongMetric)) {
+ String[] inlongMetricArray =
inlongMetric.split(Constants.DELIMITER);
+ String groupId = inlongMetricArray[0];
+ String streamId = inlongMetricArray[1];
+ String nodeId = inlongMetricArray[2];
+ sourceMetricData = new SourceMetricData(groupId, streamId, nodeId,
metricGroup);
+ sourceMetricData.registerMetricsForNumRecordsIn();
+ sourceMetricData.registerMetricsForNumBytesIn();
+ sourceMetricData.registerMetricsForNumBytesInPerSecond();
+ sourceMetricData.registerMetricsForNumRecordsInPerSecond();
+ }
+
properties.setProperty("name", "engine");
properties.setProperty("offset.storage",
FlinkOffsetBackingStore.class.getCanonicalName());
if (restoredOffsetState != null) {
@@ -415,7 +454,22 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
this.debeziumChangeFetcher =
new DebeziumChangeFetcher<>(
sourceContext,
- deserializer,
+ new DebeziumDeserializationSchema<T>() {
+ @Override
+ public void deserialize(SourceRecord record,
Collector<T> out) throws Exception {
+ if (sourceMetricData != null) {
+ sourceMetricData.getNumRecordsIn().inc(1L);
+ sourceMetricData.getNumBytesIn()
+
.inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+ }
+ deserializer.deserialize(record, out);
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return deserializer.getProducedType();
+ }
+ },
restoredOffsetState == null, // DB snapshot phase if
restore state is null
dbzHeartbeatPrefix,
handover);
@@ -441,23 +495,6 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
executor.execute(engine);
debeziumStarted = true;
- // initialize metrics
- // make RuntimeContext#getMetricGroup compatible between Flink 1.13
and Flink 1.14
- final Method getMetricGroupMethod =
- getRuntimeContext().getClass().getMethod("getMetricGroup");
- getMetricGroupMethod.setAccessible(true);
- final MetricGroup metricGroup =
- (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext());
-
- metricGroup.gauge(
- "currentFetchEventTimeLag",
- (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());
- metricGroup.gauge(
- "currentEmitEventTimeLag",
- (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
- metricGroup.gauge(
- "sourceIdleTime", (Gauge<Long>) () ->
debeziumChangeFetcher.getIdleTime());
-
// start the real debezium consumer
debeziumChangeFetcher.runFetchLoop();
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
index cfc1fecd9..4caf6be21 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
@@ -70,6 +70,7 @@ public class MySqlSource {
private Properties dbzProperties;
private StartupOptions startupOptions = StartupOptions.initial();
private DebeziumDeserializationSchema<T> deserializer;
+ private String inlongMetric;
public Builder<T> hostname(String hostname) {
this.hostname = hostname;
@@ -167,6 +168,11 @@ public class MySqlSource {
return this;
}
+ public Builder<T> inlongMetric(String inlongMetric) {
+ this.inlongMetric = inlongMetric;
+ return this;
+ }
+
/**
* builder
*/
@@ -261,7 +267,7 @@ public class MySqlSource {
}
return new DebeziumSourceFunction<>(
- deserializer, props, specificOffset, new
MySqlValidator(props));
+ deserializer, props, specificOffset, new
MySqlValidator(props), inlongMetric);
}
}
}
diff --git
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
index 11be54d47..7038a0e3a 100644
---
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
+++
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
@@ -285,6 +285,7 @@ public class MySqlTableSource implements ScanTableSource,
SupportsReadingMetadat
.serverTimeZone(serverTimeZone.toString())
.debeziumProperties(dbzProperties)
.startupOptions(startupOptions)
+ .inlongMetric(inlongMetric)
.deserializer(deserializer);
Optional.ofNullable(serverId)
.ifPresent(serverId ->
builder.serverId(Integer.parseInt(serverId)));