This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 9afd220eb07f5f4f9e1c1df38cac85ed57ad3ee4 Author: Charles <[email protected]> AuthorDate: Tue Sep 20 17:47:36 2022 +0800 [INLONG-5955][Sort] Support metric state recovery for HBase (#5960) --- .../inlong/sort/hbase/sink/HBaseSinkFunction.java | 30 ++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java index a1e9641d2..10df3d14f 100644 --- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java +++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java @@ -20,6 +20,10 @@ package org.apache.inlong.sort.hbase.sink; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.hbase.sink.HBaseMutationConverter; import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil; @@ -39,7 +43,9 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +57,9 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; /** * The sink function for HBase. @@ -86,6 +95,9 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T> * </p> */ private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>(); + private transient ListState<MetricState> metricStateListState; + private transient MetricState metricState; + private SinkMetricData sinkMetricData; private transient Connection connection; private transient BufferedMutator mutator; private transient ScheduledExecutorService executor; @@ -93,7 +105,6 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T> private transient AtomicLong numPendingRequests; private transient RuntimeContext runtimeContext; private transient volatile boolean closed = false; - private SinkMetricData sinkMetricData; private Long dataSize = 0L; private Long rowSize = 0L; @@ -126,6 +137,8 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T> MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) .withInlongAudit(inlongAudit) + .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) + .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { @@ -290,11 +303,24 @@ public class HBaseSinkFunction<T> extends RichSinkFunction<T> while (numPendingRequests.get() != 0) { flush(); } + if (sinkMetricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData, + getRuntimeContext().getIndexOfThisSubtask()); + } } @Override public void initializeState(FunctionInitializationContext context) throws Exception { - // nothing to do. + if (this.inlongMetric != null) { + this.metricStateListState = context.getOperatorStateStore().getUnionListState( + new ListStateDescriptor<>( + INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() { + }))); + } + if (context.isRestored()) { + metricState = MetricStateUtils.restoreMetricState(metricStateListState, + getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); + } } @Override
