This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 45870d22a6 [INLONG-10355][Sort] Make Iceberg source support report audit information exactly once (#10551)
45870d22a6 is described below
commit 45870d22a6ae97b76af5612b8fff8af619008416
Author: XiaoYou201 <[email protected]>
AuthorDate: Tue Jul 2 11:46:27 2024 +0800
[INLONG-10355][Sort] Make Iceberg source support report audit information exactly once (#10551)
---
.../iceberg/source/reader/IcebergSourceReader.java | 9 +++++++-
.../reader/InlongIcebergSourceReaderMetrics.java | 26 ++++++++++++++++------
2 files changed, 27 insertions(+), 8 deletions(-)
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
index df75723ceb..a8ad9b5259 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
@@ -67,10 +67,17 @@ public class IcebergSourceReader<T>
}
@Override
public List<IcebergSourceSplit> snapshotState(long checkpointId) {
- metrics.flushAudit();
+ metrics.updateCurrentCheckpointId(checkpointId);
return super.snapshotState(checkpointId);
}
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ super.notifyCheckpointComplete(checkpointId);
+ metrics.flushAudit();
+ metrics.updateLastCheckpointId(checkpointId);
+ }
+
@Override
protected IcebergSourceSplit initializedState(IcebergSourceSplit split) {
return split;
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
index 2210fbca02..84c2809320 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
@@ -18,7 +18,7 @@
package org.apache.inlong.sort.iceberg.source.reader;
import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import org.apache.inlong.sort.iceberg.utils.RecyclableJoinedRowData;
import lombok.extern.slf4j.Slf4j;
@@ -35,7 +35,7 @@ import java.nio.charset.StandardCharsets;
public class InlongIcebergSourceReaderMetrics<T> extends IcebergSourceReaderMetrics {
private final MetricGroup metrics;
- private SourceMetricData sourceMetricData;
+ private SourceExactlyMetric sourceExactlyMetric;
public InlongIcebergSourceReaderMetrics(MetricGroup metrics, String fullTableName) {
super(metrics, fullTableName);
@@ -44,20 +44,20 @@ public class InlongIcebergSourceReaderMetrics<T> extends IcebergSourceReaderMetr
public void registerMetrics(MetricOption metricOption) {
if (metricOption != null) {
- sourceMetricData = new SourceMetricData(metricOption, metrics);
+ sourceExactlyMetric = new SourceExactlyMetric(metricOption, metrics);
} else {
log.warn("failed to init sourceMetricData since the metricOption is null");
}
}
public void outputMetricsWithEstimate(ArrayBatchRecords<T> batchRecord) {
- if (sourceMetricData != null) {
+ if (sourceExactlyMetric != null) {
int dataCount = batchRecord.numberOfRecords();
T[] records = batchRecord.records();
for (int i = 0; i < dataCount; i++) {
long dataSize = getDataSize(records[i]);
long dataTime = getDataTime(records[i]);
- sourceMetricData.outputMetrics(1, dataSize, dataTime);
+ sourceExactlyMetric.outputMetrics(1, dataSize, dataTime);
}
}
@@ -79,8 +79,20 @@ public class InlongIcebergSourceReaderMetrics<T> extends IcebergSourceReaderMetr
}
void flushAudit() {
- if (sourceMetricData != null) {
- sourceMetricData.flushAuditData();
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.flushAudit();
+ }
+ }
+
+ void updateCurrentCheckpointId(long checkpointId) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+ }
+ }
+
+ void updateLastCheckpointId(long checkpointId) {
+ if (sourceExactlyMetric != null) {
+ sourceExactlyMetric.updateLastCheckpointId(checkpointId);
}
}
}