This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 97e2bc184c [INLONG-9404][Sort] Fix StarRocks Audit lost when stop job
immediately after checkpoint (#9418)
97e2bc184c is described below
commit 97e2bc184c986e04a5769d79eda441ed1f18ae80
Author: Sting <[email protected]>
AuthorDate: Tue Dec 5 16:47:40 2023 +0800
[INLONG-9404][Sort] Fix StarRocks Audit lost when stop job immediately
after checkpoint (#9418)
---
.../org/apache/inlong/sort/base/metric/SinkMetricData.java | 10 ++++++++++
.../table/sink/table/StarRocksDynamicSinkFunctionV2.java | 9 +++++++++
2 files changed, 19 insertions(+)
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 4b48c4ece2..1b09343b5c 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -279,6 +279,16 @@ public class SinkMetricData implements MetricData,
Serializable {
outputAuditMetrics(rowCount, rowSize, dataTime);
}
+ /**
+ * flush audit data
+ * usually call this method in close method or when checkpointing
+ */
+ public void flushAuditData() {
+ if (auditOperator != null) {
+ auditOperator.send();
+ }
+ }
+
private void outputAuditMetrics(long rowCount, long rowSize, long
dataTime) {
if (auditOperator != null) {
for (Integer key : auditKeys) {
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
index 70326a67a3..01c50b9015 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
@@ -255,6 +255,9 @@ public class StarRocksDynamicSinkFunctionV2<T> extends
StarRocksDynamicSinkFunct
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext)
throws Exception {
sinkManager.flush();
+
+ flushAudit();
+
if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
return;
}
@@ -276,6 +279,12 @@ public class StarRocksDynamicSinkFunctionV2<T> extends
StarRocksDynamicSinkFunct
}
}
+ private void flushAudit() {
+ if (sinkMetricData != null) {
+ sinkMetricData.flushAuditData();
+ }
+ }
+
@Override
public void initializeState(FunctionInitializationContext
functionInitializationContext) throws Exception {
if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {