This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 962b52980d [INLONG-10610][Sort] Make MongoDB source support report
audit information exactly once (#10622)
962b52980d is described below
commit 962b52980db085f047605b3b6fb25a8b09c9174b
Author: XiaoYou201 <[email protected]>
AuthorDate: Mon Jul 15 17:08:03 2024 +0800
[INLONG-10610][Sort] Make MongoDB source support report audit information
exactly once (#10622)
---
.../sort/mongodb/DebeziumSourceFunction.java | 10 +++++++
.../MongoDBConnectorDeserializationSchema.java | 33 +++++++++++++++++-----
2 files changed, 36 insertions(+), 7 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
index daaaace1b0..2d7191525b 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java
@@ -312,6 +312,10 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
snapshotOffsetState(functionSnapshotContext.getCheckpointId());
snapshotHistoryRecordsState();
}
+ if (deserializer instanceof MongoDBConnectorDeserializationSchema) {
+ ((MongoDBConnectorDeserializationSchema) deserializer)
+
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+ }
}
private void snapshotOffsetState(long checkpointId) throws Exception {
@@ -486,6 +490,12 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
DebeziumOffset offset =
DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
changeConsumer.commitOffset(offset);
+
+ if (deserializer instanceof MongoDBConnectorDeserializationSchema)
{
+ MongoDBConnectorDeserializationSchema schema =
(MongoDBConnectorDeserializationSchema) deserializer;
+ schema.flushAudit();
+ schema.updateLastCheckpointId(checkpointId);
+ }
} catch (Exception e) {
// ignore exception if we are no longer running
LOG.warn("Ignore error when committing offset to database.", e);
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
index d8e4770f79..daa8dccb79 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sort.mongodb;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import com.mongodb.client.model.changestream.OperationType;
import com.mongodb.internal.HexUtils;
@@ -112,7 +112,7 @@ public class MongoDBConnectorDeserializationSchema
implements DebeziumDeserializ
private final MetricOption metricOption;
- private SourceMetricData sourceMetricData;
+ private SourceExactlyMetric sourceExactlyMetric;
public MongoDBConnectorDeserializationSchema(
RowType physicalDataType,
@@ -131,7 +131,7 @@ public class MongoDBConnectorDeserializationSchema
implements DebeziumDeserializ
@Override
public void open() {
if (metricOption != null) {
- sourceMetricData = new SourceMetricData(metricOption);
+ sourceExactlyMetric = new SourceExactlyMetric(metricOption);
}
}
@@ -152,12 +152,13 @@ public class MongoDBConnectorDeserializationSchema
implements DebeziumDeserializ
GenericRowData insert = extractRowData(fullDocument);
insert.setRowKind(RowKind.INSERT);
emit(record, insert,
- sourceMetricData == null ? out : new
MetricsCollector<>(out, sourceMetricData));
+ sourceExactlyMetric == null ? out : new
MetricsCollector<>(out, sourceExactlyMetric));
break;
case DELETE:
GenericRowData delete = extractRowData(documentKey);
delete.setRowKind(RowKind.DELETE);
- emit(record, delete, sourceMetricData == null ? out : new
MetricsCollector<>(out, sourceMetricData));
+ emit(record, delete,
+ sourceExactlyMetric == null ? out : new
MetricsCollector<>(out, sourceExactlyMetric));
break;
case UPDATE:
// It’s null if another operation deletes the document
@@ -168,13 +169,13 @@ public class MongoDBConnectorDeserializationSchema
implements DebeziumDeserializ
GenericRowData updateAfter = extractRowData(fullDocument);
updateAfter.setRowKind(RowKind.UPDATE_AFTER);
emit(record, updateAfter,
- sourceMetricData == null ? out : new
MetricsCollector<>(out, sourceMetricData));
+ sourceExactlyMetric == null ? out : new
MetricsCollector<>(out, sourceExactlyMetric));
break;
case REPLACE:
GenericRowData replaceAfter = extractRowData(fullDocument);
replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
emit(record, replaceAfter,
- sourceMetricData == null ? out : new
MetricsCollector<>(out, sourceMetricData));
+ sourceExactlyMetric == null ? out : new
MetricsCollector<>(out, sourceExactlyMetric));
break;
case INVALIDATE:
case DROP:
@@ -808,4 +809,22 @@ public class MongoDBConnectorDeserializationSchema
implements DebeziumDeserializ
return converter.convert(docObj);
};
}
+
+ public void flushAudit() {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.flushAudit();
+ }
+ }
+
+ public void updateCurrentCheckpointId(long checkpointId) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+ }
+ }
+
+ public void updateLastCheckpointId(long checkpointId) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.updateLastCheckpointId(checkpointId);
+ }
+ }
}