This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9606de07c [INLONG-7250][Sort] Output the read phase metrics for MySQL 
reader (#7251)
9606de07c is described below

commit 9606de07cb47195f459c4b5ed09cef40374c1ba2
Author: chestnufang <[email protected]>
AuthorDate: Tue Jan 17 00:12:30 2023 +0800

    [INLONG-7250][Sort] Output the read phase metrics for MySQL reader (#7251)
---
 .../cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java     | 10 ++++++++++
 .../sort/cdc/mysql/source/reader/MySqlRecordEmitter.java       |  5 +++++
 2 files changed, 15 insertions(+)

diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
index 2655ab7ac..a46076ef2 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/metrics/MySqlSourceReaderMetrics.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.enums.ReadPhase;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData;
@@ -137,4 +138,13 @@ public class MySqlSourceReaderMetrics {
     public SourceTableMetricData getSourceMetricData() {
         return sourceTableMetricData;
     }
+
+    /**
+     * output read phase metric
+     *
+     * @param readPhase the readPhase of record
+     */
+    public void outputReadPhaseMetrics(ReadPhase readPhase) {
+        sourceTableMetricData.outputReadPhaseMetrics(readPhase);
+    }
 }
diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
index eb961df00..e9427ec3c 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/reader/MySqlRecordEmitter.java
@@ -28,6 +28,7 @@ import 
io.debezium.relational.history.TableChanges.TableChange;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
 import org.apache.flink.util.Collector;
+import org.apache.inlong.sort.base.enums.ReadPhase;
 import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
 import 
org.apache.inlong.sort.cdc.base.debezium.history.FlinkJsonTableChangeSerializer;
 import 
org.apache.inlong.sort.cdc.mysql.source.metrics.MySqlSourceReaderMetrics;
@@ -171,6 +172,10 @@ public final class MySqlRecordEmitter<T>
 
     private void updateStartingOffsetForSplit(MySqlSplitState splitState, 
SourceRecord element) {
         if (splitState.isBinlogSplitState()) {
+            // record the time metric to enter the incremental phase
+            if (sourceReaderMetrics != null) {
+                
sourceReaderMetrics.outputReadPhaseMetrics(ReadPhase.INCREASE_PHASE);
+            }
             BinlogOffset position = getBinlogPosition(element);
             splitState.asBinlogSplitState().setStartingOffset(position);
         }

Reply via email to