This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a5238c02096 [FLINK-37172][table-runtime] Add logs in all existing async state ops to check if they are under async state when running
4a5238c02096 is described below

commit 4a5238c020961efaa8f84dc1f414aa69fec49e32
Author: Xuyang <xyzhong...@163.com>
AuthorDate: Mon Jan 20 15:15:23 2025 +0800

    [FLINK-37172][table-runtime] Add logs in all existing async state ops to check if they are under async state when running
    
    This closes #26014
---
 .../operators/aggregate/async/AsyncStateGroupAggFunction.java    | 7 +++++++
 .../processors/AbstractAsyncStateSliceWindowAggProcessor.java    | 9 +++++++++
 .../asyncprocessing/AsyncStateDeduplicateFunctionBase.java       | 8 ++++++++
 .../stream/asyncprocessing/AsyncStateStreamingJoinOperator.java  | 8 ++++++++
 .../window/asyncprocessing/AsyncStateWindowJoinOperator.java     | 7 +++++++
 .../operators/rank/async/AbstractAsyncStateTopNFunction.java     | 7 +++++++
 6 files changed, 46 insertions(+)

diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java
index 9a58cba1c67a..50a7490da2aa 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java
@@ -31,11 +31,16 @@ import 
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.Collector;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /** Aggregate Function used for the groupby (without window) aggregate with 
async state api. */
 public class AsyncStateGroupAggFunction extends GroupAggFunctionBase {
 
     private static final long serialVersionUID = 1L;
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncStateGroupAggFunction.class);
+
     // stores the accumulators
     private transient ValueState<RowData> accState = null;
 
