This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 962b52980d [INLONG-10610][Sort] Make MongoDB source support report 
audit information exactly once (#10622)
962b52980d is described below

commit 962b52980db085f047605b3b6fb25a8b09c9174b
Author: XiaoYou201 <[email protected]>
AuthorDate: Mon Jul 15 17:08:03 2024 +0800

    [INLONG-10610][Sort] Make MongoDB source support report audit information 
exactly once (#10622)
---
 .../sort/mongodb/DebeziumSourceFunction.java       | 10 +++++++
 .../MongoDBConnectorDeserializationSchema.java     | 33 +++++++++++++++++-----
 2 files changed, 36 insertions(+), 7 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
index daaaace1b0..2d7191525b 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
@@ -312,6 +312,10 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
             snapshotOffsetState(functionSnapshotContext.getCheckpointId());
             snapshotHistoryRecordsState();
         }
+        if (deserializer instanceof MongoDBConnectorDeserializationSchema) {
+            ((MongoDBConnectorDeserializationSchema) deserializer)
+                    
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+        }
     }
 
     private void snapshotOffsetState(long checkpointId) throws Exception {
@@ -486,6 +490,12 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
             DebeziumOffset offset =
                     
DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
             changeConsumer.commitOffset(offset);
+
+            if (deserializer instanceof MongoDBConnectorDeserializationSchema) 
{
+                MongoDBConnectorDeserializationSchema schema = 
(MongoDBConnectorDeserializationSchema) deserializer;
+                schema.flushAudit();
+                schema.updateLastCheckpointId(checkpointId);
+            }
         } catch (Exception e) {
             // ignore exception if we are no longer running
             LOG.warn("Ignore error when committing offset to database.", e);
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
index d8e4770f79..daa8dccb79 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sort.mongodb;
 
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
 
 import com.mongodb.client.model.changestream.OperationType;
 import com.mongodb.internal.HexUtils;
@@ -112,7 +112,7 @@ public class MongoDBConnectorDeserializationSchema 
implements DebeziumDeserializ
 
     private final MetricOption metricOption;
 
-    private SourceMetricData sourceMetricData;
+    private SourceExactlyMetric sourceExactlyMetric;
 
     public MongoDBConnectorDeserializationSchema(
             RowType physicalDataType,
@@ -131,7 +131,7 @@ public class MongoDBConnectorDeserializationSchema 
implements DebeziumDeserializ
     @Override
     public void open() {
         if (metricOption != null) {
-            sourceMetricData = new SourceMetricData(metricOption);
+            sourceExactlyMetric = new SourceExactlyMetric(metricOption);
         }
     }
 
@@ -152,12 +152,13 @@ public class MongoDBConnectorDeserializationSchema 
implements DebeziumDeserializ
                 GenericRowData insert = extractRowData(fullDocument);
                 insert.setRowKind(RowKind.INSERT);
                 emit(record, insert,
-                        sourceMetricData == null ? out : new 
MetricsCollector<>(out, sourceMetricData));
+                        sourceExactlyMetric == null ? out : new 
MetricsCollector<>(out, sourceExactlyMetric));
                 break;
             case DELETE:
                 GenericRowData delete = extractRowData(documentKey);
                 delete.setRowKind(RowKind.DELETE);
-                emit(record, delete, sourceMetricData == null ? out : new 
MetricsCollector<>(out, sourceMetricData));
+                emit(record, delete,
+                        sourceExactlyMetric == null ? out : new 
MetricsCollector<>(out, sourceExactlyMetric));
                 break;
             case UPDATE:
                 // It’s null if another operation deletes the document
@@ -168,13 +169,13 @@ public class MongoDBConnectorDeserializationSchema 
implements DebeziumDeserializ
                 GenericRowData updateAfter = extractRowData(fullDocument);
                 updateAfter.setRowKind(RowKind.UPDATE_AFTER);
                 emit(record, updateAfter,
-                        sourceMetricData == null ? out : new 
MetricsCollector<>(out, sourceMetricData));
+                        sourceExactlyMetric == null ? out : new 
MetricsCollector<>(out, sourceExactlyMetric));
                 break;
             case REPLACE:
                 GenericRowData replaceAfter = extractRowData(fullDocument);
                 replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
                 emit(record, replaceAfter,
-                        sourceMetricData == null ? out : new 
MetricsCollector<>(out, sourceMetricData));
+                        sourceExactlyMetric == null ? out : new 
MetricsCollector<>(out, sourceExactlyMetric));
                 break;
             case INVALIDATE:
             case DROP:
@@ -808,4 +809,22 @@ public class MongoDBConnectorDeserializationSchema 
implements DebeziumDeserializ
             return converter.convert(docObj);
         };
     }
+
+    public void flushAudit() {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.flushAudit();
+        }
+    }
+
+    public void updateCurrentCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+        }
+    }
+
+    public void updateLastCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.updateLastCheckpointId(checkpointId);
+        }
+    }
 }

Reply via email to