This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5d56e4107a [INLONG-10312][Sort] Iceberg sink support report audit
information exactly once (#10453)
5d56e4107a is described below
commit 5d56e4107aaf00c7cb6da3e04f1d9b512013109b
Author: XiaoYou201 <[email protected]>
AuthorDate: Tue Jun 25 09:53:32 2024 +0800
[INLONG-10312][Sort] Iceberg sink support report audit information exactly
once (#10453)
---
.../inlong/sort/base/metric/SinkExactlyMetric.java | 2 +-
.../sort/iceberg/sink/IcebergStreamWriter.java | 6 +++++
.../iceberg/sink/IcebergStreamWriterMetrics.java | 28 +++++++++++++++-------
3 files changed, 27 insertions(+), 9 deletions(-)
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java
index 5c916a1b56..cb3d20f389 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkExactlyMetric.java
@@ -230,7 +230,7 @@ public class SinkExactlyMetric implements MetricData, Serializable {
invokeDirty(1, getDataSize(o));
}
- public void invokeWithId(long rowCount, long rowSize, long dataTime) {
+ public void invoke(long rowCount, long rowSize, long dataTime) {
outputDefaultMetrics(rowCount, rowSize);
outputAuditMetricsWithId(rowCount, rowSize, dataTime);
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
index b318306380..eb62400ad8 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
@@ -81,7 +81,13 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
@Override
public void snapshotState(StateSnapshotContext context) {
+ writerMetrics.updateCurrentCheckpointId(context.getCheckpointId());
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
writerMetrics.flushAudit();
+ writerMetrics.updateLastCheckpointId(checkpointId);
}
@Override
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java
index 72ca7e0cf5..3a219cd6d5 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java
@@ -18,7 +18,7 @@
package org.apache.inlong.sort.iceberg.sink;
import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SinkExactlyMetric;
import com.codahale.metrics.SlidingWindowReservoir;
import lombok.extern.slf4j.Slf4j;
@@ -41,8 +41,8 @@ class IcebergStreamWriterMetrics {
// It should also produce good accuracy for histogram distribution (like percentiles).
private static final int HISTOGRAM_RESERVOIR_SIZE = 1024;
- private MetricGroup metrics;
- private SourceMetricData sourceMetricData;
+ private final MetricGroup metrics;
+ private SinkExactlyMetric sinkExactlyMetric;
private final Counter flushedDataFiles;
private final Counter flushedDeleteFiles;
@@ -77,7 +77,7 @@ class IcebergStreamWriterMetrics {
public void registerMetrics(MetricOption metricOption) {
if (metricOption != null) {
- sourceMetricData = new SourceMetricData(metricOption, metrics);
+ sinkExactlyMetric = new SinkExactlyMetric(metricOption, metrics);
} else {
log.warn("failed to init sourceMetricData since the metricOption is null");
}
@@ -109,14 +109,26 @@ class IcebergStreamWriterMetrics {
}
void outputMetricsWithEstimate(int size, long time) {
- if (sourceMetricData != null) {
- sourceMetricData.outputMetrics(1, size, time);
+ if (sinkExactlyMetric != null) {
+ sinkExactlyMetric.invoke(1, size, time);
}
}
void flushAudit() {
- if (sourceMetricData != null) {
- sourceMetricData.flushAuditData();
+ if (sinkExactlyMetric != null) {
+ sinkExactlyMetric.flushAudit();
+ }
+ }
+
+ void updateCurrentCheckpointId(long checkpointId) {
+ if (sinkExactlyMetric != null) {
+ sinkExactlyMetric.updateCurrentCheckpointId(checkpointId);
+ }
+ }
+
+ void updateLastCheckpointId(long checkpointId) {
+ if (sinkExactlyMetric != null) {
+ sinkExactlyMetric.updateLastCheckpointId(checkpointId);
}
}
}