This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 67676d8631 [INLONG-11355][Sort] Add new source metrics for sort-connector-mongodb-cdc-v1.15 (#11356)
67676d8631 is described below

commit 67676d863162206c0e52912ec64cc0d18a81e3f3
Author: PeterZh6 <[email protected]>
AuthorDate: Wed Oct 16 17:21:09 2024 +0800

    [INLONG-11355][Sort] Add new source metrics for sort-connector-mongodb-cdc-v1.15 (#11356)
---
 .../sort/mongodb/DebeziumSourceFunction.java       |  70 ++++++++++---
 .../MongoDBConnectorDeserializationSchema.java     | 110 ++++++++++++---------
 .../apache/inlong/sort/mongodb/MongoDBSource.java  |  16 ++-
 .../inlong/sort/mongodb/MongoDBTableSource.java    |   4 +-
 .../sort/mongodb/source/MongoDBSourceBuilder.java  |   8 ++
 5 files changed, 147 insertions(+), 61 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
index 2d7191525b..46541295e7 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
@@ -17,6 +17,9 @@
 
 package org.apache.inlong.sort.mongodb;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
+
 import com.ververica.cdc.debezium.Validator;
 import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
 import com.ververica.cdc.debezium.internal.DebeziumOffset;
@@ -61,6 +64,8 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -197,17 +202,25 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
     /** Buffer the events from the source and record the errors from the 
debezium. */
     private transient Handover handover;
 
+    private transient SourceExactlyMetric sourceExactlyMetric;
+
+    private final MetricOption metricOption;
+
+    private transient Map<Long, Long> checkpointStartTimeMap;
+
     // 
---------------------------------------------------------------------------------------
 
     public DebeziumSourceFunction(
             DebeziumDeserializationSchema<T> deserializer,
             Properties properties,
             @Nullable DebeziumOffset specificOffset,
-            Validator validator) {
+            Validator validator,
+            MetricOption metricOption) {
         this.deserializer = deserializer;
         this.properties = properties;
         this.specificOffset = specificOffset;
         this.validator = validator;
+        this.metricOption = metricOption;
     }
 
     @Override
@@ -220,6 +233,14 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
         this.executor = Executors.newSingleThreadExecutor(threadFactory);
         this.handover = new Handover();
         this.changeConsumer = new DebeziumChangeConsumer(handover);
+        if (metricOption != null) {
+            sourceExactlyMetric = new SourceExactlyMetric(metricOption, 
getRuntimeContext().getMetricGroup());
+        }
+        this.checkpointStartTimeMap = new HashMap<>();
+        // set sourceExactlyMetric for deserializer
+        if (deserializer instanceof MongoDBConnectorDeserializationSchema) {
+            ((MongoDBConnectorDeserializationSchema) 
deserializer).setSourceExactlyMetric(sourceExactlyMetric);
+        }
     }
 
     // ------------------------------------------------------------------------
@@ -304,17 +325,32 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
 
     @Override
     public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
-        if (handover.hasError()) {
-            LOG.debug("snapshotState() called on closed source");
-            throw new FlinkRuntimeException(
-                    "Call snapshotState() on closed source, checkpoint failed.");
-        } else {
-            snapshotOffsetState(functionSnapshotContext.getCheckpointId());
-            snapshotHistoryRecordsState();
-        }
-        if (deserializer instanceof MongoDBConnectorDeserializationSchema) {
-            ((MongoDBConnectorDeserializationSchema) deserializer)
-                    .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+        try {
+            if (handover.hasError()) {
+                LOG.debug("snapshotState() called on closed source");
+                throw new FlinkRuntimeException(
+                        "Call snapshotState() on closed source, checkpoint failed.");
+            } else {
+                snapshotOffsetState(functionSnapshotContext.getCheckpointId());
+                snapshotHistoryRecordsState();
+            }
+            if (deserializer instanceof MongoDBConnectorDeserializationSchema) {
+                ((MongoDBConnectorDeserializationSchema) deserializer)
+                        .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+            }
+            if (checkpointStartTimeMap != null) {
+                checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis());
+            } else {
+                LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint");
+            }
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotCreate();
+            }
+        } catch (Exception e) {
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumSnapshotError();
+            }
+            throw e;
         }
     }
 
@@ -496,6 +532,16 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
                 schema.flushAudit();
                 schema.updateLastCheckpointId(checkpointId);
             }