@@ -73,6 +78,8 @@ public class AsyncStateGroupAggFunction extends 
GroupAggFunctionBase {
     public void open(OpenContext openContext) throws Exception {
         super.open(openContext);
 
+        LOG.info("Group agg is using async state");
+
         InternalTypeInfo<RowData> accTypeInfo = 
InternalTypeInfo.ofFields(accTypes);
         ValueStateDescriptor<RowData> accDesc = new 
ValueStateDescriptor<>("accState", accTypeInfo);
         if (ttlConfig.isEnabled()) {
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateSliceWindowAggProcessor.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateSliceWindowAggProcessor.java
index e3bbc70e4d6a..cca05999ae36 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateSliceWindowAggProcessor.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncwindow/processors/AbstractAsyncStateSliceWindowAggProcessor.java
@@ -32,6 +32,9 @@ import 
org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner
 import 
org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceSharedAssigner;
 import 
org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
@@ -46,6 +49,9 @@ public abstract class 
AbstractAsyncStateSliceWindowAggProcessor
         extends AbstractAsyncStateWindowAggProcessor<Long>
         implements AsyncStateSlicingWindowProcessor<Long> {
 
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(AbstractAsyncStateSliceWindowAggProcessor.class);
+
     protected final AsyncStateWindowBuffer.Factory windowBufferFactory;
     protected final SliceAssigner sliceAssigner;
     protected final long windowInterval;
@@ -80,6 +86,9 @@ public abstract class 
AbstractAsyncStateSliceWindowAggProcessor
     @Override
     public void open(AsyncStateContext<Long> context) throws Exception {
         super.open(context);
+
+        LOG.info("Slice window agg is using async state");
+
         this.windowBuffer =
                 windowBufferFactory.create(
                         ctx.getOperatorOwner(),
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java
index a4c4e0edeadb..1b203cc4df72 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/asyncprocessing/AsyncStateDeduplicateFunctionBase.java
@@ -27,6 +27,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import 
org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionBase;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static 
org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;
 
 /**
@@ -42,6 +45,9 @@ abstract class AsyncStateDeduplicateFunctionBase<T, K, IN, 
OUT>
 
     private static final long serialVersionUID = 1L;
 
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AsyncStateDeduplicateFunctionBase.class);
+
     // state stores previous message under the key.
     protected ValueState<T> state;
 
@@ -54,6 +60,8 @@ abstract class AsyncStateDeduplicateFunctionBase<T, K, IN, 
OUT>
     public void open(OpenContext openContext) throws Exception {
         super.open(openContext);
 
+        LOG.info("Deduplicate is using async state");
+
         ValueStateDescriptor<T> stateDesc =
                 new ValueStateDescriptor<>("deduplicate-state", typeInfo);
         StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime);
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/AsyncStateStreamingJoinOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/AsyncStateStreamingJoinOperator.java
index e88cd09ae7c7..5c92d7a6ab21 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/AsyncStateStreamingJoinOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/asyncprocessing/AsyncStateStreamingJoinOperator.java
@@ -34,6 +34,9 @@ import 
org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideS
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.types.RowKind;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Streaming unbounded Join operator based on async state api, which supports 
INNER/LEFT/RIGHT/FULL
  * JOIN.
@@ -42,6 +45,9 @@ public class AsyncStateStreamingJoinOperator extends 
AbstractAsyncStateStreaming
 
     private static final long serialVersionUID = 1L;
 
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AsyncStateStreamingJoinOperator.class);
+
     // whether left side is outer side, e.g. left is outer but right is not 
when LEFT OUTER JOIN
     private final boolean leftIsOuter;
     // whether right side is outer side, e.g. right is outer but left is not 
when RIGHT OUTER JOIN
@@ -86,6 +92,8 @@ public class AsyncStateStreamingJoinOperator extends 
AbstractAsyncStateStreaming
     public void open() throws Exception {
         super.open();
 
+        LOG.info("Join is using async state");
+
         this.outRow = new JoinedRowData();
         this.leftNullRow = new GenericRowData(leftType.toRowSize());
         this.rightNullRow = new GenericRowData(rightType.toRowSize());
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java
index 5a5ee45b408d..50d2f1a00337 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/asyncprocessing/AsyncStateWindowJoinOperator.java
@@ -48,6 +48,9 @@ import 
org.apache.flink.table.runtime.operators.window.tvf.common.WindowTimerSer
 import 
org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.time.ZoneId;
 
 /**
@@ -69,6 +72,8 @@ public class AsyncStateWindowJoinOperator extends 
AsyncStateTableStreamOperator<
 
     private static final long serialVersionUID = 1L;
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncStateWindowJoinOperator.class);
+
     private static final String LEFT_RECORDS_STATE_NAME = "left-records";
     private static final String RIGHT_RECORDS_STATE_NAME = "right-records";
 
@@ -119,6 +124,8 @@ public class AsyncStateWindowJoinOperator extends 
AsyncStateTableStreamOperator<
     public void open() throws Exception {
         super.open();
 
+        LOG.info("Window join is using async state");
+
         this.collector = new TimestampedCollector<>(output);
         collector.eraseTimestamp();
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java
index 5c982ed161ab..12d2b9c7a563 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/async/AbstractAsyncStateTopNFunction.java
@@ -34,11 +34,16 @@ import 
org.apache.flink.table.runtime.operators.rank.RankRange;
 import org.apache.flink.table.runtime.operators.rank.RankType;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Objects;
 
 /** Base class for TopN Function with async state api. */
 public abstract class AbstractAsyncStateTopNFunction extends 
AbstractTopNFunction {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractAsyncStateTopNFunction.class);
+
     private ValueState<Long> rankEndState;
 
     public AbstractAsyncStateTopNFunction(
@@ -65,6 +70,8 @@ public abstract class AbstractAsyncStateTopNFunction extends 
AbstractTopNFunctio
     public void open(OpenContext openContext) throws Exception {
         super.open(openContext);
 
+        LOG.info("Top-N is using async state");
+
         if (!isConstantRankEnd) {
             ValueStateDescriptor<Long> rankStateDesc =
                     new ValueStateDescriptor<>("rankEnd", Types.LONG);

Reply via email to