This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 704b5c4b200 [FLINK-35405][runtime] Add metrics for AsyncExecutionController (#25998) 704b5c4b200 is described below commit 704b5c4b200f392f02101ecde8df434fdd79321b Author: Yanfei Lei <fredia...@gmail.com> AuthorDate: Sun Jan 19 23:47:04 2025 +0800 [FLINK-35405][runtime] Add metrics for AsyncExecutionController (#25998) --- docs/content.zh/docs/ops/metrics.md | 56 ++++++++++++++++++---- docs/content/docs/ops/metrics.md | 56 ++++++++++++++++++---- .../asyncprocessing/AsyncExecutionController.java | 10 +++- .../asyncprocessing/StateRequestBuffer.java | 9 ++++ .../AbstractAsyncStateStreamOperator.java | 3 +- .../AbstractAsyncStateStreamOperatorV2.java | 3 +- .../asyncprocessing/AbstractStateIteratorTest.java | 3 ++ .../AsyncExecutionControllerTest.java | 42 +++++++++++++++- .../state/v2/AbstractAggregatingStateTest.java | 1 + .../state/v2/AbstractKeyedStateTestBase.java | 1 + .../state/v2/AbstractReducingStateTest.java | 1 + .../runtime/state/v2/StateBackendTestV2Base.java | 5 ++ .../InternalTimerServiceAsyncImplTest.java | 1 + .../flink/state/forst/ForStStateTestBase.java | 1 + 14 files changed, 172 insertions(+), 20 deletions(-) diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md index bc7b7c19c6d..852893b2c52 100644 --- a/docs/content.zh/docs/ops/metrics.md +++ b/docs/content.zh/docs/ops/metrics.md @@ -1545,31 +1545,33 @@ Certain RocksDB native metrics are available but disabled by default, you can fi <table class="table table-bordered"> <thead> <tr> - <th class="text-left" style="width: 18%">Scope</th> - <th class="text-left" style="width: 26%">Metrics</th> - <th class="text-left" style="width: 48%">Description</th> - <th class="text-left" style="width: 8%">Type</th> + <th class="text-left" style="width: 15%">Scope</th> + <th class="text-left" style="width: 15%">Infix</th> + <th class="text-left" style="width: 15%">Metrics</th> + <th class="text-left" 
style="width: 50%">Description</th> + <th class="text-left" style="width: 5%">Type</th> </tr> </thead> <tbody> <tr> <th rowspan="4"><strong>Task/Operator</strong></th> - <td>forst.fileCache.hit</td> + <td rowspan="4">forst.fileCache</td> + <td>hit</td> <td>The hit count of ForSt state backend cache.</td> <td>Counter</td> </tr> <tr> - <td>forst.fileCache.miss</td> + <td>miss</td> <td>The miss count of ForSt state backend cache.</td> <td>Counter</td> </tr> <tr> - <td>forst.fileCache.usedBytes</td> + <td>usedBytes</td> <td>The bytes cached in ForSt state backend cache.</td> <td>Gauge</td> </tr> <tr> - <td>forst.fileCache.remainingBytes</td> + <td>remainingBytes</td> <td>The remaining space in the volume for the configured cache. Only available when 'state.backend.forst.cache.reserve-size' is set above 0. </td> <td>Gauge</td> </tr> @@ -2280,6 +2282,44 @@ logged by `SystemResourcesMetricsInitializer` during the startup. </tbody> </table> +### Async State Processing + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 15%">Scope</th> + <th class="text-left" style="width: 10%">Infix</th> + <th class="text-left" style="width: 20%">Metrics</th> + <th class="text-left" style="width: 50%">Description</th> + <th class="text-left" style="width: 5%">Type</th> + </tr> + </thead> + <tbody> + <tr> + <th rowspan="4"><strong>Operator</strong></th> + <td rowspan="4">asyncStateProcessing</td> + <td>numInFlightRecords</td> + <td>The number of in-flight records in the async execution controller's buffers.</td> + <td>Gauge</td> + </tr> + <tr> + <td>activeBufferSize</td> + <td>The number of records which are pending to be processed.</td> + <td>Gauge</td> + </tr> + <tr> + <td>blockingBufferSize</td> + <td>The number of records which are blocked by the ongoing records.</td> + <td>Gauge</td> + </tr> + <tr> + <td>numBlockingKeys</td> + <td>The number of distinct keys that are blocked in the async execution controller.</td> + <td>Gauge</td> + </tr> + </tbody> 
+</table> + ## End-to-End latency tracking Flink allows to track the latency of records travelling through the system. This feature is disabled by default. diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md index c937b96bce8..f091e2d9d31 100644 --- a/docs/content/docs/ops/metrics.md +++ b/docs/content/docs/ops/metrics.md @@ -1535,31 +1535,33 @@ Certain RocksDB native metrics are available but disabled by default, you can fi <table class="table table-bordered"> <thead> <tr> - <th class="text-left" style="width: 18%">Scope</th> - <th class="text-left" style="width: 26%">Metrics</th> - <th class="text-left" style="width: 48%">Description</th> - <th class="text-left" style="width: 8%">Type</th> + <th class="text-left" style="width: 15%">Scope</th> + <th class="text-left" style="width: 15%">Infix</th> + <th class="text-left" style="width: 15%">Metrics</th> + <th class="text-left" style="width: 50%">Description</th> + <th class="text-left" style="width: 5%">Type</th> </tr> </thead> <tbody> <tr> <th rowspan="4"><strong>Task/Operator</strong></th> - <td>forst.fileCache.hit</td> + <td rowspan="4">forst.fileCache</td> + <td>hit</td> <td>The hit count of ForSt state backend cache.</td> <td>Counter</td> </tr> <tr> - <td>forst.fileCache.miss</td> + <td>miss</td> <td>The miss count of ForSt state backend cache.</td> <td>Counter</td> </tr> <tr> - <td>forst.fileCache.usedBytes</td> + <td>usedBytes</td> <td>The bytes cached in ForSt state backend cache.</td> <td>Gauge</td> </tr> <tr> - <td>forst.fileCache.remainingBytes</td> + <td>remainingBytes</td> <td>The remaining space in the volume for the configured cache. Only available when 'state.backend.forst.cache.reserve-size' is set above 0. </td> <td>Gauge</td> </tr> @@ -2230,6 +2232,44 @@ Metrics below can be used to measure the effectiveness of speculative execution. 
</tbody> </table> +### Async State Processing + +<table class="table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 15%">Scope</th> + <th class="text-left" style="width: 10%">Infix</th> + <th class="text-left" style="width: 20%">Metrics</th> + <th class="text-left" style="width: 50%">Description</th> + <th class="text-left" style="width: 5%">Type</th> + </tr> + </thead> + <tbody> + <tr> + <th rowspan="4"><strong>Operator</strong></th> + <td rowspan="4">asyncStateProcessing</td> + <td>numInFlightRecords</td> + <td>The number of in-flight records in the async execution controller's buffers.</td> + <td>Gauge</td> + </tr> + <tr> + <td>activeBufferSize</td> + <td>The number of records which are pending to be processed.</td> + <td>Gauge</td> + </tr> + <tr> + <td>blockingBufferSize</td> + <td>The number of records which are blocked by the ongoing records.</td> + <td>Gauge</td> + </tr> + <tr> + <td>numBlockingKeys</td> + <td>The number of distinct keys that are blocked in the async execution controller.</td> + <td>Gauge</td> + </tr> + </tbody> +</table> + ## End-to-End latency tracking Flink allows to track the latency of records travelling through the system. This feature is disabled by default. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index cc7eed91d83..4fdefc64d47 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.v2.State; import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.core.state.InternalStateFuture; import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode; import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; @@ -158,7 +159,8 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab int batchSize, long bufferTimeout, int maxInFlightRecords, - SwitchContextListener<K> switchContextListener) { + @Nullable SwitchContextListener<K> switchContextListener, + @Nullable MetricGroup metricGroup) { this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords); this.mailboxExecutor = mailboxExecutor; this.exceptionHandler = exceptionHandler; @@ -187,6 +189,12 @@ public class AsyncExecutionController<K> implements StateRequestHandler, Closeab this.epochManager = new EpochManager(this); this.switchContextListener = switchContextListener; + if (metricGroup != null) { + metricGroup.gauge("numInFlightRecords", this::getInFlightRecordNum); + metricGroup.gauge("activeBufferSize", () -> stateRequestsBuffer.activeQueueSize()); + metricGroup.gauge("blockingBufferSize", () -> stateRequestsBuffer.blockingQueueSize()); + metricGroup.gauge("numBlockingKeys", () -> 
stateRequestsBuffer.blockingKeyNum()); + } LOG.info( "Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordNum {}, epochParallelMode {}", this.batchSize, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java index fe469d90012..b5946be54d7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java @@ -218,6 +218,15 @@ public class StateRequestBuffer<K> implements Closeable { return blockingQueueSize; } + /** + * Get the number of different keys in blocking queue. + * + * @return the number of different keys in blocking queue. + */ + int blockingKeyNum() { + return blockingQueue.size(); + } + /** * Get the number of state requests of active queue in constant-time. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index 3ce835c8f28..32f88ff7759 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -121,7 +121,8 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre asyncBufferSize, asyncBufferTimeout, inFlightRecordsLimit, - asyncKeyedStateBackend); + asyncKeyedStateBackend, + getMetricGroup().addGroup("asyncStateProcessing")); asyncKeyedStateBackend.setup(asyncExecutionController); if (asyncKeyedStateBackend instanceof AsyncKeyedStateBackendAdaptor) { LOG.warn( diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java index b1a80cebaa0..955faa770b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java @@ -118,7 +118,8 @@ public abstract class AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt asyncBufferSize, asyncBufferTimeout, inFlightRecordsLimit, - asyncKeyedStateBackend); + asyncKeyedStateBackend, + getMetricGroup().addGroup("asyncStateProcessing")); asyncKeyedStateBackend.setup(asyncExecutionController); if (asyncKeyedStateBackend instanceof AsyncKeyedStateBackendAdaptor) { LOG.warn( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java index 68ea31ca0ea..9ec5c3769e9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java @@ -56,6 +56,7 @@ public class AbstractStateIteratorTest { 100, 1000, 1, + null, null); stateExecutor.bindAec(aec); RecordContext<String> recordContext = aec.buildContext("1", "key1"); @@ -95,6 +96,7 @@ public class AbstractStateIteratorTest { 100, 1000, 1, + null, null); stateExecutor.bindAec(aec); RecordContext<String> recordContext = aec.buildContext("1", "key1"); @@ -141,6 +143,7 @@ public class AbstractStateIteratorTest { 100, 1000, 1, + null, null); stateExecutor.bindAec(aec); RecordContext<String> recordContext = aec.buildContext("1", "key1"); diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java index cf4ed16b1fe..e77e3c522de 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java @@ -28,6 +28,9 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.state.InternalStateFuture; import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler; import org.apache.flink.core.state.StateFutureUtils; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch; import org.apache.flink.runtime.asyncprocessing.EpochManager.ParallelMode; import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager; @@ -46,6 +49,7 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; @@ -73,6 +77,7 @@ class AsyncExecutionControllerTest { }) .thenAccept(val -> output.set(val)); }; + final Map<String, Gauge> registeredGauges = new HashMap<>(); void setup( int batchSize, @@ -98,6 +103,23 @@ class AsyncExecutionControllerTest { } closeableRegistry.registerCloseable(asyncKeyedStateBackend); closeableRegistry.registerCloseable(asyncKeyedStateBackend::dispose); + + UnregisteredMetricsGroup metricsGroup = + new UnregisteredMetricsGroup() { + String prefix = ""; + + @Override + public <T, G extends Gauge<T>> G gauge(String name, G gauge) { + registeredGauges.put(prefix + "." 
+ name, gauge); + return gauge; + } + + @Override + public MetricGroup addGroup(String name) { + prefix = name; + return this; + } + }; aec = new AsyncExecutionController<>( mailboxExecutor, @@ -108,7 +130,8 @@ class AsyncExecutionControllerTest { batchSize, timeout, maxInFlight, - null); + null, + metricsGroup.addGroup("asyncStateProcessing")); asyncKeyedStateBackend.setup(aec); try { @@ -149,6 +172,15 @@ class AsyncExecutionControllerTest { assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1); assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); assertThat(aec.inFlightRecordNum.get()).isEqualTo(1); + assertThat(registeredGauges.get("asyncStateProcessing.numInFlightRecords").getValue()) + .isEqualTo(1); + assertThat(registeredGauges.get("asyncStateProcessing.activeBufferSize").getValue()) + .isEqualTo(1); + assertThat(registeredGauges.get("asyncStateProcessing.blockingBufferSize").getValue()) + .isEqualTo(0); + assertThat(registeredGauges.get("asyncStateProcessing.numBlockingKeys").getValue()) + .isEqualTo(0); + aec.triggerIfNeeded(true); // After running, the value update is in active buffer. assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1); @@ -189,6 +221,14 @@ class AsyncExecutionControllerTest { assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1); assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); assertThat(aec.inFlightRecordNum.get()).isEqualTo(2); + assertThat(registeredGauges.get("asyncStateProcessing.numInFlightRecords").getValue()) + .isEqualTo(2); + assertThat(registeredGauges.get("asyncStateProcessing.activeBufferSize").getValue()) + .isEqualTo(1); + assertThat(registeredGauges.get("asyncStateProcessing.blockingBufferSize").getValue()) + .isEqualTo(1); + assertThat(registeredGauges.get("asyncStateProcessing.numBlockingKeys").getValue()) + .isEqualTo(1); aec.triggerIfNeeded(true); // Value update for record2 finishes. 
The value get for record3 is migrated from blocking // buffer to active buffer actively. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java index 705a2c50d16..0522b88c05d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractAggregatingStateTest.java @@ -116,6 +116,7 @@ class AbstractAggregatingStateTest extends AbstractKeyedStateTestBase { 100, 10000, 1, + null, null); AbstractAggregatingState<String, String, Integer, Integer, Integer> aggregatingState = new AbstractAggregatingState<>(aec, descriptor); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java index 00ca94d6041..5353c8b73c0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractKeyedStateTestBase.java @@ -86,6 +86,7 @@ public class AbstractKeyedStateTestBase { 1, 1000, 1, + null, null); exception = new AtomicReference<>(null); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java index b6a3eb7144b..cac5f8631b2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/AbstractReducingStateTest.java @@ -91,6 +91,7 @@ public class AbstractReducingStateTest extends AbstractKeyedStateTestBase { 100, 10000, 1, + null, null); AbstractReducingState<String, String, Integer> reducingState = new 
AbstractReducingState<>(aec, descriptor); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java index a1a3165ba82..8f7ec470846 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/v2/StateBackendTestV2Base.java @@ -234,6 +234,7 @@ public abstract class StateBackendTestV2Base<B extends AbstractStateBackend> { aecBatchSize, aecBufferTimeout, aecMaxInFlightRecords, + null, null); backend.setup(aec); @@ -324,6 +325,7 @@ public abstract class StateBackendTestV2Base<B extends AbstractStateBackend> { aecBatchSize, aecBufferTimeout, aecMaxInFlightRecords, + null, null); backend.setup(aec); @@ -410,6 +412,7 @@ public abstract class StateBackendTestV2Base<B extends AbstractStateBackend> { aecBatchSize, aecBufferTimeout, aecMaxInFlightRecords, + null, null); backend.setup(aec); @@ -480,6 +483,7 @@ public abstract class StateBackendTestV2Base<B extends AbstractStateBackend> { aecBatchSize, aecBufferTimeout, aecMaxInFlightRecords, + null, null); backend.setup(aec); @@ -585,6 +589,7 @@ public abstract class StateBackendTestV2Base<B extends AbstractStateBackend> { 1, -1, 1, + null, null); backend.setup(aec); try { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java index d53197b5547..c05ad4b41da 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.java @@ -79,6 +79,7 @@ class InternalTimerServiceAsyncImplTest { 2, 1000L, 10, + null, null); // ensure arbitrary key is in the key 
group int totalKeyGroups = 128; diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateTestBase.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateTestBase.java index fe6bb88eb85..ed93bd81532 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateTestBase.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateTestBase.java @@ -82,6 +82,7 @@ public class ForStStateTestBase { 100, 0, 1, + null, null); keyedBackend.setup(aec); }