This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e96b0332 [INLONG-7567][Sort] Extract metrics as common parameters (#7568)
3e96b0332 is described below

commit 3e96b0332e59bb3bf8a1e8d5da4e1125260a65f9
Author: emhui <[email protected]>
AuthorDate: Mon Mar 13 11:57:44 2023 +0800

    [INLONG-7567][Sort] Extract metrics as common parameters (#7568)
---
 .../sort/cdc/base/config/BaseSourceConfig.java     | 25 ++++++++++++--
 .../sort/cdc/base/config/JdbcSourceConfig.java     | 16 ++-------
 .../inlong/sort/cdc/base/config/MetricConfig.java  | 39 ++++++++++++++++++++++
 .../sort/cdc/base/source/IncrementalSource.java    |  8 ++---
 .../sort/cdc/base/source/IncrementalSource.java    |  8 ++---
 5 files changed, 73 insertions(+), 23 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/BaseSourceConfig.java
 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/BaseSourceConfig.java
index d38b5c21e..4505a1047 100644
--- 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/BaseSourceConfig.java
+++ 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/BaseSourceConfig.java
@@ -25,7 +25,7 @@ import java.util.Properties;
 /** A basic Source configuration which is used by {@link IncrementalSource}.
  * Copy from com.ververica:flink-cdc-base:2.3.0.
  * */
-public abstract class BaseSourceConfig implements SourceConfig {
+public abstract class BaseSourceConfig implements SourceConfig, MetricConfig {
 
     private static final long serialVersionUID = 1L;
 
@@ -42,6 +42,12 @@ public abstract class BaseSourceConfig implements 
SourceConfig {
     protected final Properties dbzProperties;
     protected transient Configuration dbzConfiguration;
 
+    // --------------------------------------------------------------------------------------------
+    // Metric Configurations
+    // --------------------------------------------------------------------------------------------
+    protected final String inlongMetric;
+    protected final String inlongAudit;
+
     public BaseSourceConfig(
             StartupOptions startupOptions,
             int splitSize,
@@ -50,7 +56,9 @@ public abstract class BaseSourceConfig implements 
SourceConfig {
             double distributionFactorLower,
             boolean includeSchemaChanges,
             Properties dbzProperties,
-            Configuration dbzConfiguration) {
+            Configuration dbzConfiguration,
+            String inlongMetric,
+            String inlongAudit) {
         this.startupOptions = startupOptions;
         this.splitSize = splitSize;
         this.splitMetaGroupSize = splitMetaGroupSize;
@@ -59,6 +67,8 @@ public abstract class BaseSourceConfig implements 
SourceConfig {
         this.includeSchemaChanges = includeSchemaChanges;
         this.dbzProperties = dbzProperties;
         this.dbzConfiguration = dbzConfiguration;
+        this.inlongMetric = inlongMetric;
+        this.inlongAudit = inlongAudit;
     }
 
     @Override
@@ -96,4 +106,15 @@ public abstract class BaseSourceConfig implements 
SourceConfig {
     public Configuration getDbzConfiguration() {
         return Configuration.from(dbzProperties);
     }
+
+    @Override
+    public String getInlongMetric() {
+        return inlongMetric;
+    }
+
+    @Override
+    public String getInlongAudit() {
+        return inlongAudit;
+    }
+
 }
diff --git 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/JdbcSourceConfig.java
 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/JdbcSourceConfig.java
index 2827b1f6f..8a6436a26 100644
--- 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/JdbcSourceConfig.java
+++ 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/JdbcSourceConfig.java
@@ -45,9 +45,6 @@ public abstract class JdbcSourceConfig extends 
BaseSourceConfig {
     protected final int connectionPoolSize;
     protected final String chunkKeyColumn;
 
-    protected final String inlongMetric;
-    protected final String inlongAudit;
-
     public JdbcSourceConfig(
             StartupOptions startupOptions,
             List<String> databaseList,
@@ -80,7 +77,9 @@ public abstract class JdbcSourceConfig extends 
BaseSourceConfig {
                 distributionFactorLower,
                 includeSchemaChanges,
                 dbzProperties,
-                dbzConfiguration);
+                dbzConfiguration,
+                inlongMetric,
+                inlongAudit);
         this.driverClassName = driverClassName;
         this.hostname = hostname;
         this.port = port;
@@ -95,8 +94,6 @@ public abstract class JdbcSourceConfig extends 
BaseSourceConfig {
         this.connectionPoolSize = connectionPoolSize;
         this.chunkKeyColumn = chunkKeyColumn;
 
-        this.inlongMetric = inlongMetric;
-        this.inlongAudit = inlongAudit;
     }
 
     public abstract RelationalDatabaseConnectorConfig getDbzConnectorConfig();
@@ -153,11 +150,4 @@ public abstract class JdbcSourceConfig extends 
BaseSourceConfig {
         return chunkKeyColumn;
     }
 
-    public String getInlongMetric() {
-        return inlongMetric;
-    }
-
-    public String getInlongAudit() {
-        return inlongAudit;
-    }
 }
diff --git 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/MetricConfig.java
 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/MetricConfig.java
new file mode 100644
index 000000000..33a48af4a
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/MetricConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.config;
+
+import java.io.Serializable;
+
+/** The metric configuration which offers basic metric configuration. **/
+public interface MetricConfig extends Serializable {
+
+    /**
+     * getInlongMetric
+     *
+     * @return a label of inlong metric
+     */
+    String getInlongMetric();
+
+    /**
+     * getInlongAudit
+     *
+     * @return an address of inlong audit
+     */
+    String getInlongAudit();
+
+}
diff --git 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
index de3bec22e..8278cc7cf 100644
--- 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
+++ 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
@@ -38,7 +38,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
-import org.apache.inlong.sort.cdc.base.config.JdbcSourceConfig;
+import org.apache.inlong.sort.cdc.base.config.MetricConfig;
 import org.apache.inlong.sort.cdc.base.config.SourceConfig;
 import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
 import org.apache.inlong.sort.cdc.base.dialect.DataSourceDialect;
@@ -109,7 +109,7 @@ public class IncrementalSource<T, C extends SourceConfig>
             throws Exception {
         // create source config for the given subtask (e.g. unique server id)
         C sourceConfig = 
configFactory.create(readerContext.getIndexOfSubtask());
-        JdbcSourceConfig jdbcSourceConfig = (JdbcSourceConfig) sourceConfig;
+        MetricConfig metricConfig = (MetricConfig) sourceConfig;
         FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> 
elementsQueue =
                 new FutureCompletingBlockingQueue<>();
 
@@ -121,8 +121,8 @@ public class IncrementalSource<T, C extends SourceConfig>
 
         // create source config for the given subtask (e.g. unique server id)
         MetricOption metricOption = MetricOption.builder()
-                .withInlongLabels(jdbcSourceConfig.getInlongMetric())
-                .withAuditAddress(jdbcSourceConfig.getInlongAudit())
+                .withInlongLabels(metricConfig.getInlongMetric())
+                .withAuditAddress(metricConfig.getInlongAudit())
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
 
diff --git 
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
 
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
index 1e6cd1245..30650ced5 100644
--- 
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
+++ 
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
@@ -40,7 +40,7 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
-import org.apache.inlong.sort.cdc.base.config.JdbcSourceConfig;
+import org.apache.inlong.sort.cdc.base.config.MetricConfig;
 import org.apache.inlong.sort.cdc.base.config.SourceConfig;
 import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
 import org.apache.inlong.sort.cdc.base.dialect.DataSourceDialect;
@@ -111,7 +111,7 @@ public class IncrementalSource<T, C extends SourceConfig>
             throws Exception {
         // create source config for the given subtask (e.g. unique server id)
         C sourceConfig = 
configFactory.create(readerContext.getIndexOfSubtask());
-        JdbcSourceConfig jdbcSourceConfig = (JdbcSourceConfig) sourceConfig;
+        MetricConfig metricConfig = (MetricConfig) sourceConfig;
         FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> 
elementsQueue =
                 new FutureCompletingBlockingQueue<>();
 
@@ -123,8 +123,8 @@ public class IncrementalSource<T, C extends SourceConfig>
 
         // create source config for the given subtask (e.g. unique server id)
         MetricOption metricOption = MetricOption.builder()
-                .withInlongLabels(jdbcSourceConfig.getInlongMetric())
-                .withAuditAddress(jdbcSourceConfig.getInlongAudit())
+                .withInlongLabels(metricConfig.getInlongMetric())
+                .withAuditAddress(metricConfig.getInlongAudit())
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
 

Reply via email to