This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d99973c4f5 [INLONG-10609][Sort] Make PostgreSQL source support report audit information exactly once (#10627)
d99973c4f5 is described below

commit d99973c4f5385ab9813893076165ec63cf61c338
Author: XiaoYou201 <[email protected]>
AuthorDate: Tue Jul 16 12:40:43 2024 +0800

    [INLONG-10609][Sort] Make PostgreSQL source support report audit information exactly once (#10627)
---
 .../sort/postgre/DebeziumSourceFunction.java       |  9 ++++++
 .../postgre/RowDataDebeziumDeserializeSchema.java  | 36 ++++++++++++++++------
 2 files changed, 36 insertions(+), 9 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java
index 27b9988dd0..5efc6c6ea5 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java
@@ -313,6 +313,10 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
             snapshotOffsetState(functionSnapshotContext.getCheckpointId());
             snapshotHistoryRecordsState();
         }
+        if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
+            ((RowDataDebeziumDeserializeSchema) deserializer)
+                    .updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+        }
     }
 
     private void snapshotOffsetState(long checkpointId) throws Exception {
@@ -487,6 +491,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
             DebeziumOffset offset =
                     DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
             changeConsumer.commitOffset(offset);
+            if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
+                RowDataDebeziumDeserializeSchema schema = (RowDataDebeziumDeserializeSchema) deserializer;
+                schema.flushAudit();
+                schema.updateLastCheckpointId(checkpointId);
+            }
         } catch (Exception e) {
             // ignore exception if we are no longer running
             LOG.warn("Ignore error when committing offset to database.", e);
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
index de701a2ae1..fdf2d01327 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sort.postgre;
 
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
 
 import com.ververica.cdc.debezium.table.AppendMetadataCollector;
 import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
@@ -101,7 +101,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
     /** Changelog Mode to use for encoding changes in Flink internal data structure. */
     private final DebeziumChangelogMode changelogMode;
     private final MetricOption metricOption;
-    private SourceMetricData sourceMetricData;
+    private SourceExactlyMetric sourceExactlyMetric;
 
     /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
     public static Builder newBuilder() {
@@ -133,7 +133,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
     @Override
     public void open() {
         if (metricOption != null) {
-            sourceMetricData = new SourceMetricData(metricOption);
+            sourceExactlyMetric = new SourceExactlyMetric(metricOption);
         }
     }
 
@@ -146,16 +146,16 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
             GenericRowData insert = extractAfterRow(value, valueSchema);
             validator.validate(insert, RowKind.INSERT);
             insert.setRowKind(RowKind.INSERT);
-            if (sourceMetricData != null) {
-                out = new MetricsCollector<>(out, sourceMetricData);
+            if (sourceExactlyMetric != null) {
+                out = new MetricsCollector<>(out, sourceExactlyMetric);
             }
             emit(record, insert, out);
         } else if (op == Envelope.Operation.DELETE) {
             GenericRowData delete = extractBeforeRow(value, valueSchema);
             validator.validate(delete, RowKind.DELETE);
             delete.setRowKind(RowKind.DELETE);
-            if (sourceMetricData != null) {
-                out = new MetricsCollector<>(out, sourceMetricData);
+            if (sourceExactlyMetric != null) {
+                out = new MetricsCollector<>(out, sourceExactlyMetric);
             }
             emit(record, delete, out);
         } else {
@@ -169,8 +169,8 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
             GenericRowData after = extractAfterRow(value, valueSchema);
             validator.validate(after, RowKind.UPDATE_AFTER);
             after.setRowKind(RowKind.UPDATE_AFTER);
-            if (sourceMetricData != null) {
-                out = new MetricsCollector<>(out, sourceMetricData);
+            if (sourceExactlyMetric != null) {
+                out = new MetricsCollector<>(out, sourceExactlyMetric);
             }
             emit(record, after, out);
         }
@@ -686,4 +686,22 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
             }
         };
     }
+
+    public void flushAudit() {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.flushAudit();
+        }
+    }
+
+    public void updateCurrentCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+        }
+    }
+
+    public void updateLastCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.updateLastCheckpointId(checkpointId);
+        }
+    }
 }

Reply via email to