This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3e96b0332 [INLONG-7567][Sort] Extract metrics as common parameters
(#7568)
3e96b0332 is described below
commit 3e96b0332e59bb3bf8a1e8d5da4e1125260a65f9
Author: emhui <[email protected]>
AuthorDate: Mon Mar 13 11:57:44 2023 +0800
[INLONG-7567][Sort] Extract metrics as common parameters (#7568)
---
.../sort/cdc/base/config/BaseSourceConfig.java | 25 ++++++++++++--
.../sort/cdc/base/config/JdbcSourceConfig.java | 16 ++-------
.../inlong/sort/cdc/base/config/MetricConfig.java | 39 ++++++++++++++++++++++
.../sort/cdc/base/source/IncrementalSource.java | 8 ++---
.../sort/cdc/base/source/IncrementalSource.java | 8 ++---
5 files changed, 73 insertions(+), 23 deletions(-)
diff --git
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/BaseSourceConfig.java
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/BaseSourceConfig.java
index d38b5c21e..4505a1047 100644
---
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/BaseSourceConfig.java
+++
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/BaseSourceConfig.java
@@ -25,7 +25,7 @@ import java.util.Properties;
/** A basic Source configuration which is used by {@link IncrementalSource}.
* Copy from com.ververica:flink-cdc-base:2.3.0.
* */
-public abstract class BaseSourceConfig implements SourceConfig {
+public abstract class BaseSourceConfig implements SourceConfig, MetricConfig {
private static final long serialVersionUID = 1L;
@@ -42,6 +42,12 @@ public abstract class BaseSourceConfig implements
SourceConfig {
protected final Properties dbzProperties;
protected transient Configuration dbzConfiguration;
+ //
--------------------------------------------------------------------------------------------
+ // Metric Configurations
+ //
--------------------------------------------------------------------------------------------
+ protected final String inlongMetric;
+ protected final String inlongAudit;
+
public BaseSourceConfig(
StartupOptions startupOptions,
int splitSize,
@@ -50,7 +56,9 @@ public abstract class BaseSourceConfig implements
SourceConfig {
double distributionFactorLower,
boolean includeSchemaChanges,
Properties dbzProperties,
- Configuration dbzConfiguration) {
+ Configuration dbzConfiguration,
+ String inlongMetric,
+ String inlongAudit) {
this.startupOptions = startupOptions;
this.splitSize = splitSize;
this.splitMetaGroupSize = splitMetaGroupSize;
@@ -59,6 +67,8 @@ public abstract class BaseSourceConfig implements
SourceConfig {
this.includeSchemaChanges = includeSchemaChanges;
this.dbzProperties = dbzProperties;
this.dbzConfiguration = dbzConfiguration;
+ this.inlongMetric = inlongMetric;
+ this.inlongAudit = inlongAudit;
}
@Override
@@ -96,4 +106,15 @@ public abstract class BaseSourceConfig implements
SourceConfig {
public Configuration getDbzConfiguration() {
return Configuration.from(dbzProperties);
}
+
+ @Override
+ public String getInlongMetric() {
+ return inlongMetric;
+ }
+
+ @Override
+ public String getInlongAudit() {
+ return inlongAudit;
+ }
+
}
diff --git
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/JdbcSourceConfig.java
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/JdbcSourceConfig.java
index 2827b1f6f..8a6436a26 100644
---
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/JdbcSourceConfig.java
+++
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/JdbcSourceConfig.java
@@ -45,9 +45,6 @@ public abstract class JdbcSourceConfig extends
BaseSourceConfig {
protected final int connectionPoolSize;
protected final String chunkKeyColumn;
- protected final String inlongMetric;
- protected final String inlongAudit;
-
public JdbcSourceConfig(
StartupOptions startupOptions,
List<String> databaseList,
@@ -80,7 +77,9 @@ public abstract class JdbcSourceConfig extends
BaseSourceConfig {
distributionFactorLower,
includeSchemaChanges,
dbzProperties,
- dbzConfiguration);
+ dbzConfiguration,
+ inlongMetric,
+ inlongAudit);
this.driverClassName = driverClassName;
this.hostname = hostname;
this.port = port;
@@ -95,8 +94,6 @@ public abstract class JdbcSourceConfig extends
BaseSourceConfig {
this.connectionPoolSize = connectionPoolSize;
this.chunkKeyColumn = chunkKeyColumn;
- this.inlongMetric = inlongMetric;
- this.inlongAudit = inlongAudit;
}
public abstract RelationalDatabaseConnectorConfig getDbzConnectorConfig();
@@ -153,11 +150,4 @@ public abstract class JdbcSourceConfig extends
BaseSourceConfig {
return chunkKeyColumn;
}
- public String getInlongMetric() {
- return inlongMetric;
- }
-
- public String getInlongAudit() {
- return inlongAudit;
- }
}
diff --git
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/MetricConfig.java
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/MetricConfig.java
new file mode 100644
index 000000000..33a48af4a
--- /dev/null
+++
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/config/MetricConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.base.config;
+
+import java.io.Serializable;
+
+/** The mertic configuration which offers basic metric configuration. **/
+public interface MetricConfig extends Serializable {
+
+ /**
+ * getInlongMetric
+ *
+ * @return a label of inlong metric
+ */
+ String getInlongMetric();
+
+ /**
+ * getInlongAudit
+ *
+ * @return an address of inlong audit
+ */
+ String getInlongAudit();
+
+}
diff --git
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
index de3bec22e..8278cc7cf 100644
---
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
+++
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
@@ -38,7 +38,7 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
-import org.apache.inlong.sort.cdc.base.config.JdbcSourceConfig;
+import org.apache.inlong.sort.cdc.base.config.MetricConfig;
import org.apache.inlong.sort.cdc.base.config.SourceConfig;
import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
import org.apache.inlong.sort.cdc.base.dialect.DataSourceDialect;
@@ -109,7 +109,7 @@ public class IncrementalSource<T, C extends SourceConfig>
throws Exception {
// create source config for the given subtask (e.g. unique server id)
C sourceConfig =
configFactory.create(readerContext.getIndexOfSubtask());
- JdbcSourceConfig jdbcSourceConfig = (JdbcSourceConfig) sourceConfig;
+ MetricConfig metricConfig = (MetricConfig) sourceConfig;
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>>
elementsQueue =
new FutureCompletingBlockingQueue<>();
@@ -121,8 +121,8 @@ public class IncrementalSource<T, C extends SourceConfig>
// create source config for the given subtask (e.g. unique server id)
MetricOption metricOption = MetricOption.builder()
- .withInlongLabels(jdbcSourceConfig.getInlongMetric())
- .withAuditAddress(jdbcSourceConfig.getInlongAudit())
+ .withInlongLabels(metricConfig.getInlongMetric())
+ .withAuditAddress(metricConfig.getInlongAudit())
.withRegisterMetric(RegisteredMetric.ALL)
.build();
diff --git
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
index 1e6cd1245..30650ced5 100644
---
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
+++
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/base/source/IncrementalSource.java
@@ -40,7 +40,7 @@ import org.apache.flink.util.FlinkRuntimeException;
import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
-import org.apache.inlong.sort.cdc.base.config.JdbcSourceConfig;
+import org.apache.inlong.sort.cdc.base.config.MetricConfig;
import org.apache.inlong.sort.cdc.base.config.SourceConfig;
import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
import org.apache.inlong.sort.cdc.base.dialect.DataSourceDialect;
@@ -111,7 +111,7 @@ public class IncrementalSource<T, C extends SourceConfig>
throws Exception {
// create source config for the given subtask (e.g. unique server id)
C sourceConfig =
configFactory.create(readerContext.getIndexOfSubtask());
- JdbcSourceConfig jdbcSourceConfig = (JdbcSourceConfig) sourceConfig;
+ MetricConfig metricConfig = (MetricConfig) sourceConfig;
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>>
elementsQueue =
new FutureCompletingBlockingQueue<>();
@@ -123,8 +123,8 @@ public class IncrementalSource<T, C extends SourceConfig>
// create source config for the given subtask (e.g. unique server id)
MetricOption metricOption = MetricOption.builder()
- .withInlongLabels(jdbcSourceConfig.getInlongMetric())
- .withAuditAddress(jdbcSourceConfig.getInlongAudit())
+ .withInlongLabels(metricConfig.getInlongMetric())
+ .withAuditAddress(metricConfig.getInlongAudit())
.withRegisterMetric(RegisteredMetric.ALL)
.build();