This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ade959ab83 [INLONG-10630][Sort] Make SQL Server source support report
audit information exactly once (#10631)
ade959ab83 is described below
commit ade959ab835abf6bd5b6a67bcf93e86b307c9460
Author: XiaoYou201 <[email protected]>
AuthorDate: Tue Jul 16 12:40:00 2024 +0800
[INLONG-10630][Sort] Make SQL Server source support report audit
information exactly once (#10631)
---
.../sort/sqlserver/DebeziumSourceFunction.java | 10 +++++++
.../RowDataDebeziumDeserializeSchema.java | 32 +++++++++++++++++-----
2 files changed, 35 insertions(+), 7 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
index 7962b6f4cd..01118d6513 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/DebeziumSourceFunction.java
@@ -314,6 +314,10 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
snapshotOffsetState(functionSnapshotContext.getCheckpointId());
snapshotHistoryRecordsState();
}
+ if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
+ ((RowDataDebeziumDeserializeSchema) deserializer)
+
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
+ }
}
private void snapshotOffsetState(long checkpointId) throws Exception {
@@ -488,6 +492,12 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
DebeziumOffset offset =
DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets);
changeConsumer.commitOffset(offset);
+
+ if (deserializer instanceof RowDataDebeziumDeserializeSchema) {
+ RowDataDebeziumDeserializeSchema schema =
(RowDataDebeziumDeserializeSchema) deserializer;
+ schema.flushAudit();
+ schema.updateLastCheckpointId(checkpointId);
+ }
} catch (Exception e) {
// ignore exception if we are no longer running
LOG.warn("Ignore error when committing offset to database.", e);
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
index 2e632c1cb2..d90f470513 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sort.sqlserver;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import com.ververica.cdc.debezium.table.AppendMetadataCollector;
import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
@@ -101,7 +101,7 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
/** Changelog Mode to use for encoding changes in Flink internal data
structure. */
private final DebeziumChangelogMode changelogMode;
private final MetricOption metricOption;
- private SourceMetricData sourceMetricData;
+ private SourceExactlyMetric sourceExactlyMetric;
/** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
public static Builder newBuilder() {
@@ -133,7 +133,7 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
@Override
public void open() {
if (metricOption != null) {
- sourceMetricData = new SourceMetricData(metricOption);
+ sourceExactlyMetric = new SourceExactlyMetric(metricOption);
}
}
@@ -146,8 +146,8 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
GenericRowData insert = extractAfterRow(value, valueSchema);
validator.validate(insert, RowKind.INSERT);
insert.setRowKind(RowKind.INSERT);
- if (sourceMetricData != null) {
- out = new MetricsCollector<>(out, sourceMetricData);
+ if (sourceExactlyMetric != null) {
+ out = new MetricsCollector<>(out, sourceExactlyMetric);
}
emit(record, insert, out);
} else if (op == Envelope.Operation.DELETE) {
@@ -166,8 +166,8 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
GenericRowData after = extractAfterRow(value, valueSchema);
validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
- if (sourceMetricData != null) {
- out = new MetricsCollector<>(out, sourceMetricData);
+ if (sourceExactlyMetric != null) {
+ out = new MetricsCollector<>(out, sourceExactlyMetric);
}
emit(record, after, out);
}
@@ -679,4 +679,22 @@ public final class RowDataDebeziumDeserializeSchema
implements DebeziumDeseriali
}
};
}
+
+ public void flushAudit() {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.flushAudit();
+ }
+ }
+
+ public void updateCurrentCheckpointId(long checkpointId) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+ }
+ }
+
+ public void updateLastCheckpointId(long checkpointId) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.updateLastCheckpointId(checkpointId);
+ }
+ }
}