This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 704b5c4b200 [FLINK-35405][runtime] Add metrics for 
AsyncExecutionController (#25998)
704b5c4b200 is described below

commit 704b5c4b200f392f02101ecde8df434fdd79321b
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Sun Jan 19 23:47:04 2025 +0800

    [FLINK-35405][runtime] Add metrics for AsyncExecutionController (#25998)
---
 docs/content.zh/docs/ops/metrics.md                | 56 ++++++++++++++++++----
 docs/content/docs/ops/metrics.md                   | 56 ++++++++++++++++++----
 .../asyncprocessing/AsyncExecutionController.java  | 10 +++-
 .../asyncprocessing/StateRequestBuffer.java        |  9 ++++
 .../AbstractAsyncStateStreamOperator.java          |  3 +-
 .../AbstractAsyncStateStreamOperatorV2.java        |  3 +-
 .../asyncprocessing/AbstractStateIteratorTest.java |  3 ++
 .../AsyncExecutionControllerTest.java              | 42 +++++++++++++++-
 .../state/v2/AbstractAggregatingStateTest.java     |  1 +
 .../state/v2/AbstractKeyedStateTestBase.java       |  1 +
 .../state/v2/AbstractReducingStateTest.java        |  1 +
 .../runtime/state/v2/StateBackendTestV2Base.java   |  5 ++
 .../InternalTimerServiceAsyncImplTest.java         |  1 +
 .../flink/state/forst/ForStStateTestBase.java      |  1 +
 14 files changed, 172 insertions(+), 20 deletions(-)

diff --git a/docs/content.zh/docs/ops/metrics.md 
b/docs/content.zh/docs/ops/metrics.md
index bc7b7c19c6d..852893b2c52 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -1545,31 +1545,33 @@ Certain RocksDB native metrics are available but 
disabled by default, you can fi
 <table class="table table-bordered">
   <thead>
     <tr>
-      <th class="text-left" style="width: 18%">Scope</th>
-      <th class="text-left" style="width: 26%">Metrics</th>
-      <th class="text-left" style="width: 48%">Description</th>
-      <th class="text-left" style="width: 8%">Type</th>
+      <th class="text-left" style="width: 15%">Scope</th>
+      <th class="text-left" style="width: 15%">Infix</th>
+      <th class="text-left" style="width: 15%">Metrics</th>
+      <th class="text-left" style="width: 50%">Description</th>
+      <th class="text-left" style="width: 5%">Type</th>
     </tr>
   </thead>
   <tbody>
     <tr>
       <th rowspan="4"><strong>Task/Operator</strong></th>
-      <td>forst.fileCache.hit</td>
+      <td rowspan="4">forst.fileCache</td>
+      <td>hit</td>
       <td>The hit count of ForSt state backend cache.</td>
       <td>Counter</td>
     </tr>
     <tr>
-      <td>forst.fileCache.miss</td>
+      <td>miss</td>
       <td>The miss count of ForSt state backend cache.</td>
       <td>Counter</td>
     </tr>
     <tr>
-      <td>forst.fileCache.usedBytes</td>
+      <td>usedBytes</td>
       <td>The bytes cached in ForSt state backend cache.</td>
       <td>Gauge</td>
     </tr>
     <tr>
-      <td>forst.fileCache.remainingBytes</td>
+      <td>remainingBytes</td>
       <td>The remaining space in the volume for the configured cache. Only 
available when 'state.backend.forst.cache.reserve-size' is set above 0. </td>
       <td>Gauge</td>
     </tr>
@@ -2280,6 +2282,44 @@ logged by `SystemResourcesMetricsInitializer` during the 
startup.
   </tbody>
 </table>
 
+### Async State Processing
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 15%">Scope</th>
+      <th class="text-left" style="width: 10%">Infix</th>
+      <th class="text-left" style="width: 20%">Metrics</th>
+      <th class="text-left" style="width: 50%">Description</th>
+      <th class="text-left" style="width: 5%">Type</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="4"><strong>Operator</strong></th>
+      <td rowspan="4">asyncStateProcessing</td>
+      <td>numInFlightRecords</td>
+      <td>The number of in-flight records in the async execution controller's 
buffers.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>activeBufferSize</td>
+      <td>The number of records which are pending to be processed.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>blockingBufferSize</td>
+      <td>The number of records which are blocked by the ongoing records.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>numBlockingKeys</td>
+      <td>The number of distinct keys that are blocked in the async execution 
controller.</td>
+      <td>Gauge</td>
+    </tr>
+  </tbody>
+</table>
+
 ## End-to-End latency tracking
 
 Flink allows to track the latency of records travelling through the system. 
This feature is disabled by default.
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index c937b96bce8..f091e2d9d31 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -1535,31 +1535,33 @@ Certain RocksDB native metrics are available but 
disabled by default, you can fi
 <table class="table table-bordered">
   <thead>
     <tr>
-      <th class="text-left" style="width: 18%">Scope</th>
-      <th class="text-left" style="width: 26%">Metrics</th>
-      <th class="text-left" style="width: 48%">Description</th>
-      <th class="text-left" style="width: 8%">Type</th>
+      <th class="text-left" style="width: 15%">Scope</th>
+      <th class="text-left" style="width: 15%">Infix</th>
+      <th class="text-left" style="width: 15%">Metrics</th>
+      <th class="text-left" style="width: 50%">Description</th>
+      <th class="text-left" style="width: 5%">Type</th>
     </tr>
   </thead>
   <tbody>
     <tr>
       <th rowspan="4"><strong>Task/Operator</strong></th>
-      <td>forst.fileCache.hit</td>
+      <td rowspan="4">forst.fileCache</td>
+      <td>hit</td>
       <td>The hit count of ForSt state backend cache.</td>
       <td>Counter</td>
     </tr>
     <tr>
-      <td>forst.fileCache.miss</td>
+      <td>miss</td>
       <td>The miss count of ForSt state backend cache.</td>
       <td>Counter</td>
     </tr>
     <tr>
-      <td>forst.fileCache.usedBytes</td>
+      <td>usedBytes</td>
       <td>The bytes cached in ForSt state backend cache.</td>
       <td>Gauge</td>
     </tr>
     <tr>
-      <td>forst.fileCache.remainingBytes</td>
+      <td>remainingBytes</td>
       <td>The remaining space in the volume for the configured cache. Only 
available when 'state.backend.forst.cache.reserve-size' is set above 0. </td>
       <td>Gauge</td>
     </tr>
@@ -2230,6 +2232,44 @@ Metrics below can be used to measure the effectiveness 
of speculative execution.
   </tbody>
 </table>
 
+### Async State Processing
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 15%">Scope</th>
+      <th class="text-left" style="width: 10%">Infix</th>
+      <th class="text-left" style="width: 20%">Metrics</th>
+      <th class="text-left" style="width: 50%">Description</th>
+      <th class="text-left" style="width: 5%">Type</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <th rowspan="4"><strong>Operator</strong></th>
+      <td rowspan="4">asyncStateProcessing</td>
+      <td>numInFlightRecords</td>
+      <td>The number of in-flight records in the async execution controller's 
buffers.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>activeBufferSize</td>
+      <td>The number of records which are pending to be processed.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>blockingBufferSize</td>
+      <td>The number of records which are blocked by the ongoing records.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>numBlockingKeys</td>
+      <td>The number of distinct keys that are blocked in the async execution 
controller.</td>
+      <td>Gauge</td>
+    </tr>
+  </tbody>
+</table>
+
 ## End-to-End latency tracking
 
 Flink allows to track the latency of records travelling through the system. 
This feature is disabled by default.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
index cc7eed91d83..4fdefc64d47 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.v2.State;
 import org.apache.flink.api.common.state.v2.StateFuture;
 import org.apache.flink.core.state.InternalStateFuture;
 import 
org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode;
 import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -158,7 +159,8 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
             int batchSize,
             long bufferTimeout,
             int maxInFlightRecords,
-            SwitchContextListener<K> switchContextListener) {
+            @Nullable SwitchContextListener<K> switchContextListener,
+            @Nullable MetricGroup metricGroup) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.exceptionHandler = exceptionHandler;
@@ -187,6 +189,12 @@ public class AsyncExecutionController<K> implements 
StateRequestHandler, Closeab
 
         this.epochManager = new EpochManager(this);
         this.switchContextListener = switchContextListener;
+        if (metricGroup != null) {
+            metricGroup.gauge("numInFlightRecords", 
this::getInFlightRecordNum);
+            metricGroup.gauge("activeBufferSize", () -> 
stateRequestsBuffer.activeQueueSize());
+            metricGroup.gauge("blockingBufferSize", () -> 
stateRequestsBuffer.blockingQueueSize());
+            metricGroup.gauge("numBlockingKeys", () -> 
stateRequestsBuffer.blockingKeyNum());
+        }
         LOG.info(
                 "Create AsyncExecutionController: batchSize {}, bufferTimeout 
{}, maxInFlightRecordNum {}, epochParallelMode {}",
                 this.batchSize,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
index fe469d90012..b5946be54d7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java
@@ -218,6 +218,15 @@ public class StateRequestBuffer<K> implements Closeable {
         return blockingQueueSize;
     }
 
+    /**
+     * Get the number of different keys in blocking queue.
+     *
+     * @return the number of different keys in blocking queue.
+     */
+    int blockingKeyNum() {
+        return blockingQueue.size();
+    }
+
     /**
      * Get the number of state requests of active queue in constant-time.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
index 3ce835c8f28..32f88ff7759 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
@@ -121,7 +121,8 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> 
extends AbstractStre
                                 asyncBufferSize,
                                 asyncBufferTimeout,
                                 inFlightRecordsLimit,
-                                asyncKeyedStateBackend);
+                                asyncKeyedStateBackend,
+                                
getMetricGroup().addGroup("asyncStateProcessing"));
                 asyncKeyedStateBackend.setup(asyncExecutionController);
                 if (asyncKeyedStateBackend instanceof 
AsyncKeyedStateBackendAdaptor) {
                     LOG.warn(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
index b1a80cebaa0..955faa770b3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
@@ -118,7 +118,8 @@ public abstract class 
AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt
                                 asyncBufferSize,
                                 asyncBufferTimeout,
                                 inFlightRecordsLimit,
-                                asyncKeyedStateBackend);
+                                asyncKeyedStateBackend,
+                                
getMetricGroup().addGroup("asyncStateProcessing"));
                 asyncKeyedStateBackend.setup(asyncExecutionController);
                 if (asyncKeyedStateBackend instanceof 
AsyncKeyedStateBackendAdaptor) {
                     LOG.warn(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java
index 68ea31ca0ea..9ec5c3769e9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java
@@ -56,6 +56,7 @@ public class AbstractStateIteratorTest {
                         100,
                         1000,
                         1,
+                        null,
                         null);
         stateExecutor.bindAec(aec);
         RecordContext<String> recordContext = aec.buildContext("1", "key1");
@@ -95,6 +96,7 @@ public class AbstractStateIteratorTest {
                         100,
                         1000,
                         1,
+                        null,
                         null);
         stateExecutor.bindAec(aec);
         RecordContext<String> recordContext = aec.buildContext("1", "key1");
@@ -141,6 +143,7 @@ public class AbstractStateIteratorTest {
                         100,
                         1000,
                         1,
+                        null,
                         null);
         stateExecutor.bindAec(aec);
         RecordContext<String> recordContext = aec.buildContext("1", "key1");
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
index cf4ed16b1fe..e77e3c522de 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java
@@ -28,6 +28,9 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.state.InternalStateFuture;
 import 
org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
 import org.apache.flink.core.state.StateFutureUtils;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch;
 import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode;
 import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
@@ -46,6 +49,7 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
@@ -73,6 +77,7 @@ class AsyncExecutionControllerTest {
                                 })
                         .thenAccept(val -> output.set(val));
             };
+    final Map<String, Gauge> registeredGauges = new HashMap<>();
 
     void setup(
             int batchSize,
@@ -98,6 +103,23 @@ class AsyncExecutionControllerTest {
         }
         closeableRegistry.registerCloseable(asyncKeyedStateBackend);
         closeableRegistry.registerCloseable(asyncKeyedStateBackend::dispose);
+
+        UnregisteredMetricsGroup metricsGroup =
+                new UnregisteredMetricsGroup() {
+                    String prefix = "";
+
+                    @Override
+                    public <T, G extends Gauge<T>> G gauge(String name, G 
gauge) {
+                        registeredGauges.put(prefix + "." + name, gauge);
+                        return gauge;
+                    }
+
+                    @Override
+                    public MetricGroup addGroup(String name) {
+                        prefix = name;
+                        return this;
+                    }
+                };
         aec =
                 new AsyncExecutionController<>(
                         mailboxExecutor,
@@ -108,7 +130,8 @@ class AsyncExecutionControllerTest {
                         batchSize,
                         timeout,
                         maxInFlight,
-                        null);
+                        null,
+                        metricsGroup.addGroup("asyncStateProcessing"));
         asyncKeyedStateBackend.setup(aec);
 
         try {
@@ -149,6 +172,15 @@ class AsyncExecutionControllerTest {
         assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
         assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
         assertThat(aec.inFlightRecordNum.get()).isEqualTo(1);
+        
assertThat(registeredGauges.get("asyncStateProcessing.numInFlightRecords").getValue())
+                .isEqualTo(1);
+        
assertThat(registeredGauges.get("asyncStateProcessing.activeBufferSize").getValue())
+                .isEqualTo(1);
+        
assertThat(registeredGauges.get("asyncStateProcessing.blockingBufferSize").getValue())
+                .isEqualTo(0);
+        
assertThat(registeredGauges.get("asyncStateProcessing.numBlockingKeys").getValue())
+                .isEqualTo(0);
+
         aec.triggerIfNeeded(true);
         // After running, the value update is in active buffer.
         assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1);
@@ -189,6 +221,14 @@ class AsyncExecutionControllerTest {
         assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1);
         assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1);
         assertThat(aec.inFlightRecordNum.get()).isEqualTo(2);
+        
assertThat(registeredGauges.get("asyncStateProcessing.numInFlightRecords").getValue())
+                .isEqualTo(2);
+        
assertThat(registeredGauges.get("asyncStateProcessing.activeBufferSize").getValue())
+                .isEqualTo(1);
+        
assertThat(registeredGauges.get("asyncStateProcessing.blockingBufferSize").getValue())
+                .isEqualTo(1);
+        
assertThat(registeredGauges.get("asyncStateProcessing.numBlockingKeys").getValue())
+                .isEqualTo(1);
         aec.triggerIfNeeded(true);
         // Value update for record2 finishes. The value get for record3 is 
migrated from blocking
         // buffer to active buffer actively.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java
index 705a2c50d16..0522b88c05d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java
@@ -116,6 +116,7 @@ class AbstractAggregatingStateTest extends 
AbstractKeyedStateTestBase {
                         100,
                         10000,
                         1,
+                        null,
                         null);
         AbstractAggregatingState<String, String, Integer, Integer, Integer> 
aggregatingState =
                 new AbstractAggregatingState<>(aec, descriptor);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
index 00ca94d6041..5353c8b73c0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java
@@ -86,6 +86,7 @@ public class AbstractKeyedStateTestBase {
                         1,
                         1000,
                         1,
+                        null,
                         null);
         exception = new AtomicReference<>(null);
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java
index b6a3eb7144b..cac5f8631b2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java
@@ -91,6 +91,7 @@ public class AbstractReducingStateTest extends 
AbstractKeyedStateTestBase {
                         100,
                         10000,
                         1,
+                        null,
                         null);
         AbstractReducingState<String, String, Integer> reducingState =
                 new AbstractReducingState<>(aec, descriptor);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java
index a1a3165ba82..8f7ec470846 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java
@@ -234,6 +234,7 @@ public abstract class StateBackendTestV2Base<B extends 
AbstractStateBackend> {
                             aecBatchSize,
                             aecBufferTimeout,
                             aecMaxInFlightRecords,
+                            null,
                             null);
             backend.setup(aec);
 
@@ -324,6 +325,7 @@ public abstract class StateBackendTestV2Base<B extends 
AbstractStateBackend> {
                             aecBatchSize,
                             aecBufferTimeout,
                             aecMaxInFlightRecords,
+                            null,
                             null);
             backend.setup(aec);
 
@@ -410,6 +412,7 @@ public abstract class StateBackendTestV2Base<B extends 
AbstractStateBackend> {
                             aecBatchSize,
                             aecBufferTimeout,
                             aecMaxInFlightRecords,
+                            null,
                             null);
             backend.setup(aec);
 
@@ -480,6 +483,7 @@ public abstract class StateBackendTestV2Base<B extends 
AbstractStateBackend> {
                             aecBatchSize,
                             aecBufferTimeout,
                             aecMaxInFlightRecords,
+                            null,
                             null);
             backend.setup(aec);
 
@@ -585,6 +589,7 @@ public abstract class StateBackendTestV2Base<B extends 
AbstractStateBackend> {
                         1,
                         -1,
                         1,
+                        null,
                         null);
         backend.setup(aec);
         try {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java
index d53197b5547..c05ad4b41da 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java
@@ -79,6 +79,7 @@ class InternalTimerServiceAsyncImplTest {
                         2,
                         1000L,
                         10,
+                        null,
                         null);
         // ensure arbitrary key is in the key group
         int totalKeyGroups = 128;
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateTestBase.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateTestBase.java
index fe6bb88eb85..ed93bd81532 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateTestBase.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateTestBase.java
@@ -82,6 +82,7 @@ public class ForStStateTestBase {
                         100,
                         0,
                         1,
+                        null,
                         null);
         keyedBackend.setup(aec);
     }

Reply via email to