This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d99973c4f5 [INLONG-10609][Sort] Make PostgreSQL source support report
audit information exactly once (#10627)
d99973c4f5 is described below
commit d99973c4f5385ab9813893076165ec63cf61c338
Author: XiaoYou201 <[email protected]>
AuthorDate: Tue Jul 16 12:40:43 2024 +0800
[INLONG-10609][Sort] Make PostgreSQL source support report audit
information exactly once (#10627)
---
.../sort/postgre/DebeziumSourceFunction.java | 9 ++++++
.../postgre/RowDataDebeziumDeserializeSchema.java | 36 ++++++++++++++++------
2 files changed, 36 insertions(+), 9 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java
index 27b9988dd0..5efc6c6ea5 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java
@@ -313,6 +313,10 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
snapshotOffsetState(functionSnapshotContext.getCheckpointId());
snapshotHistoryRecordsState();
}
+ if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
+ ((RowDataDebeziumDeserializeSchema) deserializer)
+
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+ }
}
private void snapshotOffsetState(long checkpointId) throws Exception {
@@ -487,6 +491,11 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
DebeziumOffset offset =
DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
changeConsumer.commitOffset(offset);
+ if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
+ RowDataDebeziumDeserializeSchema schema =
(RowDataDebeziumDeserializeSchema) deserializer;
+ schema.flushAudit();
+ schema.updateLastCheckpointId(checkpointId);
+ }
} catch (Exception e) {
// ignore exception if we are no longer running
LOG.warn("Ignore error when committing offset to database.", e);
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
index de701a2ae1..fdf2d01327 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sort.postgre;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import com.ververica.cdc.debezium.table.AppendMetadataCollector;
import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
@@ -101,7 +101,7 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
/** Changelog Mode to use for encoding changes in Flink internal data
structure. */
private final DebeziumChangelogMode changelogMode;
private final MetricOption metricOption;
- private SourceMetricData sourceMetricData;
+ private SourceExactlyMetric sourceExactlyMetric;
/** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
public static Builder newBuilder() {
@@ -133,7 +133,7 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
@Override
public void open() {
if (metricOption != null) {
- sourceMetricData = new SourceMetricData(metricOption);
+ sourceExactlyMetric = new SourceExactlyMetric(metricOption);
}
}
@@ -146,16 +146,16 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
GenericRowData insert = extractAfterRow(value, valueSchema);
validator.validate(insert, RowKind.INSERT);
insert.setRowKind(RowKind.INSERT);
- if (sourceMetricData != null) {
- out = new MetricsCollector<>(out, sourceMetricData);
+ if (sourceExactlyMetric != null) {
+ out = new MetricsCollector<>(out, sourceExactlyMetric);
}
emit(record, insert, out);
} else if (op == Envelope.Operation.DELETE) {
GenericRowData delete = extractBeforeRow(value, valueSchema);
validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
- if (sourceMetricData != null) {
- out = new MetricsCollector<>(out, sourceMetricData);
+ if (sourceExactlyMetric != null) {
+ out = new MetricsCollector<>(out, sourceExactlyMetric);
}
emit(record, delete, out);
} else {
@@ -169,8 +169,8 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
GenericRowData after = extractAfterRow(value, valueSchema);
validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
- if (sourceMetricData != null) {
- out = new MetricsCollector<>(out, sourceMetricData);
+ if (sourceExactlyMetric != null) {
+ out = new MetricsCollector<>(out, sourceExactlyMetric);
}
emit(record, after, out);
}
@@ -686,4 +686,22 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
}
};
}
+
+ public void flushAudit() {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.flushAudit();
+ }
+ }
+
+ public void updateCurrentCheckpointId(long checkpointId) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+ }
+ }
+
+ public void updateLastCheckpointId(long checkpointId) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.updateLastCheckpointId(checkpointId);
+ }
+ }
}