This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 727475ff8 [INLONG-7595][Sort] Mongo read phase metrics need to update when no incremental data (#7606)
727475ff8 is described below

commit 727475ff8a36f4b04b3eff694ddf9d6dc74f2c49
Author: Xin Gong <[email protected]>
AuthorDate: Wed Mar 15 14:32:27 2023 +0800

    [INLONG-7595][Sort] Mongo read phase metrics need to update when no incremental data (#7606)
---
 .../sort/cdc/mongodb/DebeziumSourceFunction.java   | 32 ++++++++++++++++------
 .../debezium/DebeziumDeserializationSchema.java    |  8 +++++-
 .../debezium/internal/DebeziumChangeFetcher.java   | 23 +++++++++-------
 .../MongoDBConnectorDeserializationSchema.java     |  5 ++++
 4 files changed, 49 insertions(+), 19 deletions(-)

diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index 1a763a16c..b835b584c 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -19,8 +19,10 @@ package org.apache.inlong.sort.cdc.mongodb;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
+import com.ververica.cdc.connectors.mongodb.source.utils.MongoRecordUtils;
 import com.ververica.cdc.debezium.Validator;
-import io.debezium.connector.SnapshotRecord;
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
 import io.debezium.document.DocumentReader;
 import io.debezium.document.DocumentWriter;
 import io.debezium.embedded.Connect;
@@ -50,6 +52,7 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.enums.ReadPhase;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
@@ -501,17 +504,30 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
 
                             @Override
                             public void deserialize(SourceRecord record, 
Collector<T> out) throws Exception {
+                                // do nothing
+                            }
+
+                            @Override
+                            public void deserialize(SourceRecord record, 
Collector<T> out, Boolean isStreamingPhase)
+                                    throws Exception {
+                                if (record != null && 
MongoRecordUtils.isHeartbeatEvent(record)) {
+                                    if (sourceMetricData != null && 
isStreamingPhase) {
+                                        
sourceMetricData.outputReadPhaseMetrics(ReadPhase.INCREASE_PHASE);
+                                    }
+                                    return;
+                                }
                                 if (sourceMetricData != null && record != null 
&& migrateAll) {
                                     Struct value = (Struct) record.value();
-                                    Struct source = 
value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
-                                    if (null == source) {
-                                        source = 
value.getStruct(RecordUtils.DOCUMENT_TO_FIELD);
+                                    Struct ns = 
value.getStruct(MongoDBEnvelope.NAMESPACE_FIELD);
+                                    if (null == ns) {
+                                        ns = 
value.getStruct(RecordUtils.DOCUMENT_TO_FIELD);
                                     }
-                                    String dbName = 
source.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);
+                                    String dbName = 
ns.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);
                                     String collectionName =
-                                            
source.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD);
-                                    SnapshotRecord snapshotRecord = 
SnapshotRecord.fromSource(source);
-                                    boolean isSnapshotRecord = 
(SnapshotRecord.TRUE == snapshotRecord);
+                                            
ns.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD);
+                                    Struct source = 
value.getStruct(Envelope.FieldName.SOURCE);
+                                    String snapshotRecord = 
source.getString(AbstractSourceInfo.SNAPSHOT_KEY);
+                                    boolean isSnapshotRecord = 
Boolean.parseBoolean(snapshotRecord);
                                     sourceMetricData
                                             .outputMetricsWithEstimate(new 
String[]{dbName, collectionName},
                                                     isSnapshotRecord, value);
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/DebeziumDeserializationSchema.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/DebeziumDeserializationSchema.java
index be0b8a527..e60d65640 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/DebeziumDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/DebeziumDeserializationSchema.java
@@ -17,12 +17,13 @@
 
 package org.apache.inlong.sort.cdc.mongodb.debezium;
 
-import java.io.Serializable;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.util.Collector;
 import org.apache.kafka.connect.source.SourceRecord;
 
+import java.io.Serializable;
+
 /**
  * The deserialization schema describes how to turn the Debezium SourceRecord 
into data types
  * (Java/Scala objects) that are processed by Flink.
@@ -37,4 +38,9 @@ public interface DebeziumDeserializationSchema<T> extends 
Serializable, ResultTy
      */
     void deserialize(SourceRecord record, Collector<T> out) throws Exception;
 
+    /**
+     * Deserialize the Debezium record, it is represented in Kafka {@link 
SourceRecord}.
+     */
+    void deserialize(SourceRecord record, Collector<T> out, Boolean 
isStreamingPhase) throws Exception;
+
 }
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/internal/DebeziumChangeFetcher.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/internal/DebeziumChangeFetcher.java
index 3ffed20ec..3bbefa064 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/internal/DebeziumChangeFetcher.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/internal/DebeziumChangeFetcher.java
@@ -21,10 +21,6 @@ import io.debezium.connector.SnapshotRecord;
 import io.debezium.data.Envelope;
 import io.debezium.engine.ChangeEvent;
 import io.debezium.engine.DebeziumEngine;
