This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 65f92147e2 [INLONG-9384][Sort] Fix pulsar audit data loss when
restarting (#9386)
65f92147e2 is described below
commit 65f92147e2dce57c28b9e6ea8c42c4ed903939b3
Author: Sting <[email protected]>
AuthorDate: Fri Dec 1 18:54:25 2023 +0800
[INLONG-9384][Sort] Fix pulsar audit data loss when restarting (#9386)
---
.../org/apache/inlong/sort/base/metric/SourceMetricData.java | 10 ++++++++++
.../apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java | 10 ++++++++++
2 files changed, 20 insertions(+)
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index 0a0cad5d6e..1e1a624762 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -298,6 +298,16 @@ public class SourceMetricData implements MetricData,
Serializable {
}
}
+ /**
+ * Flushes audit data by calling {@code auditOperator.send()}; does nothing when auditOperator is null.
+ * Usually called from a close method or when checkpointing.
+ */
+ public void flushAuditData() {
+ if (auditOperator != null) {
+ auditOperator.send();
+ }
+ }
+
public void outputMetrics(long rowCountSize, long rowDataSize, long
dataTime) {
outputDefaultMetrics(rowCountSize, rowDataSize);
if (auditOperator != null) {
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
index b193a5af1a..a963edf02a 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/internal/FlinkPulsarSource.java
@@ -886,6 +886,9 @@ public class FlinkPulsarSource<T>
if (!running) {
log.debug("snapshotState() called on closed source");
} else {
+
+ flushAudit();
+
unionOffsetStates.clear();
PulsarFetcher<T> fetcher = this.pulsarFetcher;
@@ -925,6 +928,13 @@ public class FlinkPulsarSource<T>
}
}
+ // Flush audit data first to avoid audit data loss; does nothing when sourceMetricData is null.
+ private void flushAudit() {
+ if (sourceMetricData != null) {
+ sourceMetricData.flushAuditData();
+ }
+ }
+
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
if (!running) {