This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/branch-1.4 by this push:
new 46f436c36 [INLONG-6471][Sort] MySQL connector metric restore lost init
data for using sourceFunction (#6474)
46f436c36 is described below
commit 46f436c36ed2255cc3b0ae8da230cc33f2264513
Author: Xin Gong <[email protected]>
AuthorDate: Wed Nov 9 10:35:53 2022 +0800
[INLONG-6471][Sort] MySQL connector metric restore lost init data for using
sourceFunction (#6474)
---
.../org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
index 80d6812f2..5ade85306 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/debezium/DebeziumSourceFunction.java
@@ -80,6 +80,8 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
import static org.apache.inlong.sort.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
@@ -438,6 +440,8 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
MetricOption metricOption = MetricOption.builder()
.withInlongLabels(inlongMetric)
.withInlongAudit(inlongAudit)
+ .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
+ .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L)
.withRegisterMetric(RegisteredMetric.ALL)
.build();
if (metricOption != null) {