This is an automated email from the ASF dual-hosted git repository.

wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 6142e5ea [runtime] Add comprehensive tests for ActionExecutionOperator 
manager classes  (#659)
6142e5ea is described below

commit 6142e5ea8c91e0535b75fa40725993b4ba7978b0
Author: Weiqing Yang <[email protected]>
AuthorDate: Thu May 14 00:57:14 2026 -0700

    [runtime] Add comprehensive tests for ActionExecutionOperator manager 
classes  (#659)
    
    * [runtime] Add @VisibleForTesting seams for manager tests
    
    * [runtime] Add comprehensive tests for DurableExecutionManager
    
    * [runtime] Add comprehensive tests for EventRouter
    
    * [runtime] Add comprehensive tests for ActionTaskContextManager
---
 .../runtime/operator/DurableExecutionManager.java  |   5 +
 .../flink/agents/runtime/operator/EventRouter.java |  14 +-
 .../operator/ActionTaskContextManagerTest.java     | 179 +++++++++++++++++++++
 .../operator/DurableExecutionManagerTest.java      | 119 ++++++++++++++
 .../agents/runtime/operator/EventRouterTest.java   | 149 +++++++++++++++++
 5 files changed, 464 insertions(+), 2 deletions(-)

diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
index f8d1129a..03bfe264 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
@@ -416,6 +416,11 @@ class DurableExecutionManager implements 
ActionStatePersister, AutoCloseable {
         return actionStateStore;
     }
 
+    @VisibleForTesting
+    Map<Long, Map<Object, Long>> getCheckpointIdToSeqNums() {
+        return checkpointIdToSeqNums;
+    }
+
     @Override
     public void close() throws Exception {
         if (actionStateStore != null) {
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
index 096ac20d..1283c4c6 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
@@ -88,8 +88,13 @@ class EventRouter<IN, OUT> implements AutoCloseable {
     private BuiltInMetrics builtInMetrics;
 
     EventRouter(AgentPlan agentPlan, boolean inputIsJava) {
+        this(agentPlan, inputIsJava, createEventLogger(agentPlan));
+    }
+
+    @VisibleForTesting
+    EventRouter(AgentPlan agentPlan, boolean inputIsJava, EventLogger 
eventLogger) {
         this.inputIsJava = inputIsJava;
-        this.eventLogger = createEventLogger(agentPlan);
+        this.eventLogger = eventLogger;
         this.eventListeners = new ArrayList<>();
     }
 
@@ -242,7 +247,12 @@ class EventRouter<IN, OUT> implements AutoCloseable {
         return eventLogger;
     }
 
-    private EventLogger createEventLogger(AgentPlan agentPlan) {
+    @VisibleForTesting
+    void addEventListener(EventListener listener) {
+        eventListeners.add(listener);
+    }
+
+    private static EventLogger createEventLogger(AgentPlan agentPlan) {
         // Honor the EVENT_LOGGER_TYPE config, defaulting to SLF4J so events 
surface in the Flink
         // Web UI by default. An explicit baseLogDir forces the file logger 
for backward
         // compatibility with the existing file-based logging path.
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionTaskContextManagerTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionTaskContextManagerTest.java
index bfcead06..8fec7cb6 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionTaskContextManagerTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionTaskContextManagerTest.java
@@ -18,12 +18,29 @@
 package org.apache.flink.agents.runtime.operator;
 
 import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.plan.AgentPlan;
 import org.apache.flink.agents.plan.actions.Action;
+import org.apache.flink.agents.runtime.ResourceCache;
+import org.apache.flink.agents.runtime.actionstate.ActionState;
+import org.apache.flink.agents.runtime.actionstate.InMemoryActionStateStore;
 import org.apache.flink.agents.runtime.async.ContinuationContext;
+import org.apache.flink.agents.runtime.context.JavaRunnerContextImpl;
+import org.apache.flink.agents.runtime.context.RunnerContextImpl;
+import org.apache.flink.agents.runtime.memory.MemoryObjectImpl;
+import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
+import org.apache.flink.api.common.state.MapState;
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 /** Contract tests for {@link ActionTaskContextManager}. */
 class ActionTaskContextManagerTest {
@@ -73,4 +90,166 @@ class ActionTaskContextManagerTest {
                     .hasMessageContaining("PythonRunnerContextImpl has not 
been initialized");
         }
     }
+
+    @Test
+    void createAndSetRunnerContextBuildsFreshMemoryContextOnFirstCall() throws 
Exception {
+        try (ActionTaskContextManager mgr = new ActionTaskContextManager(1)) {
+            ActionTask t = new JavaActionTask("k", new InputEvent(1L), 
TestActions.noopAction());
+            invokeCreateAndSetRunnerContext(mgr, t);
+
+            // Production path: createAndSetRunnerContext at 
ActionTaskContextManager.java:210-218
+            // — the else branch builds a fresh MemoryContext when the map has 
no entry.
+            
assertThat(t.getRunnerContext()).isInstanceOf(JavaRunnerContextImpl.class);
+            assertThat(t.getRunnerContext().getMemoryContext()).isNotNull();
+        }
+    }
+
+    @Test
+    void createAndSetRunnerContextReusesExistingMemoryContext() throws 
Exception {
+        try (ActionTaskContextManager mgr = new ActionTaskContextManager(1)) {
+            Action action = TestActions.noopAction();
+            ActionTask from = new JavaActionTask("k", new InputEvent(1L), 
action);
+            ActionTask to = new JavaActionTask("k", new InputEvent(2L), 
action);
+
+            // Step 1: createAndSetRunnerContext(from) — runner context now 
carries a fresh
+            // MemoryContext, but the map (actionTaskMemoryContexts) is still 
empty (production
+            // code at lines 210-218 only reads from the map, never writes).
+            invokeCreateAndSetRunnerContext(mgr, from);
+            RunnerContextImpl.MemoryContext fromMemCtx = 
from.getRunnerContext().getMemoryContext();
+            assertThat(fromMemCtx).isNotNull();
+
+            // Step 2: transferContexts populates the map entry for `to` via 
the private
+            // putMemoryContext (ActionTaskContextManager.java:266-286). DEM 
null is OK because
+            // from has no DurableExecutionContext.
+            mgr.transferContexts(from, to, new DurableExecutionManager(null));
+
+            // Step 3: createAndSetRunnerContext(to) — production code at 
lines 211-212 reads
+            // the map for `to` and reuses fromMemCtx (the if-branch of the 
reuse check).
+            invokeCreateAndSetRunnerContext(mgr, to);
+
+            // The runner context is shared (single Java JavaRunnerContextImpl 
instance), but
+            // its memoryContext was switched to the entry that was in the map 
for `to`. Verify
+            // the same MemoryContext instance is now wired on the runner 
context.
+            
assertThat(to.getRunnerContext().getMemoryContext()).isSameAs(fromMemCtx);
+        }
+    }
+
+    @Test
+    void transferContextsCopiesMemoryAndContinuationToNewTask() throws 
Exception {
+        try (ActionTaskContextManager mgr = new ActionTaskContextManager(1)) {
+            Action action = TestActions.noopAction();
+            ActionTask from = new JavaActionTask("k", new InputEvent(1L), 
action);
+            ActionTask to = new JavaActionTask("k", new InputEvent(2L), 
action);
+
+            // Populate `from`'s runner context with a MemoryContext and 
ContinuationContext.
+            invokeCreateAndSetRunnerContext(mgr, from);
+            RunnerContextImpl.MemoryContext fromMemCtx = 
from.getRunnerContext().getMemoryContext();
+            assertThat(fromMemCtx).isNotNull();
+
+            // transferContexts (ActionTaskContextManager.java:266-286) copies 
but does NOT
+            // remove from source. The from-side continuation map is never 
populated (the
+            // continuation lives on from's runner context until transfer 
copies it over for
+            // `to`). Operator-side cleanup of `from`'s entries is the 
operator's
+            // responsibility — see ActionExecutionOperator.java:366-369.
+            mgr.transferContexts(from, to, new DurableExecutionManager(null));
+
+            // (a) The memory context entry for `to` is the same instance 
fromTask holds.
+            RunnerContextImpl.MemoryContext toMemCtx = 
mgr.removeMemoryContext(to);
+            assertThat(toMemCtx).isSameAs(fromMemCtx);
+
+            // After remove, the map no longer has `to`'s entry.
+            assertThat(mgr.removeMemoryContext(to)).isNull();
+
+            // (b) Continuation context routed to `to`.
+            assertThat(mgr.hasContinuationContext(to)).isTrue();
+
+            // (c) The `from`-side continuation map entry was never populated 
by the transfer
+            // — the source carries its continuation on its runner context, 
not on the
+            // manager's map.
+            assertThat(mgr.hasContinuationContext(from)).isFalse();
+        }
+    }
+
+    @Test
+    void transferContextsRoutesDurableContextThroughManager() throws Exception 
{
+        try (ActionTaskContextManager mgr = new ActionTaskContextManager(1)) {
+            Action action = TestActions.noopAction();
+            InputEvent event = new InputEvent(1L);
+            ActionTask from = new JavaActionTask("k", event, action);
+            ActionTask to = new JavaActionTask("k", new InputEvent(2L), 
action);
+
+            invokeCreateAndSetRunnerContext(mgr, from);
+
+            // Spy on DEM backed by a real InMemoryActionStateStore so spied 
internals don't
+            // NPE. The store doesn't really need to be exercised — we only 
verify the
+            // putDurableContext call site at 
ActionTaskContextManager.java:271-273.
+            DurableExecutionManager spyDem =
+                    spy(new DurableExecutionManager(new 
InMemoryActionStateStore(false)));
+
+            // Attach a DurableExecutionContext to `from`'s runner context. 
The persister is
+            // the DEM itself (DurableExecutionManager implements 
ActionStatePersister at
+            // DurableExecutionManager.java:78). ActionState ctor needs an 
Event so getCallResults()
+            // returns a non-null list inside the DurableExecutionContext ctor.
+            ActionState actionState = new ActionState(event);
+            RunnerContextImpl.DurableExecutionContext durableCtx =
+                    new RunnerContextImpl.DurableExecutionContext(
+                            "k", 0L, action, event, actionState, spyDem);
+            from.getRunnerContext().setDurableExecutionContext(durableCtx);
+
+            mgr.transferContexts(from, to, spyDem);
+
+            // The durable-context branch routes via the DEM's 
putDurableContext, satisfying
+            // the no-manager-to-manager-references design constraint (DEM 
passed as a
+            // parameter, not held as a field).
+            verify(spyDem)
+                    .putDurableContext(
+                            eq(to), 
any(RunnerContextImpl.DurableExecutionContext.class));
+        }
+    }
+
+    @Test
+    void closeIsIdempotent() throws Exception {
+        // Not using try-with-resources here because we want to call close() 
explicitly twice.
+        ActionTaskContextManager mgr = new ActionTaskContextManager(1);
+        ActionTask t = new JavaActionTask("k", new InputEvent(1L), 
TestActions.noopAction());
+        invokeCreateAndSetRunnerContext(mgr, t);
+
+        // First close() shuts down the runner context and the continuation 
executor
+        // (ActionTaskContextManager.java:319-330). The second close() must be 
a no-op:
+        // runnerContext is nulled and ContinuationActionExecutor.close() is 
backed by
+        // ExecutorService.shutdownNow() which is itself idempotent.
+        mgr.close();
+        mgr.close();
+    }
+
+    /**
+     * Shared helper: install a runner context on {@code task} using mocked 
collaborators. Used by
+     * tests that need a fully wired runner context but do not care about the 
collaborator details.
+     */
+    @SuppressWarnings("unchecked")
+    private static void invokeCreateAndSetRunnerContext(
+            ActionTaskContextManager mgr, ActionTask task) {
+        AgentPlan plan = newEmptyAgentPlan();
+        ResourceCache cache = mock(ResourceCache.class);
+        FlinkAgentsMetricGroupImpl metricGroup =
+                mock(FlinkAgentsMetricGroupImpl.class, RETURNS_DEEP_STUBS);
+        MapState<String, MemoryObjectImpl.MemoryItem> sensoryMem = 
mock(MapState.class);
+        MapState<String, MemoryObjectImpl.MemoryItem> shortTermMem = 
mock(MapState.class);
+        mgr.createAndSetRunnerContext(
+                task,
+                "k",
+                plan,
+                cache,
+                metricGroup,
+                "job",
+                () -> {},
+                sensoryMem,
+                shortTermMem,
+                /* pythonRunnerContext */ null,
+                /* longTermMemory */ null);
+    }
+
+    private static AgentPlan newEmptyAgentPlan() {
+        return new AgentPlan(new HashMap<>(), new HashMap<>());
+    }
 }
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
index 98a8b409..6c737d47 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
@@ -25,12 +25,23 @@ import org.apache.flink.agents.plan.actions.Action;
 import org.apache.flink.agents.runtime.actionstate.ActionState;
 import org.apache.flink.agents.runtime.actionstate.InMemoryActionStateStore;
 import org.apache.flink.agents.runtime.context.RunnerContextImpl;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -96,4 +107,112 @@ class DurableExecutionManagerTest {
 
         dem.close();
     }
+
+    /**
+     * Verifies {@code snapshotLastCompletedSequenceNumbers} captures the 
per-key sequence numbers
+     * returned by the keyed state backend and records them under the given 
checkpoint id.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    void snapshotCapturesKeySequenceNumbers() throws Exception {
+        InMemoryActionStateStore store = new InMemoryActionStateStore(false);
+        DurableExecutionManager dem = new DurableExecutionManager(store);
+
+        KeyedStateBackend<Object> backend = mock(KeyedStateBackend.class);
+        doAnswer(
+                        invocation -> {
+                            KeyedStateFunction<Object, ValueState<Long>> 
function =
+                                    invocation.getArgument(3);
+                            ValueState<Long> state1 = mock(ValueState.class);
+                            when(state1.value()).thenReturn(10L);
+                            ValueState<Long> state2 = mock(ValueState.class);
+                            when(state2.value()).thenReturn(20L);
+                            function.process("k1", state1);
+                            function.process("k2", state2);
+                            return null;
+                        })
+                .when(backend)
+                .applyToAllKeys(any(), any(), any(), any());
+
+        dem.snapshotLastCompletedSequenceNumbers(backend, 1000L);
+
+        assertThat(dem.getCheckpointIdToSeqNums())
+                .containsOnlyKeys(1000L)
+                .extractingByKey(1000L)
+                .asInstanceOf(
+                        org.assertj.core.api.InstanceOfAssertFactories.map(
+                                Object.class, Long.class))
+                .containsEntry("k1", 10L)
+                .containsEntry("k2", 20L)
+                .hasSize(2);
+
+        dem.close();
+    }
+
+    /**
+     * Verifies {@code notifyCheckpointComplete} prunes every captured key for 
the notified
+     * checkpoint, leaves entries captured for other checkpoints intact, and 
removes the notified
+     * checkpoint's map entry. Combines the multi-key (loop body) and 
multi-checkpoint (selectivity)
+     * concerns into one stronger test.
+     */
+    @Test
+    void notifyPrunesNotifiedCheckpointAndPreservesOthers() throws Exception {
+        InMemoryActionStateStore store = new InMemoryActionStateStore(true);
+        DurableExecutionManager dem = new DurableExecutionManager(store);
+
+        Action action = TestActions.noopAction();
+        // Seed the store with state for three keys.
+        dem.maybeInitActionState("k1", 10L, action, new InputEvent(1L));
+        dem.maybeInitActionState("k2", 20L, action, new InputEvent(2L));
+        dem.maybeInitActionState("k3", 30L, action, new InputEvent(3L));
+        assertThat(store.getKeyedActionStates()).containsKeys("k1", "k2", 
"k3");
+
+        // Record captured sequence numbers: checkpoint 1000 covers {k1, k2}, 
checkpoint 1001
+        // covers {k3}. Use the @VisibleForTesting seam to install the 
snapshot directly.
+        Map<Object, Long> ckpt1000 = new HashMap<>();
+        ckpt1000.put("k1", 10L);
+        ckpt1000.put("k2", 20L);
+        Map<Object, Long> ckpt1001 = new HashMap<>();
+        ckpt1001.put("k3", 30L);
+        dem.getCheckpointIdToSeqNums().put(1000L, ckpt1000);
+        dem.getCheckpointIdToSeqNums().put(1001L, ckpt1001);
+
+        dem.notifyCheckpointComplete(1000L);
+
+        // InMemoryActionStateStore.pruneState (lines 67–71) ignores the 
seqNum and removes the
+        // whole key. The notified checkpoint's keys (k1, k2) are pruned; k3 — 
captured under a
+        // different checkpoint — must remain. The notified checkpoint's map 
entry is removed;
+        // entries for other checkpoints stay.
+        assertThat(store.getKeyedActionStates())
+                .doesNotContainKey("k1")
+                .doesNotContainKey("k2")
+                .containsKey("k3");
+        
assertThat(dem.getCheckpointIdToSeqNums()).doesNotContainKey(1000L).containsKey(1001L);
+
+        dem.close();
+    }
+
+    /**
+     * Verifies {@code handleRecovery} reads markers from the operator state 
backend and forwards
+     * them to {@code store.rebuildState(...)}.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    void handleRecoveryCallsRebuildState() throws Exception {
+        InMemoryActionStateStore spyStore = spy(new 
InMemoryActionStateStore(true));
+        DurableExecutionManager dem = new DurableExecutionManager(spyStore);
+
+        OperatorStateBackend opBackend = mock(OperatorStateBackend.class);
+        ListState<Object> markerState = mock(ListState.class);
+        
when(opBackend.getUnionListState(any(ListStateDescriptor.class))).thenReturn(markerState);
+        when(markerState.get()).thenReturn(List.of("test-marker"));
+
+        dem.handleRecovery(opBackend);
+
+        // InMemoryActionStateStore.rebuildState is a no-op (lines 62–64), so 
state mutation is
+        // not observable here — the test verifies the call contract only.
+        verify(spyStore).rebuildState(List.of("test-marker"));
+
+        dem.close();
+    }
 }
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/EventRouterTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/EventRouterTest.java
index 9338a291..8f3cbea9 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/EventRouterTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/EventRouterTest.java
@@ -18,16 +18,32 @@
 package org.apache.flink.agents.runtime.operator;
 
 import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.EventContext;
 import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.listener.EventListener;
+import org.apache.flink.agents.api.logger.EventLogger;
 import org.apache.flink.agents.plan.AgentPlan;
 import org.apache.flink.agents.plan.actions.Action;
+import org.apache.flink.agents.runtime.metrics.BuiltInMetrics;
+import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
+import org.apache.flink.agents.runtime.operator.queue.SegmentedQueue;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 /** Contract tests for {@link EventRouter}. */
 class EventRouterTest {
@@ -56,4 +72,137 @@ class EventRouterTest {
 
         assertThat(triggered).containsExactly(action);
     }
+
+    /**
+     * Verifies {@code notifyEventProcessed} dispatches {@code 
onEventProcessed} to every registered
+     * {@link EventListener} with the same {@link EventContext} and event 
payload.
+     */
+    @Test
+    void notifyEventProcessedNotifiesAllListeners() throws Exception {
+        AgentPlan plan = new AgentPlan(new HashMap<>(), new HashMap<>());
+        EventLogger mockLogger = mock(EventLogger.class);
+        EventRouter<Long, Object> router =
+                new EventRouter<>(plan, /* inputIsJava */ true, mockLogger);
+
+        EventListener listener1 = mock(EventListener.class);
+        EventListener listener2 = mock(EventListener.class);
+        router.addEventListener(listener1);
+        router.addEventListener(listener2);
+
+        router.open(makeMetrics());
+
+        InputEvent inputEvent = new InputEvent(7L);
+        router.notifyEventProcessed(inputEvent);
+
+        verify(listener1).onEventProcessed(any(EventContext.class), 
eq(inputEvent));
+        verify(listener2).onEventProcessed(any(EventContext.class), 
eq(inputEvent));
+    }
+
+    /**
+     * Verifies {@code notifyEventProcessed} increments the {@code 
numOfEventProcessed} metric by
+     * delegating to {@link BuiltInMetrics#markEventProcessed()}.
+     */
+    @Test
+    void notifyEventProcessedIncrementsMetric() throws Exception {
+        AgentPlan plan = new AgentPlan(new HashMap<>(), new HashMap<>());
+        EventLogger mockLogger = mock(EventLogger.class);
+        EventRouter<Long, Object> router =
+                new EventRouter<>(plan, /* inputIsJava */ true, mockLogger);
+
+        BuiltInMetrics spyMetrics = spy(makeMetrics());
+        router.open(spyMetrics);
+
+        router.notifyEventProcessed(new InputEvent(1L));
+
+        verify(spyMetrics).markEventProcessed();
+    }
+
+    /**
+     * Verifies {@code notifyEventProcessed} calls {@code append} on the event 
logger followed by
+     * {@code flush}, in that order.
+     */
+    @Test
+    void notifyEventProcessedAppendsAndFlushesLogger() throws Exception {
+        AgentPlan plan = new AgentPlan(new HashMap<>(), new HashMap<>());
+        EventLogger mockLogger = mock(EventLogger.class);
+        EventRouter<Long, Object> router =
+                new EventRouter<>(plan, /* inputIsJava */ true, mockLogger);
+
+        router.open(makeMetrics());
+
+        InputEvent inputEvent = new InputEvent(3L);
+        router.notifyEventProcessed(inputEvent);
+
+        InOrder ordered = inOrder(mockLogger);
+        ordered.verify(mockLogger).append(any(EventContext.class), 
eq(inputEvent));
+        ordered.verify(mockLogger).flush();
+    }
+
+    /**
+     * Verifies watermarks are drained in arrival order — even when the keys 
ahead of them finish
+     * out-of-order. The {@link SegmentedQueue} closes a segment when a 
watermark is added, so the
+     * sequence {@code addKey(k1), addKey(k2), addWatermark(WM1), addKey(k3), 
addWatermark(WM2)}
+     * forms two segments: {@code [{k1,k2} | WM1]} then {@code [{k3} | WM2]}. 
Even if {@code k2}
+     * finishes before {@code k1} and {@code k3} finishes before either, the 
emitted order must be
+     * {@code [WM1, WM2]}.
+     */
+    @Test
+    void processEligibleWatermarksDrainsInArrivalOrder() throws Exception {
+        AgentPlan plan = new AgentPlan(new HashMap<>(), new HashMap<>());
+        EventRouter<Long, Object> router = new EventRouter<>(plan, /* 
inputIsJava */ true);
+        router.open(makeMetrics());
+
+        SegmentedQueue queue = router.getKeySegmentQueue();
+        Watermark wm1 = new Watermark(100L);
+        Watermark wm2 = new Watermark(200L);
+
+        queue.addKeyToLastSegment("k1");
+        queue.addKeyToLastSegment("k2");
+        queue.addWatermark(wm1);
+        queue.addKeyToLastSegment("k3");
+        queue.addWatermark(wm2);
+
+        // Finish keys out of arrival order.
+        queue.removeKey("k2");
+        queue.removeKey("k3");
+        queue.removeKey("k1");
+
+        List<Watermark> emitted = new ArrayList<>();
+        router.processEligibleWatermarks(emitted::add);
+
+        assertThat(emitted).containsExactly(wm1, wm2);
+    }
+
+    /**
+     * Verifies a watermark stays buffered while its preceding segment still 
has an in-flight key,
+     * and is emitted only after the segment drains.
+     */
+    @Test
+    void processEligibleWatermarksBlocksWhileSegmentHasKeys() throws Exception 
{
+        AgentPlan plan = new AgentPlan(new HashMap<>(), new HashMap<>());
+        EventRouter<Long, Object> router = new EventRouter<>(plan, /* 
inputIsJava */ true);
+        router.open(makeMetrics());
+
+        SegmentedQueue queue = router.getKeySegmentQueue();
+        Watermark wm1 = new Watermark(100L);
+
+        queue.addKeyToLastSegment("k1");
+        queue.addWatermark(wm1);
+
+        List<Watermark> emitted = new ArrayList<>();
+        router.processEligibleWatermarks(emitted::add);
+        // k1 is still in flight → wm1 must be held back.
+        assertThat(emitted).isEmpty();
+
+        queue.removeKey("k1");
+        router.processEligibleWatermarks(emitted::add);
+        assertThat(emitted).containsExactly(wm1);
+    }
+
+    private static BuiltInMetrics makeMetrics() {
+        FlinkAgentsMetricGroupImpl metricGroup =
+                mock(FlinkAgentsMetricGroupImpl.class, RETURNS_DEEP_STUBS);
+        AgentPlan plan = new AgentPlan(new HashMap<>(), new HashMap<>());
+        return new BuiltInMetrics(metricGroup, plan);
+    }
 }

Reply via email to