+            if (checkpointStartTimeMap != null) {
+                Long snapShotStartTimeById = 
checkpointStartTimeMap.remove(checkpointId);
+                if (snapShotStartTimeById != null && sourceExactlyMetric != 
null) {
+                    sourceExactlyMetric.incNumSnapshotComplete();
+                    sourceExactlyMetric
+                            
.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - 
snapShotStartTimeById);
+                }
+            } else {
+                LOG.error("checkpointStartTimeMap is null, can't get the start 
time of checkpoint");
+            }
         } catch (Exception e) {
             // ignore exception if we are no longer running
             LOG.warn("Ignore error when committing offset to database.", e);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
index daa8dccb79..668f6de4cb 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
@@ -137,53 +137,66 @@ public class MongoDBConnectorDeserializationSchema 
implements DebeziumDeserializ
 
     @Override
     public void deserialize(SourceRecord record, Collector<RowData> out) 
throws Exception {
-        Struct value = (Struct) record.value();
-        Schema valueSchema = record.valueSchema();
-
-        OperationType op = operationTypeFor(record);
-        BsonDocument documentKey =
-                checkNotNull(
-                        extractBsonDocument(
-                                value, valueSchema, 
MongoDBEnvelope.DOCUMENT_KEY_FIELD));
-        BsonDocument fullDocument =
-                extractBsonDocument(value, valueSchema, 
MongoDBEnvelope.FULL_DOCUMENT_FIELD);
-        switch (op) {
-            case INSERT:
-                GenericRowData insert = extractRowData(fullDocument);
-                insert.setRowKind(RowKind.INSERT);
-                emit(record, insert,
-                        sourceExactlyMetric == null ? out : new 
MetricsCollector<>(out, sourceExactlyMetric));
-                break;
-            case DELETE:
-                GenericRowData delete = extractRowData(documentKey);
-                delete.setRowKind(RowKind.DELETE);
-                emit(record, delete,
-                        sourceExactlyMetric == null ? out : new 
MetricsCollector<>(out, sourceExactlyMetric));
-                break;
-            case UPDATE:
-                // It’s null if another operation deletes the document
-                // before the lookup operation happens. Ignored it.
-                if (fullDocument == null) {
+        long deserializeStartTime = System.currentTimeMillis();
+        try {
+            Struct value = (Struct) record.value();
+            Schema valueSchema = record.valueSchema();
+
+            OperationType op = operationTypeFor(record);
+            BsonDocument documentKey =
+                    checkNotNull(
+                            extractBsonDocument(
+                                    value, valueSchema, 
MongoDBEnvelope.DOCUMENT_KEY_FIELD));
+            BsonDocument fullDocument =
+                    extractBsonDocument(value, valueSchema, 
MongoDBEnvelope.FULL_DOCUMENT_FIELD);
+            switch (op) {
+                case INSERT:
+                    GenericRowData insert = extractRowData(fullDocument);
+                    insert.setRowKind(RowKind.INSERT);
+                    emit(record, insert,
+                            sourceExactlyMetric == null ? out : new 
MetricsCollector<>(out, sourceExactlyMetric));
                     break;
-                }
-                GenericRowData updateAfter = extractRowData(fullDocument);
-                updateAfter.setRowKind(RowKind.UPDATE_AFTER);
-                emit(record, updateAfter,
-                        sourceExactlyMetric == null ? out : new 
MetricsCollector<>(out, sourceExactlyMetric));
-                break;
-            case REPLACE:
-                GenericRowData replaceAfter = extractRowData(fullDocument);
-                replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
-                emit(record, replaceAfter,
-                        sourceExactlyMetric == null ? out : new 
MetricsCollector<>(out, sourceExactlyMetric));
-                break;
-            case INVALIDATE:
-            case DROP:
-            case DROP_DATABASE:
-            case RENAME:
-            case OTHER:
-            default:
-                break;
+                case DELETE:
+                    GenericRowData delete = extractRowData(documentKey);
+                    delete.setRowKind(RowKind.DELETE);
+                    emit(record, delete,
+                            sourceExactlyMetric == null ? out : new 
MetricsCollector<>(out, sourceExactlyMetric));
+                    break;
+                case UPDATE:
+                    // It’s null if another operation deletes the document
+                    // before the lookup operation happens. Ignored it.
+                    if (fullDocument == null) {
+                        break;
+                    }
+                    GenericRowData updateAfter = extractRowData(fullDocument);
+                    updateAfter.setRowKind(RowKind.UPDATE_AFTER);
+                    emit(record, updateAfter,
+                            sourceExactlyMetric == null ? out : new 
MetricsCollector<>(out, sourceExactlyMetric));
+                    break;
+                case REPLACE:
+                    GenericRowData replaceAfter = extractRowData(fullDocument);
+                    replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
+                    emit(record, replaceAfter,
+                            sourceExactlyMetric == null ? out : new 
MetricsCollector<>(out, sourceExactlyMetric));
+                    break;
+                case INVALIDATE:
+                case DROP:
+                case DROP_DATABASE:
+                case RENAME:
+                case OTHER:
+                default:
+                    break;
+            }
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumDeserializeSuccess();
+                
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - 
deserializeStartTime);
+            }
+
+        } catch (Exception e) {
+            if (sourceExactlyMetric != null) {
+                sourceExactlyMetric.incNumDeserializeError();
+            }
+            throw e;
         }
     }
 
