This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4114517a2 [INLONG-5217][Sort] Add connector-base component for common 
code (#5237)
4114517a2 is described below

commit 4114517a216b41a2d36b7f9242e55a2a54f5b975
Author: Oneal65 <[email protected]>
AuthorDate: Thu Jul 28 23:57:05 2022 +0800

    [INLONG-5217][Sort] Add connector-base component for common code (#5237)
---
 inlong-sort/sort-connectors/connector-base/pom.xml | 37 ++++++++++
 .../org/apache/inlong/sort/base/Constants.java     | 49 +++++++++++++
 .../inlong/sort/base/metric/SinkMetricData.java}   |  6 +-
 .../inlong/sort/base/metric/SourceMetricData.java} | 15 ++--
 inlong-sort/sort-connectors/hbase/pom.xml          |  5 ++
 .../inlong/sort/hbase/sink/HBaseSinkFunction.java  | 83 ++++++++++++----------
 inlong-sort/sort-connectors/pom.xml                |  1 +
 inlong-sort/sort-connectors/postgres-cdc/pom.xml   |  5 ++
 .../DebeziumSourceFunction.java                    | 51 +++++++------
 9 files changed, 175 insertions(+), 77 deletions(-)

diff --git a/inlong-sort/sort-connectors/connector-base/pom.xml 
b/inlong-sort/sort-connectors/connector-base/pom.xml
new file mode 100644
index 000000000..e26db29ff
--- /dev/null
+++ b/inlong-sort/sort-connectors/connector-base/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+        xmlns="http://maven.apache.org/POM/4.0.0";
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>sort-connectors</artifactId>
+        <groupId>org.apache.inlong</groupId>
+        <version>1.3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>sort-connector-base</artifactId>
+    <name>Apache InLong - Sort-connector-base</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
new file mode 100644
index 000000000..c9c749ad6
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -0,0 +1,49 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.base;
+
+/**
+ * connector base option constant
+ */
+public final class Constants {
+
+    /**
+     * constants for metrics
+     */
+    public static final String DIRTY_BYTES = "dirtyBytes";
+
+    public static final String DIRTY_RECORDS = "dirtyRecords";
+
+    public static final String NUM_BYTES_OUT = "numBytesOut";
+
+    public static final String NUM_RECORDS_OUT = "numRecordsOut";
+
+    public static final String NUM_BYTES_OUT_PER_SECOND = 
"numBytesOutPerSecond";
+
+    public static final String NUM_RECORDS_OUT_PER_SECOND = 
"numRecordsOutPerSecond";
+
+    public static final String NUM_RECORDS_IN = "numRecordsIn";
+
+    public static final String NUM_BYTES_IN = "numBytesIn";
+
+    public static final String NUM_BYTES_IN_PER_SECOND = "numBytesInPerSecond";
+
+    public static final String NUM_RECORDS_IN_PER_SECOND = 
"numRecordsInPerSecond";
+
+}
diff --git 
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/metric/MetricData.java
 
b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
similarity index 96%
rename from 
inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/metric/MetricData.java
rename to 
inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 7bf05fad2..c3af8f027 100644
--- 
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/metric/MetricData.java
+++ 
b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -16,7 +16,7 @@
  *   limitations under the License.
  */
 
-package org.apache.inlong.sort.hbase.metric;
+package org.apache.inlong.sort.base.metric;
 
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
@@ -26,7 +26,7 @@ import org.apache.flink.metrics.MetricGroup;
 /**
  * A collection class for handling metrics
  */
-public class MetricData {
+public class SinkMetricData {
 
     private final MetricGroup metricGroup;
 
@@ -41,7 +41,7 @@ public class MetricData {
     private static String GROUP_ID = "groupId";
     private static String NODE_ID = "nodeId";
 
-    public MetricData(MetricGroup metricGroup) {
+    public SinkMetricData(MetricGroup metricGroup) {
         this.metricGroup = metricGroup;
     }
 
diff --git 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/MetricData.java
 
b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
similarity index 96%
rename from 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/MetricData.java
rename to 
inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index 07b6f9044..8a065836f 100644
--- 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/MetricData.java
+++ 
b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -16,7 +16,7 @@
  *   limitations under the License.
  */
 
-package org.apache.inlong.sort.cdc.postgres;
+package org.apache.inlong.sort.base.metric;
 
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Meter;
@@ -26,20 +26,19 @@ import org.apache.flink.metrics.MetricGroup;
 /**
  * A collection class for handling metrics
  */
-public class MetricData {
+public class SourceMetricData {
 
+    private static Integer TIME_SPAN_IN_SECONDS = 60;
+    private static String STREAM_ID = "streamId";
+    private static String GROUP_ID = "groupId";
+    private static String NODE_ID = "nodeId";
     private final MetricGroup metricGroup;
-
     private Counter numRecordsIn;
     private Counter numBytesIn;
     private Meter numRecordsInPerSecond;
     private Meter numBytesInPerSecond;
-    private static Integer TIME_SPAN_IN_SECONDS = 60;
-    private static String STREAM_ID = "streamId";
-    private static String GROUP_ID = "groupId";
-    private static String NODE_ID = "nodeId";
 
-    public MetricData(MetricGroup metricGroup) {
+    public SourceMetricData(MetricGroup metricGroup) {
         this.metricGroup = metricGroup;
     }
 
diff --git a/inlong-sort/sort-connectors/hbase/pom.xml 
b/inlong-sort/sort-connectors/hbase/pom.xml
index eeffccb40..de2f00703 100644
--- a/inlong-sort/sort-connectors/hbase/pom.xml
+++ b/inlong-sort/sort-connectors/hbase/pom.xml
@@ -36,6 +36,11 @@
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             
<artifactId>flink-connector-hbase-2.2_${flink.scala.binary.version}</artifactId>
diff --git 
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
 
b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
index 3309e5544..10d92db09 100644
--- 
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+++ 
b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.client.BufferedMutatorParams;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
-import org.apache.inlong.sort.hbase.metric.MetricData;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,6 +50,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
+
 /**
  * The sink function for HBase.
  *
@@ -73,30 +80,27 @@ public class HBaseSinkFunction<T> extends 
RichSinkFunction<T>
     private final long bufferFlushIntervalMillis;
     private final HBaseMutationConverter<T> mutationConverter;
     private final String inLongMetric;
-
+    /**
+     * This is set from inside the {@link BufferedMutator.ExceptionListener} 
if a {@link Throwable}
+     * was thrown.
+     *
+     * <p>
+     * Errors will be checked and rethrown before processing each input 
element, and when the
+     * sink is closed.
+     * </p>
+     */
+    private final AtomicReference<Throwable> failureThrowable = new 
AtomicReference<>();
     private transient Connection connection;
     private transient BufferedMutator mutator;
-
     private transient ScheduledExecutorService executor;
     private transient ScheduledFuture scheduledFuture;
     private transient AtomicLong numPendingRequests;
     private transient RuntimeContext runtimeContext;
-
     private transient volatile boolean closed = false;
-
-    private MetricData metricData;
+    private SinkMetricData sinkMetricData;
     private Long dataSize = 0L;
     private Long rowSize = 0L;
 
-    /**
-     * This is set from inside the {@link BufferedMutator.ExceptionListener} 
if a {@link Throwable}
-     * was thrown.
-     *
-     * <p>Errors will be checked and rethrown before processing each input 
element, and when the
-     * sink is closed.
-     */
-    private final AtomicReference<Throwable> failureThrowable = new 
AtomicReference<>();
-
     public HBaseSinkFunction(
             String hTableName,
             org.apache.hadoop.conf.Configuration conf,
@@ -121,19 +125,20 @@ public class HBaseSinkFunction<T> extends 
RichSinkFunction<T>
         org.apache.hadoop.conf.Configuration config = 
prepareRuntimeConfiguration();
         try {
             this.runtimeContext = getRuntimeContext();
-            metricData = new MetricData(runtimeContext.getMetricGroup());
+            sinkMetricData = new 
SinkMetricData(runtimeContext.getMetricGroup());
             if (inLongMetric != null && !inLongMetric.isEmpty()) {
                 String[] inLongMetricArray = inLongMetric.split("_");
                 String groupId = inLongMetricArray[0];
                 String streamId = inLongMetricArray[1];
                 String nodeId = inLongMetricArray[2];
-                metricData.registerMetricsForDirtyBytes(groupId, streamId, 
nodeId, "dirtyBytes");
-                metricData.registerMetricsForDirtyRecords(groupId, streamId, 
nodeId, "dirtyRecords");
-                metricData.registerMetricsForNumBytesOut(groupId, streamId, 
nodeId, "numBytesOut");
-                metricData.registerMetricsForNumRecordsOut(groupId, streamId, 
nodeId, "numRecordsOut");
-                metricData.registerMetricsForNumBytesOutPerSecond(groupId, 
streamId, nodeId, "numBytesOutPerSecond");
-                metricData.registerMetricsForNumRecordsOutPerSecond(groupId, 
streamId, nodeId,
-                        "numRecordsOutPerSecond");
+                sinkMetricData.registerMetricsForDirtyBytes(groupId, streamId, 
nodeId, DIRTY_BYTES);
+                sinkMetricData.registerMetricsForDirtyRecords(groupId, 
streamId, nodeId, DIRTY_RECORDS);
+                sinkMetricData.registerMetricsForNumBytesOut(groupId, 
streamId, nodeId, NUM_BYTES_OUT);
+                sinkMetricData.registerMetricsForNumRecordsOut(groupId, 
streamId, nodeId, NUM_RECORDS_OUT);
+                sinkMetricData.registerMetricsForNumBytesOutPerSecond(groupId, 
streamId, nodeId,
+                        NUM_BYTES_OUT_PER_SECOND);
+                
sinkMetricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId, 
nodeId,
+                        NUM_RECORDS_OUT_PER_SECOND);
             }
             this.mutationConverter.open();
             this.numPendingRequests = new AtomicLong(0);
@@ -161,20 +166,20 @@ public class HBaseSinkFunction<T> extends 
RichSinkFunction<T>
                                     }
                                     try {
                                         flush();
-                                        if (metricData.getNumRecordsOut() != 
null) {
-                                            
metricData.getNumRecordsOut().inc(rowSize);
+                                        if (sinkMetricData.getNumRecordsOut() 
!= null) {
+                                            
sinkMetricData.getNumRecordsOut().inc(rowSize);
                                         }
-                                        if (metricData.getNumRecordsOut() != 
null) {
-                                            metricData.getNumBytesOut()
+                                        if (sinkMetricData.getNumBytesOut() != 
null) {
+                                            sinkMetricData.getNumBytesOut()
                                                     .inc(dataSize);
                                         }
                                         resetStateAfterFlush();
                                     } catch (Exception e) {
-                                        if (metricData.getDirtyRecords() != 
null) {
-                                            
metricData.getDirtyRecords().inc(rowSize);
+                                        if (sinkMetricData.getDirtyRecords() 
!= null) {
+                                            
sinkMetricData.getDirtyRecords().inc(rowSize);
                                         }
-                                        if (metricData.getDirtyBytes() != 
null) {
-                                            
metricData.getDirtyBytes().inc(dataSize);
+                                        if (sinkMetricData.getDirtyBytes() != 
null) {
+                                            
sinkMetricData.getDirtyBytes().inc(dataSize);
                                         }
                                         resetStateAfterFlush();
                                         // fail the sink and skip the rest of 
the items
@@ -240,20 +245,20 @@ public class HBaseSinkFunction<T> extends 
RichSinkFunction<T>
                 && numPendingRequests.incrementAndGet() >= 
bufferFlushMaxMutations) {
             try {
                 flush();
-                if (metricData.getNumRecordsOut() != null) {
-                    metricData.getNumRecordsOut().inc(rowSize);
+                if (sinkMetricData.getNumRecordsOut() != null) {
+                    sinkMetricData.getNumRecordsOut().inc(rowSize);
                 }
-                if (metricData.getNumRecordsOut() != null) {
-                    metricData.getNumBytesOut()
+                if (sinkMetricData.getNumBytesOut() != null) {
+                    sinkMetricData.getNumBytesOut()
                             .inc(dataSize);
                 }
                 resetStateAfterFlush();
             } catch (Exception e) {
-                if (metricData.getDirtyRecords() != null) {
-                    metricData.getDirtyRecords().inc(rowSize);
+                if (sinkMetricData.getDirtyRecords() != null) {
+                    sinkMetricData.getDirtyRecords().inc(rowSize);
                 }
-                if (metricData.getDirtyBytes() != null) {
-                    metricData.getDirtyBytes().inc(dataSize);
+                if (sinkMetricData.getDirtyBytes() != null) {
+                    sinkMetricData.getDirtyBytes().inc(dataSize);
                 }
                 resetStateAfterFlush();
                 throw e;
diff --git a/inlong-sort/sort-connectors/pom.xml 
b/inlong-sort/sort-connectors/pom.xml
index 5879a6dd6..fa6f26b80 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -35,6 +35,7 @@
     <packaging>pom</packaging>
 
     <modules>
+        <module>connector-base</module>
         <module>hive</module>
         <module>mysql-cdc</module>
         <module>kafka</module>
diff --git a/inlong-sort/sort-connectors/postgres-cdc/pom.xml 
b/inlong-sort/sort-connectors/postgres-cdc/pom.xml
index b862bd305..12b2ceae8 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/postgres-cdc/pom.xml
@@ -41,6 +41,11 @@
             <groupId>com.ververica</groupId>
             <artifactId>flink-connector-postgres-cdc</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
index b2952d1d3..157cdb3e6 100644
--- 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++ 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
@@ -58,6 +58,7 @@ import 
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,6 +78,10 @@ import java.util.concurrent.TimeUnit;
 
 import static 
com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static 
com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
 
 /**
  * The {@link DebeziumSourceFunction} is a streaming data source that pulls 
captured change data
@@ -110,40 +115,33 @@ import static 
com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHisto
 public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
         implements CheckpointedFunction, CheckpointListener, 
ResultTypeQueryable<T> {
 
-    private static final long serialVersionUID = -5808108641062931623L;
-
-    protected static final Logger LOG = 
LoggerFactory.getLogger(DebeziumSourceFunction.class);
-
     /**
      * State name of the consumer's partition offset states.
      */
     public static final String OFFSETS_STATE_NAME = "offset-states";
-
     /**
      * State name of the consumer's history records state.
      */
     public static final String HISTORY_RECORDS_STATE_NAME = 
"history-records-states";
-
     /**
      * The maximum number of pending non-committed checkpoints to track, to 
avoid memory leaks.
      */
     public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
-
     /**
      * The configuration represents the Debezium MySQL Connector uses the 
legacy implementation or
      * not.
      */
     public static final String LEGACY_IMPLEMENTATION_KEY = 
"internal.implementation";
-
     /**
      * The configuration value represents legacy implementation.
      */
     public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
+    protected static final Logger LOG = 
LoggerFactory.getLogger(DebeziumSourceFunction.class);
+    private static final long serialVersionUID = -5808108641062931623L;
 
     // 
---------------------------------------------------------------------------------------
     // Properties
     // 
---------------------------------------------------------------------------------------
-
     /**
      * The schema to convert from Debezium's messages into Flink's objects.
      */
@@ -164,21 +162,18 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
      * Data for pending but uncommitted offsets.
      */
     private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-
-    /**
-     * Flag indicating whether the Debezium Engine is started.
-     */
-    private volatile boolean debeziumStarted = false;
-
     /**
      * Validator to validate the connected database satisfies the cdc 
connector's requirements.
      */
     private final Validator validator;
+    /**
+     * Flag indicating whether the Debezium Engine is started.
+     */
+    private volatile boolean debeziumStarted = false;
 
     // 
---------------------------------------------------------------------------------------
     // State
     // 
---------------------------------------------------------------------------------------
-
     /**
      * The offsets to restore to, if the consumer restores state from a 
checkpoint.
      *
@@ -231,7 +226,7 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
 
     private String inlongMetric;
 
-    private MetricData metricData;
+    private SourceMetricData sourceMetricData;
 
     // 
---------------------------------------------------------------------------------------
 
@@ -423,11 +418,13 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
             String groupId = inlongMetricArray[0];
             String streamId = inlongMetricArray[1];
             String nodeId = inlongMetricArray[2];
-            metricData = new MetricData(metricGroup);
-            metricData.registerMetricsForNumRecordsIn(groupId, streamId, 
nodeId, "numRecordsIn");
-            metricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId, 
"numBytesIn");
-            metricData.registerMetricsForNumBytesInPerSecond(groupId, 
streamId, nodeId, "numBytesInPerSecond");
-            metricData.registerMetricsForNumRecordsInPerSecond(groupId, 
streamId, nodeId, "numRecordsInPerSecond");
+            sourceMetricData = new SourceMetricData(metricGroup);
+            sourceMetricData.registerMetricsForNumRecordsIn(groupId, streamId, 
nodeId, NUM_RECORDS_IN);
+            sourceMetricData.registerMetricsForNumBytesIn(groupId, streamId, 
nodeId, NUM_BYTES_IN);
+            sourceMetricData.registerMetricsForNumBytesInPerSecond(groupId, 
streamId,
+                    nodeId, NUM_BYTES_IN_PER_SECOND);
+            sourceMetricData.registerMetricsForNumRecordsInPerSecond(groupId, 
streamId,
+                    nodeId, NUM_RECORDS_IN_PER_SECOND);
         }
         properties.setProperty("name", "engine");
         properties.setProperty("offset.storage", 
FlinkOffsetBackingStore.class.getCanonicalName());
@@ -466,9 +463,9 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
                         new DebeziumDeserializationSchema<T>() {
                             @Override
                             public void deserialize(SourceRecord record, 
Collector<T> out) throws Exception {
-                                if (metricData != null) {
-                                    metricData.getNumRecordsIn().inc(1L);
-                                    metricData.getNumBytesIn()
+                                if (sourceMetricData != null) {
+                                    sourceMetricData.getNumRecordsIn().inc(1L);
+                                    sourceMetricData.getNumBytesIn()
                                             
.inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
                                 }
                                 deserializer.deserialize(record, out);
@@ -641,7 +638,7 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
         return engineInstanceName;
     }
 
-    public MetricData getMetricData() {
-        return metricData;
+    public SourceMetricData getMetricData() {
+        return sourceMetricData;
     }
 }

Reply via email to