-import java.util.ArrayDeque;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -36,6 +32,11 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
 /**
  * A Handler that convert change messages from {@link DebeziumEngine} to data 
in Flink. Considering
  * Debezium in different mode has different strategies to hold the lock, e.g. 
snapshot, the handler
@@ -150,9 +151,9 @@ public class DebeziumChangeFetcher<T> {
                 synchronized (checkpointLock) {
                     LOG.info(
                             "Database snapshot phase can't perform checkpoint, 
acquired Checkpoint lock.");
-                    handleBatch(events);
+                    handleBatch(events, false);
                     while (isRunning && isInDbSnapshotPhase) {
-                        handleBatch(handover.pollNext());
+                        handleBatch(handover.pollNext(), false);
                     }
                 }
                 LOG.info("Received record from streaming binlog phase, 
released checkpoint lock.");
@@ -162,7 +163,7 @@ public class DebeziumChangeFetcher<T> {
             while (isRunning) {
                 // If the handover is closed or has errors, exit.
                 // If there is no streaming phase, the handover will be closed 
by the engine.
-                handleBatch(handover.pollNext());
+                handleBatch(handover.pollNext(), true);
             }
         } catch (Handover.ClosedException e) {
             // ignore
@@ -206,7 +207,7 @@ public class DebeziumChangeFetcher<T> {
     // Helper
     // 
---------------------------------------------------------------------------------------
 
-    private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> 
changeEvents)
+    private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> 
changeEvents, boolean isStreamingPhase)
             throws Exception {
         if (CollectionUtils.isEmpty(changeEvents)) {
             return;
@@ -224,11 +225,13 @@ public class DebeziumChangeFetcher<T> {
                     
debeziumOffset.setSourcePartition(record.sourcePartition());
                     debeziumOffset.setSourceOffset(record.sourceOffset());
                 }
+            }
+            deserialization.deserialize(record, debeziumCollector, 
isStreamingPhase);
+
+            if (isHeartbeatEvent(record)) {
                 // drop heartbeat events
                 continue;
             }
-            deserialization.deserialize(record, debeziumCollector);
-
             if (!isSnapshotRecord(record)) {
                 LOG.debug("Snapshot phase finishes.");
                 isInDbSnapshotPhase = false;
diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
index dc1f71430..3ef453545 100644
--- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/debezium/table/MongoDBConnectorDeserializationSchema.java
@@ -222,6 +222,11 @@ public class MongoDBConnectorDeserializationSchema
         }
     }
 
+    @Override
+    public void deserialize(SourceRecord record, Collector<RowData> out, 
Boolean isStreamingPhase) throws Exception {
+        this.deserialize(record, out);
+    }
+
     private GenericRowData extractMongoDMLData(Struct value, String keyFiled, 
String ddlType) {
         Struct documentTo = (Struct) value.get(keyFiled);
         String newDb = 
documentTo.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD);

Reply via email to