@@ -827,4 +840,9 @@ public class MongoDBConnectorDeserializationSchema 
implements DebeziumDeserializ
             sourceExactlyMetric.updateLastCheckpointId(checkpointId);
         }
     }
+
+    /** setter for DebeziumSourceFunction to set SourceExactlyMetric*/
+    public void setSourceExactlyMetric(SourceExactlyMetric 
sourceExactlyMetric) {
+        this.sourceExactlyMetric = sourceExactlyMetric;
+    }
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java
index f9aab2d54f..67bb51bf69 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.mongodb;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
+
 import com.mongodb.client.model.changestream.FullDocument;
 import com.mongodb.kafka.connect.source.MongoSourceConfig;
 import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance;
@@ -35,7 +37,11 @@ import static 
com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSour
 import static 
com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
 import static 
com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME;
 import static 
com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA;
-import static 
com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.*;
+import static 
com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
+import static 
com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
+import static 
com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
+import static 
com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
+import static 
com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
 import static 
com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils.buildConnectionString;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -76,6 +82,7 @@ public class MongoDBSource {
         private String copyExistingPipeline;
         private Integer heartbeatIntervalMillis = 
HEARTBEAT_INTERVAL_MILLIS.defaultValue();
         private DebeziumDeserializationSchema<T> deserializer;
+        private MetricOption metricOption;
 
         /** The comma-separated list of hostname and port pairs of mongodb 
servers. */
         public Builder<T> hosts(String hosts) {
@@ -243,6 +250,11 @@ public class MongoDBSource {
             return this;
         }
 
+        public Builder<T> metricOption(MetricOption metricOption) {
+            this.metricOption = metricOption;
+            return this;
+        }
+
         /**
          * The properties of mongodb kafka connector.
          * https://docs.mongodb.com/kafka-connector/current/kafka-source
@@ -338,7 +350,7 @@ public class MongoDBSource {
                     MongoSourceConfig.ERRORS_TOLERANCE_CONFIG, 
ErrorTolerance.NONE.value());
 
             return new DebeziumSourceFunction<>(
-                    deserializer, props, null, 
Validator.getDefaultValidator());
+                    deserializer, props, null, 
Validator.getDefaultValidator(), metricOption);
         }
     }
 }
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java
index 9c417b4edf..a161077b0a 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java
@@ -191,13 +191,15 @@ public class MongoDBTableSource implements 
ScanTableSource, SupportsReadingMetad
                     .ifPresent(builder::heartbeatIntervalMillis);
             
Optional.ofNullable(splitMetaGroupSize).ifPresent(builder::splitMetaGroupSize);
             Optional.ofNullable(splitSizeMB).ifPresent(builder::splitSizeMB);
+            Optional.ofNullable(metricOption).ifPresent(builder::metricOption);
 
             return SourceProvider.of(builder.build());
         } else {
             org.apache.inlong.sort.mongodb.MongoDBSource.Builder<RowData> 
builder =
                     
org.apache.inlong.sort.mongodb.MongoDBSource.<RowData>builder()
                             .hosts(hosts)
-                            .deserializer(deserializer);
+                            .deserializer(deserializer)
+                            .metricOption(metricOption);
 
             Optional.ofNullable(databaseList).ifPresent(builder::databaseList);
             
Optional.ofNullable(collectionList).ifPresent(builder::collectionList);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java
index a95f238a0b..07b0d9f1cf 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.mongodb.source;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;
 
 import com.ververica.cdc.connectors.base.options.StartupOptions;
@@ -54,6 +55,7 @@ public class MongoDBSourceBuilder<T> {
 
     private final MongoDBSourceConfigFactory configFactory = new 
MongoDBSourceConfigFactory();
     private DebeziumDeserializationSchema<T> deserializer;
+    private MetricOption metricOption;
 
     /** The comma-separated list of hostname and port pairs of mongodb 
servers. */
     public MongoDBSourceBuilder<T> hosts(String hosts) {
@@ -189,6 +191,12 @@ public class MongoDBSourceBuilder<T> {
         return this;
     }
 
+    /** The metric option used to collect metrics when inlong.metric.labels is 
present in flink sql. */
+    public MongoDBSourceBuilder<T> metricOption(MetricOption metricOption) {
+        this.metricOption = metricOption;
+        return this;
+    }
+
     /**
      * Build the {@link MongoDBSource}.
      *

Reply via email to