This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit d32537802972d454a81808994947f632b611441d Author: Charles <[email protected]> AuthorDate: Tue Sep 20 17:42:03 2022 +0800 [INLONG-5959][Sort] Support metric state recovery for filesystem (#5961) --- .../filesystem/stream/AbstractStreamingWriter.java | 37 +++++++++++++++++++--- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java index 95267c698..9edcc82b2 100644 --- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java +++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java @@ -18,6 +18,10 @@ package org.apache.inlong.sort.filesystem.stream; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -34,7 +38,13 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; + +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; /** * Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send @@ -53,9 +63,12 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe IN, String, ? extends StreamingFileSink.BucketsBuilder<IN, String, ?>> bucketsBuilder; - private String inlongMetric; + private final String inlongMetric; + private final String inlongAudit; - private String inlongAudit; + private transient ListState<MetricState> metricStateListState; + private transient MetricState metricState; + private SinkMetricData sinkMetricData; // --------------------------- runtime fields ----------------------------- @@ -103,11 +116,13 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe super.open(); MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) - .withRegisterMetric(RegisteredMetric.ALL) .withInlongAudit(inlongAudit) + .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) + .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L) + .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { - metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); + sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); } } @@ -149,12 +164,26 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe bucketCheckInterval); currentWatermark = Long.MIN_VALUE; + if (this.inlongMetric != null) { + this.metricStateListState = context.getOperatorStateStore().getUnionListState( + new ListStateDescriptor<>( + INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() { + }))); + } + if (context.isRestored()) { + metricState = MetricStateUtils.restoreMetricState(metricStateListState, + getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); + } } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); helper.snapshotState(context.getCheckpointId()); + if (sinkMetricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData, + getRuntimeContext().getIndexOfThisSubtask()); + } } @Override
