This is an automated email from the ASF dual-hosted git repository.
wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 6142e5ea [runtime] Add comprehensive tests for ActionExecutionOperator
manager classes (#659)
6142e5ea is described below
commit 6142e5ea8c91e0535b75fa40725993b4ba7978b0
Author: Weiqing Yang <[email protected]>
AuthorDate: Thu May 14 00:57:14 2026 -0700
[runtime] Add comprehensive tests for ActionExecutionOperator manager
classes (#659)
* [runtime] Add @VisibleForTesting seams for manager tests
* [runtime] Add comprehensive tests for DurableExecutionManager
* [runtime] Add comprehensive tests for EventRouter
* [runtime] Add comprehensive tests for ActionTaskContextManager
---
.../runtime/operator/DurableExecutionManager.java | 5 +
.../flink/agents/runtime/operator/EventRouter.java | 14 +-
.../operator/ActionTaskContextManagerTest.java | 179 +++++++++++++++++++++
.../operator/DurableExecutionManagerTest.java | 119 ++++++++++++++
.../agents/runtime/operator/EventRouterTest.java | 149 +++++++++++++++++
5 files changed, 464 insertions(+), 2 deletions(-)
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
index f8d1129a..03bfe264 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
@@ -416,6 +416,11 @@ class DurableExecutionManager implements
ActionStatePersister, AutoCloseable {
return actionStateStore;
}
+ @VisibleForTesting
+ Map<Long, Map<Object, Long>> getCheckpointIdToSeqNums() {
+ return checkpointIdToSeqNums;
+ }
+
@Override
public void close() throws Exception {
if (actionStateStore != null) {
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
index 096ac20d..1283c4c6 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
@@ -88,8 +88,13 @@ class EventRouter<IN, OUT> implements AutoCloseable {
private BuiltInMetrics builtInMetrics;
EventRouter(AgentPlan agentPlan, boolean inputIsJava) {
+ this(agentPlan, inputIsJava, createEventLogger(agentPlan));
+ }
+
+ @VisibleForTesting
+ EventRouter(AgentPlan agentPlan, boolean inputIsJava, EventLogger
eventLogger) {
this.inputIsJava = inputIsJava;
- this.eventLogger = createEventLogger(agentPlan);
+ this.eventLogger = eventLogger;
this.eventListeners = new ArrayList<>();
}
@@ -242,7 +247,12 @@ class EventRouter<IN, OUT> implements AutoCloseable {
return eventLogger;
}
- private EventLogger createEventLogger(AgentPlan agentPlan) {
+ @VisibleForTesting
+ void addEventListener(EventListener listener) {
+ eventListeners.add(listener);
+ }
+
+ private static EventLogger createEventLogger(AgentPlan agentPlan) {
// Honor the EVENT_LOGGER_TYPE config, defaulting to SLF4J so events
surface in the Flink
// Web UI by default. An explicit baseLogDir forces the file logger
for backward
// compatibility with the existing file-based logging path.
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionTaskContextManagerTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionTaskContextManagerTest.java
index bfcead06..8fec7cb6 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionTaskContextManagerTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionTaskContextManagerTest.java
@@ -18,12 +18,29 @@
package org.apache.flink.agents.runtime.operator;
import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.plan.AgentPlan;
import org.apache.flink.agents.plan.actions.Action;
+import org.apache.flink.agents.runtime.ResourceCache;
+import org.apache.flink.agents.runtime.actionstate.ActionState;
+import org.apache.flink.agents.runtime.actionstate.InMemoryActionStateStore;
import org.apache.flink.agents.runtime.async.ContinuationContext;
+import org.apache.flink.agents.runtime.context.JavaRunnerContextImpl;
+import org.apache.flink.agents.runtime.context.RunnerContextImpl;
+import org.apache.flink.agents.runtime.memory.MemoryObjectImpl;
+import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
+import org.apache.flink.api.common.state.MapState;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
/** Contract tests for {@link ActionTaskContextManager}. */
class ActionTaskContextManagerTest {
@@ -73,4 +90,166 @@ class ActionTaskContextManagerTest {
.hasMessageContaining("PythonRunnerContextImpl has not
been initialized");
}
}
+
+ @Test
+ void createAndSetRunnerContextBuildsFreshMemoryContextOnFirstCall() throws
Exception {
+ try (ActionTaskContextManager mgr = new ActionTaskContextManager(1)) {
+ ActionTask t = new JavaActionTask("k", new InputEvent(1L),
TestActions.noopAction());
+ invokeCreateAndSetRunnerContext(mgr, t);
+
+ // Production path: createAndSetRunnerContext at
ActionTaskContextManager.java:210-218
+ // — the else branch builds a fresh MemoryContext when the map has
no entry.
+
assertThat(t.getRunnerContext()).isInstanceOf(JavaRunnerContextImpl.class);
+ assertThat(t.getRunnerContext().getMemoryContext()).isNotNull();
+ }
+ }
+
+ @Test
+ void createAndSetRunnerContextReusesExistingMemoryContext() throws
Exception {
+ try (ActionTaskContextManager mgr = new ActionTaskContextManager(1)) {
+ Action action = TestActions.noopAction();
+ ActionTask from = new JavaActionTask("k", new InputEvent(1L),
action);
+ ActionTask to = new JavaActionTask("k", new InputEvent(2L),
action);
+
+ // Step 1: createAndSetRunnerContext(from) — runner context now
carries a fresh
+ // MemoryContext, but the map (actionTaskMemoryContexts) is still
empty (production
+ // code at lines 210-218 only reads from the map, never writes).
+ invokeCreateAndSetRunnerContext(mgr, from);
+ RunnerContextImpl.MemoryContext fromMemCtx =
from.getRunnerContext().getMemoryContext();
+ assertThat(fromMemCtx).isNotNull();
+
+ // Step 2: transferContexts populates the map entry for `to` via
the private
+ // putMemoryContext (ActionTaskContextManager.java:266-286). DEM
null is OK because
+ // from has no DurableExecutionContext.
+ mgr.transferContexts(from, to, new DurableExecutionManager(null));
+
+ // Step 3: createAndSetRunnerContext(to) — production code at
lines 211-212 reads
+ // the map for `to` and reuses fromMemCtx (the if-branch of the
reuse check).
+ invokeCreateAndSetRunnerContext(mgr, to);
+
+ // The runner context is shared (single Java JavaRunnerContextImpl
instance), but
+ // its memoryContext was switched to the entry that was in the map
for `to`. Verify
+ // the same MemoryContext instance is now wired on the runner
context.
+
assertThat(to.getRunnerContext().getMemoryContext()).isSameAs(fromMemCtx);
+ }
+ }
+
+ @Test
+ void transferContextsCopiesMemoryAndContinuationToNewTask() throws
Exception {
+ try (ActionTaskContextManager mgr = new ActionTaskContextManager(1)) {
+ Action action = TestActions.noopAction();
+ ActionTask from = new JavaActionTask("k", new InputEvent(1L),
action);
+ ActionTask to = new JavaActionTask("k", new InputEvent(2L),
action);
+
+ // Populate `from`'s runner context with a MemoryContext and
ContinuationContext.
+ invokeCreateAndSetRunnerContext(mgr, from);
+ RunnerContextImpl.MemoryContext fromMemCtx =
from.getRunnerContext().getMemoryContext();
+ assertThat(fromMemCtx).isNotNull();
+
+ // transferContexts (ActionTaskContextManager.java:266-286) copies
but does NOT
+ // remove from source. The from-side continuation map is never
populated (the
+ // continuation lives on from's runner context until transfer
copies it over for
+ // `to`). Operator-side cleanup of `from`'s entries is the
operator's
+ // responsibility — see ActionExecutionOperator.java:366-369.
+ mgr.transferContexts(from, to, new DurableExecutionManager(null));
+
+ // (a) The memory context entry for `to` is the same instance
fromTask holds.
+ RunnerContextImpl.MemoryContext toMemCtx =
mgr.removeMemoryContext(to);
+ assertThat(toMemCtx).isSameAs(fromMemCtx);
+
+ // After remove, the map no longer has `to`'s entry.
+ assertThat(mgr.removeMemoryContext(to)).isNull();
+
+ // (b) Continuation context routed to `to`.
+ assertThat(mgr.hasContinuationContext(to)).isTrue();
+
+ // (c) The `from`-side continuation map entry was never populated
by the transfer
+ // — the source carries its continuation on its runner context,
not on the
+ // manager's map.
+ assertThat(mgr.hasContinuationContext(from)).isFalse();
+ }
+ }
+
+ @Test
+ void transferContextsRoutesDurableContextThroughManager() throws Exception
{
+ try (ActionTaskContextManager mgr = new ActionTaskContextManager(1)) {
+ Action action = TestActions.noopAction();
+ InputEvent event = new InputEvent(1L);
+ ActionTask from = new JavaActionTask("k", event, action);
+ ActionTask to = new JavaActionTask("k", new InputEvent(2L),
action);
+
+ invokeCreateAndSetRunnerContext(mgr, from);
+
+ // Spy on DEM backed by a real InMemoryActionStateStore so spied
internals don't
+ // NPE. The store doesn't really need to be exercised — we only
verify the
+ // putDurableContext call site at
ActionTaskContextManager.java:271-273.
+ DurableExecutionManager spyDem =
+ spy(new DurableExecutionManager(new
InMemoryActionStateStore(false)));
+
+ // Attach a DurableExecutionContext to `from`'s runner context.
The persister is
+ // the DEM itself (DurableExecutionManager implements
ActionStatePersister at
+ // DurableExecutionManager.java:78). ActionState ctor needs an
Event so getCallResults()
+ // returns a non-null list inside the DurableExecutionContext ctor.
+ ActionState actionState = new ActionState(event);
+ RunnerContextImpl.DurableExecutionContext durableCtx =
+ new RunnerContextImpl.DurableExecutionContext(
+ "k", 0L, action, event, actionState, spyDem);
+ from.getRunnerContext().setDurableExecutionContext(durableCtx);
+
+ mgr.transferContexts(from, to, spyDem);
+
+ // The durable-context branch routes via the DEM's
putDurableContext, satisfying
+ // the no-manager-to-manager-references design constraint (DEM
passed as a
+ // parameter, not held as a field).
+ verify(spyDem)
+ .putDurableContext(
+ eq(to),
any(RunnerContextImpl.DurableExecutionContext.class));
+ }
+ }
+
+ @Test
+ void closeIsIdempotent() throws Exception {
+ // Not using try-with-resources here because we want to call close()
explicitly twice.
+ ActionTaskContextManager mgr = new ActionTaskContextManager(1);
+ ActionTask t = new JavaActionTask("k", new InputEvent(1L),
TestActions.noopAction());
+ invokeCreateAndSetRunnerContext(mgr, t);
+
+ // First close() shuts down the runner context and the continuation
executor
+ // (ActionTaskContextManager.java:319-330). The second close() must be
a no-op:
+ // runnerContext is nulled and ContinuationActionExecutor.close() is
backed by
+ // ExecutorService.shutdownNow() which is itself idempotent.
+ mgr.close();
+ mgr.close();
+ }
+
+ /**
+ * Shared helper: install a runner context on {@code task} using mocked
collaborators. Used by
+ * tests that need a fully wired runner context but do not care about the
collaborator details.
+ */
+ @SuppressWarnings("unchecked")
+ private static void invokeCreateAndSetRunnerContext(
+ ActionTaskContextManager mgr, ActionTask task) {
+ AgentPlan plan = newEmptyAgentPlan();
+ ResourceCache cache = mock(ResourceCache.class);
+ FlinkAgentsMetricGroupImpl metricGroup =
+ mock(FlinkAgentsMetricGroupImpl.class, RETURNS_DEEP_STUBS);
+ MapState<String, MemoryObjectImpl.MemoryItem> sensoryMem =
mock(MapState.class);
+ MapState<String, MemoryObjectImpl.MemoryItem> shortTermMem =
mock(MapState.class);
+ mgr.createAndSetRunnerContext(
+ task,
+ "k",
+ plan,
+ cache,
+ metricGroup,
+ "job",
+ () -> {},
+ sensoryMem,
+ shortTermMem,
+ /* pythonRunnerContext */ null,
+ /* longTermMemory */ null);
+ }
+
+ private static AgentPlan newEmptyAgentPlan() {
+ return new AgentPlan(new HashMap<>(), new HashMap<>());
+ }
}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
index 98a8b409..6c737d47 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
@@ -25,12 +25,23 @@ import org.apache.flink.agents.plan.actions.Action;
import org.apache.flink.agents.runtime.actionstate.ActionState;
import org.apache.flink.agents.runtime.actionstate.InMemoryActionStateStore;
import org.apache.flink.agents.runtime.context.RunnerContextImpl;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.OperatorStateBackend;
import org.junit.jupiter.api.Test;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -96,4 +107,112 @@ class DurableExecutionManagerTest {
dem.close();
}
+
+ /**
+ * Verifies {@code snapshotLastCompletedSequenceNumbers} captures the
per-key sequence numbers
+ * returned by the keyed state backend and records them under the given
checkpoint id.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ void snapshotCapturesKeySequenceNumbers() throws Exception {
+ InMemoryActionStateStore store = new InMemoryActionStateStore(false);
+ DurableExecutionManager dem = new DurableExecutionManager(store);
+
+ KeyedStateBackend<Object> backend = mock(KeyedStateBackend.class);
+ doAnswer(
+ invocation -> {
+ KeyedStateFunction<Object, ValueState<Long>>
function =
+ invocation.getArgument(3);
+ ValueState<Long> state1 = mock(ValueState.class);
+ when(state1.value()).thenReturn(10L);
+ ValueState<Long> state2 = mock(ValueState.class);
+ when(state2.value()).thenReturn(20L);
+ function.process("k1", state1);
+ function.process("k2", state2);
+ return null;
+ })
+ .when(backend)
+ .applyToAllKeys(any(), any(), any(), any());
+
+ dem.snapshotLastCompletedSequenceNumbers(backend, 1000L);
+
+ assertThat(dem.getCheckpointIdToSeqNums())
+ .containsOnlyKeys(1000L)
+ .extractingByKey(1000L)
+ .asInstanceOf(
+ org.assertj.core.api.InstanceOfAssertFactories.map(
+ Object.class, Long.class))
+ .containsEntry("k1", 10L)
+ .containsEntry("k2", 20L)
+ .hasSize(2);
+
+ dem.close();
+ }
+
+ /**
+ * Verifies {@code notifyCheckpointComplete} prunes every captured key for
the notified
+ * checkpoint, leaves entries captured for other checkpoints intact, and
removes the notified
+ * checkpoint's map entry. Combines the multi-key (loop body) and
multi-checkpoint (selectivity)
+ * concerns into one stronger test.
+ */
+ @Test
+ void notifyPrunesNotifiedCheckpointAndPreservesOthers() throws Exception {
+ InMemoryActionStateStore store = new InMemoryActionStateStore(true);
+ DurableExecutionManager dem = new DurableExecutionManager(store);
+
+ Action action = TestActions.noopAction();
+ // Seed the store with state for three keys.
+ dem.maybeInitActionState("k1", 10L, action, new InputEvent(1L));
+ dem.maybeInitActionState("k2", 20L, action, new InputEvent(2L));
+ dem.maybeInitActionState("k3", 30L, action, new InputEvent(3L));
+ assertThat(store.getKeyedActionStates()).containsKeys("k1", "k2",
"k3");
+
+ // Record captured sequence numbers: checkpoint 1000 covers {k1, k2},
checkpoint 1001
+ // covers {k3}. Use the @VisibleForTesting seam to install the
snapshot directly.
+ Map<Object, Long> ckpt1000 = new HashMap<>();
+ ckpt1000.put("k1", 10L);
+ ckpt1000.put("k2", 20L);
+ Map<Object, Long> ckpt1001 = new HashMap<>();
+ ckpt1001.put("k3", 30L);
+ dem.getCheckpointIdToSeqNums().put(1000L, ckpt1000);
+ dem.getCheckpointIdToSeqNums().put(1001L, ckpt1001);
+
+ dem.notifyCheckpointComplete(1000L);
+
+ // InMemoryActionStateStore.pruneState (lines 67–71) ignores the
seqNum and removes the
+ // whole key. The notified checkpoint's keys (k1, k2) are pruned; k3 —
captured under a
+ // different checkpoint — must remain. The notified checkpoint's map
entry is removed;
+ // entries for other checkpoints stay.
+ assertThat(store.getKeyedActionStates())
+ .doesNotContainKey("k1")
+ .doesNotContainKey("k2")
+ .containsKey("k3");
+
assertThat(dem.getCheckpointIdToSeqNums()).doesNotContainKey(1000L).containsKey(1001L);
+
+ dem.close();
+ }
+
+ /**
+ * Verifies {@code handleRecovery} reads markers from the operator state
backend and forwards
+ * them to {@code store.rebuildState(...)}.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ void handleRecoveryCallsRebuildState() throws Exception {
+ InMemoryActionStateStore spyStore = spy(new
InMemoryActionStateStore(true));
+ DurableExecutionManager dem = new DurableExecutionManager(spyStore);
+
+ OperatorStateBackend opBackend = mock(OperatorStateBackend.class);
+ ListState<Object> markerState = mock(ListState.class);
+
when(opBackend.getUnionListState(any(ListStateDescriptor.class))).thenReturn(markerState);
+ when(markerState.get()).thenReturn(List.of("test-marker"));
+
+ dem.handleRecovery(opBackend);
+
+ // InMemoryActionStateStore.rebuildState is a no-op (lines 62–64), so
state mutation is
+ // not observable here — the test verifies the call contract only.
+ verify(spyStore).rebuildState(List.of("test-marker"));
+
+ dem.close();
+ }
}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/EventRouterTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/EventRouterTest.java
index 9338a291..8f3cbea9 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/EventRouterTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/EventRouterTest.java
@@ -18,16 +18,32 @@
package org.apache.flink.agents.runtime.operator;
import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.EventContext;
import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.listener.EventListener;
+import org.apache.flink.agents.api.logger.EventLogger;
import org.apache.flink.agents.plan.AgentPlan;
import org.apache.flink.agents.plan.actions.Action;
+import org.apache.flink.agents.runtime.metrics.BuiltInMetrics;
+import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
+import org.apache.flink.agents.runtime.operator.queue.SegmentedQueue;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.junit.jupiter.api.Test;
+import org.mockito.InOrder;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
/** Contract tests for {@link EventRouter}. */
class EventRouterTest {
@@ -56,4 +72,137 @@ class EventRouterTest {
assertThat(triggered).containsExactly(action);
}
+
+ /**
+ * Verifies {@code notifyEventProcessed} dispatches {@code
onEventProcessed} to every registered
+ * {@link EventListener} with the same {@link EventContext} and event
payload.
+ */
+ @Test
+ void notifyEventProcessedNotifiesAllListeners() throws Exception {
+ AgentPlan plan = new AgentPlan(new HashMap<>(), new HashMap<>());
+ EventLogger mockLogger = mock(EventLogger.class);
+ EventRouter<Long, Object> router =
+ new EventRouter<>(plan, /* inputIsJava */ true, mockLogger);
+
+ EventListener listener1 = mock(EventListener.class);
+ EventListener listener2 = mock(EventListener.class);
+ router.addEventListener(listener1);
+ router.addEventListener(listener2);
+
+ router.open(makeMetrics());
+
+ InputEvent inputEvent = new InputEvent(7L);
+ router.notifyEventProcessed(inputEvent);
+
+ verify(listener1).onEventProcessed(any(EventContext.class),
eq(inputEvent));
+ verify(listener2).onEventProcessed(any(EventContext.class),
eq(inputEvent));
+ }
+
+ /**
+ * Verifies {@code notifyEventProcessed} increments the {@code
numOfEventProcessed} metric by
+ * delegating to {@link BuiltInMetrics#markEventProcessed()}.
+ */
+ @Test
+ void notifyEventProcessedIncrementsMetric() throws Exception {
+ AgentPlan plan = new AgentPlan(new HashMap<>(), new HashMap<>());
+ EventLogger mockLogger = mock(EventLogger.class);
+ EventRouter<Long, Object> router =
+ new EventRouter<>(plan, /* inputIsJava */ true, mockLogger);
+
+ BuiltInMetrics spyMetrics = spy(makeMetrics());
+ router.open(spyMetrics);
+
+ router.notifyEventProcessed(new InputEvent(1L));
+
+ verify(spyMetrics).markEventProcessed();
+ }
+
+ /**
+ * Verifies {@code notifyEventProcessed} calls {@code append} on the event
logger followed by
+ * {@code flush}, in that order.
+ */
+ @Test
+ void notifyEventProcessedAppendsAndFlushesLogger() throws Exception {
+ AgentPlan plan = new AgentPlan(new HashMap<>(), new HashMap<>());
+ EventLogger mockLogger = mock(EventLogger.class);
+ EventRouter<Long, Object> router =
+ new EventRouter<>(plan, /* inputIsJava */ true, mockLogger);
+
+ router.open(makeMetrics());
+
+ InputEvent inputEvent = new InputEvent(3L);
+ router.notifyEventProcessed(inputEvent);
+
+ InOrder ordered = inOrder(mockLogger);
+ ordered.verify(mockLogger).append(any(EventContext.class),
eq(inputEvent));
+ ordered.verify(mockLogger).flush();
+ }
+
+ /**
+ * Verifies watermarks are drained in arrival order — even when the keys
ahead of them finish
+ * out-of-order. The {@link SegmentedQueue} closes a segment when a
watermark is added, so the
+ * sequence {@code addKey(k1), addKey(k2), addWatermark(WM1), addKey(k3),
addWatermark(WM2)}
+ * forms two segments: {@code [{k1,k2} | WM1]} then {@code [{k3} | WM2]}.
Even if {@code k2}
+ * finishes before {@code k1} and {@code k3} finishes before either, the
emitted order must be
+ * {@code [WM1, WM2]}.
+ */
+ @Test
+ void processEligibleWatermarksDrainsInArrivalOrder() throws Exception {
+ AgentPlan plan = new AgentPlan(new HashMap<>(), new HashMap<>());
+ EventRouter<Long, Object> router = new EventRouter<>(plan, /*
inputIsJava */ true);
+ router.open(makeMetrics());
+
+ SegmentedQueue queue = router.getKeySegmentQueue();
+ Watermark wm1 = new Watermark(100L);
+ Watermark wm2 = new Watermark(200L);
+
+ queue.addKeyToLastSegment("k1");
+ queue.addKeyToLastSegment("k2");
+ queue.addWatermark(wm1);
+ queue.addKeyToLastSegment("k3");
+ queue.addWatermark(wm2);
+
+ // Finish keys out of arrival order.
+ queue.removeKey("k2");
+ queue.removeKey("k3");
+ queue.removeKey("k1");
+
+ List<Watermark> emitted = new ArrayList<>();
+ router.processEligibleWatermarks(emitted::add);
+
+ assertThat(emitted).containsExactly(wm1, wm2);
+ }
+
+ /**
+ * Verifies a watermark stays buffered while its preceding segment still
has an in-flight key,
+ * and is emitted only after the segment drains.
+ */
+ @Test
+ void processEligibleWatermarksBlocksWhileSegmentHasKeys() throws Exception
{
+ AgentPlan plan = new AgentPlan(new HashMap<>(), new HashMap<>());
+ EventRouter<Long, Object> router = new EventRouter<>(plan, /*
inputIsJava */ true);
+ router.open(makeMetrics());
+
+ SegmentedQueue queue = router.getKeySegmentQueue();
+ Watermark wm1 = new Watermark(100L);
+
+ queue.addKeyToLastSegment("k1");
+ queue.addWatermark(wm1);
+
+ List<Watermark> emitted = new ArrayList<>();
+ router.processEligibleWatermarks(emitted::add);
+ // k1 is still in flight → wm1 must be held back.
+ assertThat(emitted).isEmpty();
+
+ queue.removeKey("k1");
+ router.processEligibleWatermarks(emitted::add);
+ assertThat(emitted).containsExactly(wm1);
+ }
+
+ private static BuiltInMetrics makeMetrics() {
+ FlinkAgentsMetricGroupImpl metricGroup =
+ mock(FlinkAgentsMetricGroupImpl.class, RETURNS_DEEP_STUBS);
+ AgentPlan plan = new AgentPlan(new HashMap<>(), new HashMap<>());
+ return new BuiltInMetrics(metricGroup, plan);
+ }
}