This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 4a5238c02096 [FLINK-37172][table-runtime] Add logs in all existing async state ops to check if they are under async state when running 4a5238c02096 is described below commit 4a5238c020961efaa8f84dc1f414aa69fec49e32 Author: Xuyang <xyzhong...@163.com> AuthorDate: Mon Jan 20 15:15:23 2025 +0800 [FLINK-37172][table-runtime] Add logs in all existing async state ops to check if they are under async state when running This closes #26014 --- .../operators/aggregate/async/AsyncStateGroupAggFunction.java | 7 +++++++ .../processors/AbstractAsyncStateSliceWindowAggProcessor.java | 9 +++++++++ .../asyncprocessing/AsyncStateDeduplicateFunctionBase.java | 8 ++++++++ .../stream/asyncprocessing/AsyncStateStreamingJoinOperator.java | 8 ++++++++ .../window/asyncprocessing/AsyncStateWindowJoinOperator.java | 7 +++++++ .../operators/rank/async/AbstractAsyncStateTopNFunction.java | 7 +++++++ 6 files changed, 46 insertions(+) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java index 9a58cba1c67a..50a7490da2aa 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java @@ -31,11 +31,16 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** Aggregate Function used for the groupby (without window) aggregate with async state api. 
*/ public class AsyncStateGroupAggFunction extends GroupAggFunctionBase { private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(AsyncStateGroupAggFunction.class); + // stores the accumulators private transient ValueState<RowData> accState = null; @@ -73,6 +78,8 @@ public class AsyncStateGroupAggFunction extends GroupAggFunctionBase { public void open(OpenContext openContext) throws Exception { super.open(openContext); + LOG.info("Group agg is using async state"); + InternalTypeInfo<RowData> accTypeInfo = InternalTypeInfo.ofFields(accTypes); ValueStateDescriptor<RowData> accDesc = new ValueStateDescriptor<>("accState", accTypeInfo); if (ttlConfig.isEnabled()) { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateSliceWindowAggProcessor.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateSliceWindowAggProcessor.java index e3bbc70e4d6a..cca05999ae36 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateSliceWindowAggProcessor.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateSliceWindowAggProcessor.java @@ -32,6 +32,9 @@ import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceSharedAssigner; import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.time.ZoneId; import java.util.ArrayList; import java.util.List; @@ -46,6 +49,9 @@ public abstract class AbstractAsyncStateSliceWindowAggProcessor extends 
AbstractAsyncStateWindowAggProcessor<Long> implements AsyncStateSlicingWindowProcessor<Long> { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractAsyncStateSliceWindowAggProcessor.class); + protected final AsyncStateWindowBuffer.Factory windowBufferFactory; protected final SliceAssigner sliceAssigner; protected final long windowInterval; @@ -80,6 +86,9 @@ public abstract class AbstractAsyncStateSliceWindowAggProcessor @Override public void open(AsyncStateContext<Long> context) throws Exception { super.open(context); + + LOG.info("Slice window agg is using async state"); + this.windowBuffer = windowBufferFactory.create( ctx.getOperatorOwner(), diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java index a4c4e0edeadb..1b203cc4df72 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java @@ -27,6 +27,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig; /** @@ -42,6 +45,9 @@ abstract class AsyncStateDeduplicateFunctionBase<T, K, IN, OUT> private static final long serialVersionUID = 1L; + private static final Logger LOG = + LoggerFactory.getLogger(AsyncStateDeduplicateFunctionBase.class); + // state stores previous 
message under the key. protected ValueState<T> state; @@ -54,6 +60,8 @@ abstract class AsyncStateDeduplicateFunctionBase<T, K, IN, OUT> public void open(OpenContext openContext) throws Exception { super.open(openContext); + LOG.info("Deduplicate is using async state"); + ValueStateDescriptor<T> stateDesc = new ValueStateDescriptor<>("deduplicate-state", typeInfo); StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/AsyncStateStreamingJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/AsyncStateStreamingJoinOperator.java index e88cd09ae7c7..5c92d7a6ab21 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/AsyncStateStreamingJoinOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/AsyncStateStreamingJoinOperator.java @@ -34,6 +34,9 @@ import org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideS import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.types.RowKind; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Streaming unbounded Join operator based on async state api, which supports INNER/LEFT/RIGHT/FULL * JOIN. @@ -42,6 +45,9 @@ public class AsyncStateStreamingJoinOperator extends AbstractAsyncStateStreaming private static final long serialVersionUID = 1L; + private static final Logger LOG = + LoggerFactory.getLogger(AsyncStateStreamingJoinOperator.class); + // whether left side is outer side, e.g. left is outer but right is not when LEFT OUTER JOIN private final boolean leftIsOuter; // whether right side is outer side, e.g. 
right is outer but left is not when RIGHT OUTER JOIN @@ -86,6 +92,8 @@ public class AsyncStateStreamingJoinOperator extends AbstractAsyncStateStreaming public void open() throws Exception { super.open(); + LOG.info("Join is using async state"); + this.outRow = new JoinedRowData(); this.leftNullRow = new GenericRowData(leftType.toRowSize()); this.rightNullRow = new GenericRowData(rightType.toRowSize()); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java index 5a5ee45b408d..50d2f1a00337 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java @@ -48,6 +48,9 @@ import org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerSer import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.time.ZoneId; /** @@ -69,6 +72,8 @@ public class AsyncStateWindowJoinOperator extends AsyncStateTableStreamOperator< private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(AsyncStateWindowJoinOperator.class); + private static final String LEFT_RECORDS_STATE_NAME = "left-records"; private static final String RIGHT_RECORDS_STATE_NAME = "right-records"; @@ -119,6 +124,8 @@ public class AsyncStateWindowJoinOperator extends AsyncStateTableStreamOperator< public void open() throws Exception { super.open(); + LOG.info("Window join is using 
async state"); + this.collector = new TimestampedCollector<>(output); collector.eraseTimestamp(); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java index 5c982ed161ab..12d2b9c7a563 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java @@ -34,11 +34,16 @@ import org.apache.flink.table.runtime.operators.rank.RankRange; import org.apache.flink.table.runtime.operators.rank.RankType; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Objects; /** Base class for TopN Function with async state api. */ public abstract class AbstractAsyncStateTopNFunction extends AbstractTopNFunction { + private static final Logger LOG = LoggerFactory.getLogger(AbstractAsyncStateTopNFunction.class); + private ValueState<Long> rankEndState; public AbstractAsyncStateTopNFunction( @@ -65,6 +70,8 @@ public abstract class AbstractAsyncStateTopNFunction extends AbstractTopNFunctio public void open(OpenContext openContext) throws Exception { super.open(openContext); + LOG.info("Top-N is using async state"); + if (!isConstantRankEnd) { ValueStateDescriptor<Long> rankStateDesc = new ValueStateDescriptor<>("rankEnd", Types.LONG);