This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ade959ab83 [INLONG-10630][Sort] Make SQL Server source support report 
audit information exactly once (#10631)
ade959ab83 is described below

commit ade959ab835abf6bd5b6a67bcf93e86b307c9460
Author: XiaoYou201 <[email protected]>
AuthorDate: Tue Jul 16 12:40:00 2024 +0800

    [INLONG-10630][Sort] Make SQL Server source support report audit 
information exactly once (#10631)
---
 .../sort/sqlserver/DebeziumSourceFunction.java     | 10 +++++++
 .../RowDataDebeziumDeserializeSchema.java          | 32 +++++++++++++++++-----
 2 files changed, 35 insertions(+), 7 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
index 7962b6f4cd..01118d6513 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
@@ -314,6 +314,10 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
             snapshotOffsetState(functionSnapshotContext.getCheckpointId());
             snapshotHistoryRecordsState();
         }
+        if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
+            ((RowDataDebeziumDeserializeSchema) deserializer)
+                    
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+        }
     }
 
     private void snapshotOffsetState(long checkpointId) throws Exception {
@@ -488,6 +492,12 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
             DebeziumOffset offset =
                     
DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
             changeConsumer.commitOffset(offset);
+
+            if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
+                RowDataDebeziumDeserializeSchema schema = 
(RowDataDebeziumDeserializeSchema) deserializer;
+                schema.flushAudit();
+                schema.updateLastCheckpointId(checkpointId);
+            }
         } catch (Exception e) {
             // ignore exception if we are no longer running
             LOG.warn("Ignore error when committing offset to database.", e);
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
index 2e632c1cb2..d90f470513 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sort.sqlserver;
 
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
 
 import com.ververica.cdc.debezium.table.AppendMetadataCollector;
 import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
@@ -101,7 +101,7 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
     /** Changelog Mode to use for encoding changes in Flink internal data 
structure. */
     private final DebeziumChangelogMode changelogMode;
     private final MetricOption metricOption;
-    private SourceMetricData sourceMetricData;
+    private SourceExactlyMetric sourceExactlyMetric;
 
     /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
     public static Builder newBuilder() {
@@ -133,7 +133,7 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
     @Override
     public void open() {
         if (metricOption != null) {
-            sourceMetricData = new SourceMetricData(metricOption);
+            sourceExactlyMetric = new SourceExactlyMetric(metricOption);
         }
     }
 
@@ -146,8 +146,8 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
             GenericRowData insert = extractAfterRow(value, valueSchema);
             validator.validate(insert, RowKind.INSERT);
             insert.setRowKind(RowKind.INSERT);
-            if (sourceMetricData != null) {
-                out = new MetricsCollector<>(out, sourceMetricData);
+            if (sourceExactlyMetric != null) {
+                out = new MetricsCollector<>(out, sourceExactlyMetric);
             }
             emit(record, insert, out);
         } else if (op == Envelope.Operation.DELETE) {
@@ -166,8 +166,8 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
             GenericRowData after = extractAfterRow(value, valueSchema);
             validator.validate(after, RowKind.UPDATE_AFTER);
             after.setRowKind(RowKind.UPDATE_AFTER);
-            if (sourceMetricData != null) {
-                out = new MetricsCollector<>(out, sourceMetricData);
+            if (sourceExactlyMetric != null) {
+                out = new MetricsCollector<>(out, sourceExactlyMetric);
             }
             emit(record, after, out);
         }
@@ -679,4 +679,22 @@ public final class RowDataDebeziumDeserializeSchema 
implements DebeziumDeseriali
             }
         };
     }
+
+    public void flushAudit() {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.flushAudit();
+        }
+    }
+
+    public void updateCurrentCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+        }
+    }
+
+    public void updateLastCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.updateLastCheckpointId(checkpointId);
+        }
+    }
 }

Reply via email to