This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 45870d22a6 [INLONG-10355][Sort] Make Iceberg source support report audit information exactly once (#10551)
45870d22a6 is described below

commit 45870d22a6ae97b76af5612b8fff8af619008416
Author: XiaoYou201 <[email protected]>
AuthorDate: Tue Jul 2 11:46:27 2024 +0800

    [INLONG-10355][Sort] Make Iceberg source support report audit information exactly once (#10551)
---
 .../iceberg/source/reader/IcebergSourceReader.java |  9 +++++++-
 .../reader/InlongIcebergSourceReaderMetrics.java   | 26 ++++++++++++++++------
 2 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
index df75723ceb..a8ad9b5259 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
@@ -67,10 +67,17 @@ public class IcebergSourceReader<T>
     }
     @Override
     public List<IcebergSourceSplit> snapshotState(long checkpointId) {
-        metrics.flushAudit();
+        metrics.updateCurrentCheckpointId(checkpointId);
         return super.snapshotState(checkpointId);
     }
 
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        super.notifyCheckpointComplete(checkpointId);
+        metrics.flushAudit();
+        metrics.updateLastCheckpointId(checkpointId);
+    }
+
     @Override
     protected IcebergSourceSplit initializedState(IcebergSourceSplit split) {
         return split;
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
index 2210fbca02..84c2809320 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.sort.iceberg.source.reader;
 
 import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
 import org.apache.inlong.sort.iceberg.utils.RecyclableJoinedRowData;
 
 import lombok.extern.slf4j.Slf4j;
@@ -35,7 +35,7 @@ import java.nio.charset.StandardCharsets;
 public class InlongIcebergSourceReaderMetrics<T> extends IcebergSourceReaderMetrics {
 
     private final MetricGroup metrics;
-    private SourceMetricData sourceMetricData;
+    private SourceExactlyMetric sourceExactlyMetric;
 
     public InlongIcebergSourceReaderMetrics(MetricGroup metrics, String fullTableName) {
         super(metrics, fullTableName);
@@ -44,20 +44,20 @@ public class InlongIcebergSourceReaderMetrics<T> extends IcebergSourceReaderMetr
 
     public void registerMetrics(MetricOption metricOption) {
         if (metricOption != null) {
-            sourceMetricData = new SourceMetricData(metricOption, metrics);
+            sourceExactlyMetric = new SourceExactlyMetric(metricOption, metrics);
         } else {
             log.warn("failed to init sourceMetricData since the metricOption is null");
         }
     }
 
     public void outputMetricsWithEstimate(ArrayBatchRecords<T> batchRecord) {
-        if (sourceMetricData != null) {
+        if (sourceExactlyMetric != null) {
             int dataCount = batchRecord.numberOfRecords();
             T[] records = batchRecord.records();
             for (int i = 0; i < dataCount; i++) {
                 long dataSize = getDataSize(records[i]);
                 long dataTime = getDataTime(records[i]);
-                sourceMetricData.outputMetrics(1, dataSize, dataTime);
+                sourceExactlyMetric.outputMetrics(1, dataSize, dataTime);
             }
 
         }
@@ -79,8 +79,20 @@ public class InlongIcebergSourceReaderMetrics<T> extends IcebergSourceReaderMetr
     }
 
     void flushAudit() {
-        if (sourceMetricData != null) {
-            sourceMetricData.flushAuditData();
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.flushAudit();
+        }
+    }
+
+    void updateCurrentCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+        }
+    }
+
+    void updateLastCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.updateLastCheckpointId(checkpointId);
         }
     }
 }

Reply via email to