This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a41888bab [INLONG-5478][Manager] Add MySQL connector inlong metric feature for no primary key table (#5487)
a41888bab is described below

commit a41888bab7e1caad2dc2860e8651458c2c6cf544
Author: Xin Gong <[email protected]>
AuthorDate: Thu Aug 11 17:23:05 2022 +0800

    [INLONG-5478][Manager] Add MySQL connector inlong metric feature for no primary key table (#5487)
---
 inlong-sort/sort-connectors/mysql-cdc/pom.xml      |  6 ++
 .../sort/cdc/debezium/DebeziumSourceFunction.java  | 75 ++++++++++++++++------
 .../apache/inlong/sort/cdc/mysql/MySqlSource.java  |  8 ++-
 .../sort/cdc/mysql/table/MySqlTableSource.java     |  1 +
 4 files changed, 70 insertions(+), 20 deletions(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/pom.xml b/inlong-sort/sort-connectors/mysql-cdc/pom.xml
index fd4edb09e..c29f2dcb9 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/mysql-cdc/pom.xml
@@ -42,6 +42,11 @@
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
             <artifactId>sort-format-json</artifactId>
@@ -65,6 +70,7 @@
                         <configuration>
                             <artifactSet>
                                 <includes>
+                                    <include>org.apache.inlong:*</include>
                                     <include>io.debezium:debezium-api</include>
                                     
<include>io.debezium:debezium-embedded</include>
                                     
<include>io.debezium:debezium-core</include>
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index a7991252a..882775d47 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -25,6 +25,7 @@ import io.debezium.engine.DebeziumEngine;
 import io.debezium.engine.spi.OffsetCommitPolicy;
 import io.debezium.heartbeat.Heartbeat;
 import org.apache.commons.collections.map.LinkedMap;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.CheckpointListener;
@@ -43,8 +44,11 @@ import 
org.apache.flink.runtime.state.FunctionSnapshotContext;
 import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeConsumer;
 import org.apache.inlong.sort.cdc.debezium.internal.DebeziumChangeFetcher;
 import org.apache.inlong.sort.cdc.debezium.internal.DebeziumOffset;
@@ -54,6 +58,7 @@ import 
org.apache.inlong.sort.cdc.debezium.internal.FlinkDatabaseSchemaHistory;
 import org.apache.inlong.sort.cdc.debezium.internal.FlinkOffsetBackingStore;
 import org.apache.inlong.sort.cdc.debezium.internal.Handover;
 import org.apache.inlong.sort.cdc.debezium.internal.SchemaRecord;
+import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -214,17 +219,23 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
      */
     private transient Handover handover;
 
+    private String inlongMetric;
+
+    private SourceMetricData sourceMetricData;
+
     // 
---------------------------------------------------------------------------------------
 
     public DebeziumSourceFunction(
             DebeziumDeserializationSchema<T> deserializer,
             Properties properties,
             @Nullable DebeziumOffset specificOffset,
-            Validator validator) {
+            Validator validator,
+            String inlongMetric) {
         this.deserializer = deserializer;
         this.properties = properties;
         this.specificOffset = specificOffset;
         this.validator = validator;
+        this.inlongMetric = inlongMetric;
     }
 
     @Override
@@ -381,6 +392,34 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
 
     @Override
     public void run(SourceContext<T> sourceContext) throws Exception {
+        // initialize metrics
+        // make RuntimeContext#getMetricGroup compatible between Flink 1.13 
and Flink 1.14
+        final Method getMetricGroupMethod =
+                getRuntimeContext().getClass().getMethod("getMetricGroup");
+        getMetricGroupMethod.setAccessible(true);
+        final MetricGroup metricGroup =
+                (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext());
+
+        metricGroup.gauge(
+                "currentFetchEventTimeLag",
+                (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());
+        metricGroup.gauge(
+                "currentEmitEventTimeLag",
+                (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
+        metricGroup.gauge(
+                "sourceIdleTime", (Gauge<Long>) () -> 
debeziumChangeFetcher.getIdleTime());
+        if (StringUtils.isNotEmpty(this.inlongMetric)) {
+            String[] inlongMetricArray = 
inlongMetric.split(Constants.DELIMITER);
+            String groupId = inlongMetricArray[0];
+            String streamId = inlongMetricArray[1];
+            String nodeId = inlongMetricArray[2];
+            sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, 
metricGroup);
+            sourceMetricData.registerMetricsForNumRecordsIn();
+            sourceMetricData.registerMetricsForNumBytesIn();
+            sourceMetricData.registerMetricsForNumBytesInPerSecond();
+            sourceMetricData.registerMetricsForNumRecordsInPerSecond();
+        }
+
         properties.setProperty("name", "engine");
         properties.setProperty("offset.storage", 
FlinkOffsetBackingStore.class.getCanonicalName());
         if (restoredOffsetState != null) {
@@ -415,7 +454,22 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
         this.debeziumChangeFetcher =
                 new DebeziumChangeFetcher<>(
                         sourceContext,
-                        deserializer,
+                        new DebeziumDeserializationSchema<T>() {
+                            @Override
+                            public void deserialize(SourceRecord record, 
Collector<T> out) throws Exception {
+                                if (sourceMetricData != null) {
+                                    sourceMetricData.getNumRecordsIn().inc(1L);
+                                    sourceMetricData.getNumBytesIn()
+                                            
.inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
+                                }
+                                deserializer.deserialize(record, out);
+                            }
+
+                            @Override
+                            public TypeInformation<T> getProducedType() {
+                                return deserializer.getProducedType();
+                            }
+                        },
                         restoredOffsetState == null, // DB snapshot phase if 
restore state is null
                         dbzHeartbeatPrefix,
                         handover);
@@ -441,23 +495,6 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
         executor.execute(engine);
         debeziumStarted = true;
 
-        // initialize metrics
-        // make RuntimeContext#getMetricGroup compatible between Flink 1.13 
and Flink 1.14
-        final Method getMetricGroupMethod =
-                getRuntimeContext().getClass().getMethod("getMetricGroup");
-        getMetricGroupMethod.setAccessible(true);
-        final MetricGroup metricGroup =
-                (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext());
-
-        metricGroup.gauge(
-                "currentFetchEventTimeLag",
-                (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay());
-        metricGroup.gauge(
-                "currentEmitEventTimeLag",
-                (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay());
-        metricGroup.gauge(
-                "sourceIdleTime", (Gauge<Long>) () -> 
debeziumChangeFetcher.getIdleTime());
-
         // start the real debezium consumer
         debeziumChangeFetcher.runFetchLoop();
     }
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
index cfc1fecd9..4caf6be21 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/MySqlSource.java
@@ -70,6 +70,7 @@ public class MySqlSource {
         private Properties dbzProperties;
         private StartupOptions startupOptions = StartupOptions.initial();
         private DebeziumDeserializationSchema<T> deserializer;
+        private String inlongMetric;
 
         public Builder<T> hostname(String hostname) {
             this.hostname = hostname;
@@ -167,6 +168,11 @@ public class MySqlSource {
             return this;
         }
 
+        public Builder<T> inlongMetric(String inlongMetric) {
+            this.inlongMetric = inlongMetric;
+            return this;
+        }
+
         /**
          * builder
          */
@@ -261,7 +267,7 @@ public class MySqlSource {
             }
 
             return new DebeziumSourceFunction<>(
-                    deserializer, props, specificOffset, new 
MySqlValidator(props));
+                    deserializer, props, specificOffset, new 
MySqlValidator(props), inlongMetric);
         }
     }
 }
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
index 11be54d47..7038a0e3a 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlTableSource.java
@@ -285,6 +285,7 @@ public class MySqlTableSource implements ScanTableSource, 
SupportsReadingMetadat
                             .serverTimeZone(serverTimeZone.toString())
                             .debeziumProperties(dbzProperties)
                             .startupOptions(startupOptions)
+                            .inlongMetric(inlongMetric)
                             .deserializer(deserializer);
             Optional.ofNullable(serverId)
                     .ifPresent(serverId -> 
builder.serverId(Integer.parseInt(serverId)));

Reply via email to