This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4114517a2 [INLONG-5217][Sort] Add connector-base component for common
code (#5237)
4114517a2 is described below
commit 4114517a216b41a2d36b7f9242e55a2a54f5b975
Author: Oneal65 <[email protected]>
AuthorDate: Thu Jul 28 23:57:05 2022 +0800
[INLONG-5217][Sort] Add connector-base component for common code (#5237)
---
inlong-sort/sort-connectors/connector-base/pom.xml | 37 ++++++++++
.../org/apache/inlong/sort/base/Constants.java | 49 +++++++++++++
.../inlong/sort/base/metric/SinkMetricData.java} | 6 +-
.../inlong/sort/base/metric/SourceMetricData.java} | 15 ++--
inlong-sort/sort-connectors/hbase/pom.xml | 5 ++
.../inlong/sort/hbase/sink/HBaseSinkFunction.java | 83 ++++++++++++----------
inlong-sort/sort-connectors/pom.xml | 1 +
inlong-sort/sort-connectors/postgres-cdc/pom.xml | 5 ++
.../DebeziumSourceFunction.java | 51 +++++++------
9 files changed, 175 insertions(+), 77 deletions(-)
diff --git a/inlong-sort/sort-connectors/connector-base/pom.xml
b/inlong-sort/sort-connectors/connector-base/pom.xml
new file mode 100644
index 000000000..e26db29ff
--- /dev/null
+++ b/inlong-sort/sort-connectors/connector-base/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>sort-connectors</artifactId>
+ <groupId>org.apache.inlong</groupId>
+ <version>1.3.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>sort-connector-base</artifactId>
+ <name>Apache InLong - Sort-connector-base</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
new file mode 100644
index 000000000..c9c749ad6
--- /dev/null
+++
b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base;
+
+/**
+ * connector base option constant
+ */
+public final class Constants {
+
+ /**
+ * constants for metrics
+ */
+ public static final String DIRTY_BYTES = "dirtyBytes";
+
+ public static final String DIRTY_RECORDS = "dirtyRecords";
+
+ public static final String NUM_BYTES_OUT = "numBytesOut";
+
+ public static final String NUM_RECORDS_OUT = "numRecordsOut";
+
+ public static final String NUM_BYTES_OUT_PER_SECOND =
"numBytesOutPerSecond";
+
+ public static final String NUM_RECORDS_OUT_PER_SECOND =
"numRecordsOutPerSecond";
+
+ public static final String NUM_RECORDS_IN = "numRecordsIn";
+
+ public static final String NUM_BYTES_IN = "numBytesIn";
+
+ public static final String NUM_BYTES_IN_PER_SECOND = "numBytesInPerSecond";
+
+ public static final String NUM_RECORDS_IN_PER_SECOND =
"numRecordsInPerSecond";
+
+}
diff --git
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/metric/MetricData.java
b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
similarity index 96%
rename from
inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/metric/MetricData.java
rename to
inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 7bf05fad2..c3af8f027 100644
---
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/metric/MetricData.java
+++
b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.hbase.metric;
+package org.apache.inlong.sort.base.metric;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
@@ -26,7 +26,7 @@ import org.apache.flink.metrics.MetricGroup;
/**
* A collection class for handling metrics
*/
-public class MetricData {
+public class SinkMetricData {
private final MetricGroup metricGroup;
@@ -41,7 +41,7 @@ public class MetricData {
private static String GROUP_ID = "groupId";
private static String NODE_ID = "nodeId";
- public MetricData(MetricGroup metricGroup) {
+ public SinkMetricData(MetricGroup metricGroup) {
this.metricGroup = metricGroup;
}
diff --git
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/MetricData.java
b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
similarity index 96%
rename from
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/MetricData.java
rename to
inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index 07b6f9044..8a065836f 100644
---
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/MetricData.java
+++
b/inlong-sort/sort-connectors/connector-base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.inlong.sort.cdc.postgres;
+package org.apache.inlong.sort.base.metric;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
@@ -26,20 +26,19 @@ import org.apache.flink.metrics.MetricGroup;
/**
* A collection class for handling metrics
*/
-public class MetricData {
+public class SourceMetricData {
+ private static Integer TIME_SPAN_IN_SECONDS = 60;
+ private static String STREAM_ID = "streamId";
+ private static String GROUP_ID = "groupId";
+ private static String NODE_ID = "nodeId";
private final MetricGroup metricGroup;
-
private Counter numRecordsIn;
private Counter numBytesIn;
private Meter numRecordsInPerSecond;
private Meter numBytesInPerSecond;
- private static Integer TIME_SPAN_IN_SECONDS = 60;
- private static String STREAM_ID = "streamId";
- private static String GROUP_ID = "groupId";
- private static String NODE_ID = "nodeId";
- public MetricData(MetricGroup metricGroup) {
+ public SourceMetricData(MetricGroup metricGroup) {
this.metricGroup = metricGroup;
}
diff --git a/inlong-sort/sort-connectors/hbase/pom.xml
b/inlong-sort/sort-connectors/hbase/pom.xml
index eeffccb40..de2f00703 100644
--- a/inlong-sort/sort-connectors/hbase/pom.xml
+++ b/inlong-sort/sort-connectors/hbase/pom.xml
@@ -36,6 +36,11 @@
</properties>
<dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hbase-2.2_${flink.scala.binary.version}</artifactId>
diff --git
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
index 3309e5544..10d92db09 100644
---
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+++
b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
-import org.apache.inlong.sort.hbase.metric.MetricData;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +50,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
+
/**
* The sink function for HBase.
*
@@ -73,30 +80,27 @@ public class HBaseSinkFunction<T> extends
RichSinkFunction<T>
private final long bufferFlushIntervalMillis;
private final HBaseMutationConverter<T> mutationConverter;
private final String inLongMetric;
-
+ /**
+ * This is set from inside the {@link BufferedMutator.ExceptionListener}
if a {@link Throwable}
+ * was thrown.
+ *
+ * <p>
+ * Errors will be checked and rethrown before processing each input
element, and when the
+ * sink is closed.
+ * </p>
+ */
+ private final AtomicReference<Throwable> failureThrowable = new
AtomicReference<>();
private transient Connection connection;
private transient BufferedMutator mutator;
-
private transient ScheduledExecutorService executor;
private transient ScheduledFuture scheduledFuture;
private transient AtomicLong numPendingRequests;
private transient RuntimeContext runtimeContext;
-
private transient volatile boolean closed = false;
-
- private MetricData metricData;
+ private SinkMetricData sinkMetricData;
private Long dataSize = 0L;
private Long rowSize = 0L;
- /**
- * This is set from inside the {@link BufferedMutator.ExceptionListener}
if a {@link Throwable}
- * was thrown.
- *
- * <p>Errors will be checked and rethrown before processing each input
element, and when the
- * sink is closed.
- */
- private final AtomicReference<Throwable> failureThrowable = new
AtomicReference<>();
-
public HBaseSinkFunction(
String hTableName,
org.apache.hadoop.conf.Configuration conf,
@@ -121,19 +125,20 @@ public class HBaseSinkFunction<T> extends
RichSinkFunction<T>
org.apache.hadoop.conf.Configuration config =
prepareRuntimeConfiguration();
try {
this.runtimeContext = getRuntimeContext();
- metricData = new MetricData(runtimeContext.getMetricGroup());
+ sinkMetricData = new
SinkMetricData(runtimeContext.getMetricGroup());
if (inLongMetric != null && !inLongMetric.isEmpty()) {
String[] inLongMetricArray = inLongMetric.split("_");
String groupId = inLongMetricArray[0];
String streamId = inLongMetricArray[1];
String nodeId = inLongMetricArray[2];
- metricData.registerMetricsForDirtyBytes(groupId, streamId,
nodeId, "dirtyBytes");
- metricData.registerMetricsForDirtyRecords(groupId, streamId,
nodeId, "dirtyRecords");
- metricData.registerMetricsForNumBytesOut(groupId, streamId,
nodeId, "numBytesOut");
- metricData.registerMetricsForNumRecordsOut(groupId, streamId,
nodeId, "numRecordsOut");
- metricData.registerMetricsForNumBytesOutPerSecond(groupId,
streamId, nodeId, "numBytesOutPerSecond");
- metricData.registerMetricsForNumRecordsOutPerSecond(groupId,
streamId, nodeId,
- "numRecordsOutPerSecond");
+ sinkMetricData.registerMetricsForDirtyBytes(groupId, streamId,
nodeId, DIRTY_BYTES);
+ sinkMetricData.registerMetricsForDirtyRecords(groupId,
streamId, nodeId, DIRTY_RECORDS);
+ sinkMetricData.registerMetricsForNumBytesOut(groupId,
streamId, nodeId, NUM_BYTES_OUT);
+ sinkMetricData.registerMetricsForNumRecordsOut(groupId,
streamId, nodeId, NUM_RECORDS_OUT);
+ sinkMetricData.registerMetricsForNumBytesOutPerSecond(groupId,
streamId, nodeId,
+ NUM_BYTES_OUT_PER_SECOND);
+
sinkMetricData.registerMetricsForNumRecordsOutPerSecond(groupId, streamId,
nodeId,
+ NUM_RECORDS_OUT_PER_SECOND);
}
this.mutationConverter.open();
this.numPendingRequests = new AtomicLong(0);
@@ -161,20 +166,20 @@ public class HBaseSinkFunction<T> extends
RichSinkFunction<T>
}
try {
flush();
- if (metricData.getNumRecordsOut() !=
null) {
-
metricData.getNumRecordsOut().inc(rowSize);
+ if (sinkMetricData.getNumRecordsOut()
!= null) {
+
sinkMetricData.getNumRecordsOut().inc(rowSize);
}
- if (metricData.getNumRecordsOut() !=
null) {
- metricData.getNumBytesOut()
+ if (sinkMetricData.getNumBytesOut() !=
null) {
+ sinkMetricData.getNumBytesOut()
.inc(dataSize);
}
resetStateAfterFlush();
} catch (Exception e) {
- if (metricData.getDirtyRecords() !=
null) {
-
metricData.getDirtyRecords().inc(rowSize);
+ if (sinkMetricData.getDirtyRecords()
!= null) {
+
sinkMetricData.getDirtyRecords().inc(rowSize);
}
- if (metricData.getDirtyBytes() !=
null) {
-
metricData.getDirtyBytes().inc(dataSize);
+ if (sinkMetricData.getDirtyBytes() !=
null) {
+
sinkMetricData.getDirtyBytes().inc(dataSize);
}
resetStateAfterFlush();
// fail the sink and skip the rest of
the items
@@ -240,20 +245,20 @@ public class HBaseSinkFunction<T> extends
RichSinkFunction<T>
&& numPendingRequests.incrementAndGet() >=
bufferFlushMaxMutations) {
try {
flush();
- if (metricData.getNumRecordsOut() != null) {
- metricData.getNumRecordsOut().inc(rowSize);
+ if (sinkMetricData.getNumRecordsOut() != null) {
+ sinkMetricData.getNumRecordsOut().inc(rowSize);
}
- if (metricData.getNumRecordsOut() != null) {
- metricData.getNumBytesOut()
+ if (sinkMetricData.getNumBytesOut() != null) {
+ sinkMetricData.getNumBytesOut()
.inc(dataSize);
}
resetStateAfterFlush();
} catch (Exception e) {
- if (metricData.getDirtyRecords() != null) {
- metricData.getDirtyRecords().inc(rowSize);
+ if (sinkMetricData.getDirtyRecords() != null) {
+ sinkMetricData.getDirtyRecords().inc(rowSize);
}
- if (metricData.getDirtyBytes() != null) {
- metricData.getDirtyBytes().inc(dataSize);
+ if (sinkMetricData.getDirtyBytes() != null) {
+ sinkMetricData.getDirtyBytes().inc(dataSize);
}
resetStateAfterFlush();
throw e;
diff --git a/inlong-sort/sort-connectors/pom.xml
b/inlong-sort/sort-connectors/pom.xml
index 5879a6dd6..fa6f26b80 100644
--- a/inlong-sort/sort-connectors/pom.xml
+++ b/inlong-sort/sort-connectors/pom.xml
@@ -35,6 +35,7 @@
<packaging>pom</packaging>
<modules>
+ <module>connector-base</module>
<module>hive</module>
<module>mysql-cdc</module>
<module>kafka</module>
diff --git a/inlong-sort/sort-connectors/postgres-cdc/pom.xml
b/inlong-sort/sort-connectors/postgres-cdc/pom.xml
index b862bd305..12b2ceae8 100644
--- a/inlong-sort/sort-connectors/postgres-cdc/pom.xml
+++ b/inlong-sort/sort-connectors/postgres-cdc/pom.xml
@@ -41,6 +41,11 @@
<groupId>com.ververica</groupId>
<artifactId>flink-connector-postgres-cdc</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connector-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
index b2952d1d3..157cdb3e6 100644
---
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
@@ -58,6 +58,7 @@ import
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,6 +78,10 @@ import java.util.concurrent.TimeUnit;
import static
com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static
com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND;
/**
* The {@link DebeziumSourceFunction} is a streaming data source that pulls
captured change data
@@ -110,40 +115,33 @@ import static
com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHisto
public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
implements CheckpointedFunction, CheckpointListener,
ResultTypeQueryable<T> {
- private static final long serialVersionUID = -5808108641062931623L;
-
- protected static final Logger LOG =
LoggerFactory.getLogger(DebeziumSourceFunction.class);
-
/**
* State name of the consumer's partition offset states.
*/
public static final String OFFSETS_STATE_NAME = "offset-states";
-
/**
* State name of the consumer's history records state.
*/
public static final String HISTORY_RECORDS_STATE_NAME =
"history-records-states";
-
/**
* The maximum number of pending non-committed checkpoints to track, to
avoid memory leaks.
*/
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
-
/**
* The configuration represents the Debezium MySQL Connector uses the
legacy implementation or
* not.
*/
public static final String LEGACY_IMPLEMENTATION_KEY =
"internal.implementation";
-
/**
* The configuration value represents legacy implementation.
*/
public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy";
+ protected static final Logger LOG =
LoggerFactory.getLogger(DebeziumSourceFunction.class);
+ private static final long serialVersionUID = -5808108641062931623L;
//
---------------------------------------------------------------------------------------
// Properties
//
---------------------------------------------------------------------------------------
-
/**
* The schema to convert from Debezium's messages into Flink's objects.
*/
@@ -164,21 +162,18 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
* Data for pending but uncommitted offsets.
*/
private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
-
- /**
- * Flag indicating whether the Debezium Engine is started.
- */
- private volatile boolean debeziumStarted = false;
-
/**
* Validator to validate the connected database satisfies the cdc
connector's requirements.
*/
private final Validator validator;
+ /**
+ * Flag indicating whether the Debezium Engine is started.
+ */
+ private volatile boolean debeziumStarted = false;
//
---------------------------------------------------------------------------------------
// State
//
---------------------------------------------------------------------------------------
-
/**
* The offsets to restore to, if the consumer restores state from a
checkpoint.
*
@@ -231,7 +226,7 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
private String inlongMetric;
- private MetricData metricData;
+ private SourceMetricData sourceMetricData;
//
---------------------------------------------------------------------------------------
@@ -423,11 +418,13 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
String groupId = inlongMetricArray[0];
String streamId = inlongMetricArray[1];
String nodeId = inlongMetricArray[2];
- metricData = new MetricData(metricGroup);
- metricData.registerMetricsForNumRecordsIn(groupId, streamId,
nodeId, "numRecordsIn");
- metricData.registerMetricsForNumBytesIn(groupId, streamId, nodeId,
"numBytesIn");
- metricData.registerMetricsForNumBytesInPerSecond(groupId,
streamId, nodeId, "numBytesInPerSecond");
- metricData.registerMetricsForNumRecordsInPerSecond(groupId,
streamId, nodeId, "numRecordsInPerSecond");
+ sourceMetricData = new SourceMetricData(metricGroup);
+ sourceMetricData.registerMetricsForNumRecordsIn(groupId, streamId,
nodeId, NUM_RECORDS_IN);
+ sourceMetricData.registerMetricsForNumBytesIn(groupId, streamId,
nodeId, NUM_BYTES_IN);
+ sourceMetricData.registerMetricsForNumBytesInPerSecond(groupId,
streamId,
+ nodeId, NUM_BYTES_IN_PER_SECOND);
+ sourceMetricData.registerMetricsForNumRecordsInPerSecond(groupId,
streamId,
+ nodeId, NUM_RECORDS_IN_PER_SECOND);
}
properties.setProperty("name", "engine");
properties.setProperty("offset.storage",
FlinkOffsetBackingStore.class.getCanonicalName());
@@ -466,9 +463,9 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
new DebeziumDeserializationSchema<T>() {
@Override
public void deserialize(SourceRecord record,
Collector<T> out) throws Exception {
- if (metricData != null) {
- metricData.getNumRecordsIn().inc(1L);
- metricData.getNumBytesIn()
+ if (sourceMetricData != null) {
+ sourceMetricData.getNumRecordsIn().inc(1L);
+ sourceMetricData.getNumBytesIn()
.inc(record.value().toString().getBytes(StandardCharsets.UTF_8).length);
}
deserializer.deserialize(record, out);
@@ -641,7 +638,7 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
return engineInstanceName;
}
- public MetricData getMetricData() {
- return metricData;
+ public SourceMetricData getMetricData() {
+ return sourceMetricData;
}
}