This is an automated email from the ASF dual-hosted git repository.

wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 2cb4fb03 [Feature][Java] Add java event listeners config (#641)
2cb4fb03 is described below

commit 2cb4fb038465c2dea792da61f209b3ac80b01dcf
Author: twosom <[email protected]>
AuthorDate: Fri May 15 17:10:22 2026 +0900

    [Feature][Java] Add java event listeners config (#641)
    
    Co-authored-by: hope <[email protected]>
---
 .../api/configuration/AgentConfigOptions.java      |  7 +++
 .../flink/agents/api/listener/EventListener.java   | 29 ++++++------
 docs/content/docs/operations/configuration.md      |  4 ++
 .../runtime/operator/ActionExecutionOperator.java  |  3 ++
 .../flink/agents/runtime/operator/EventRouter.java | 31 +++++++++++++
 .../operator/ActionExecutionOperatorTest.java      | 51 ++++++++++++++++++++++
 6 files changed, 112 insertions(+), 13 deletions(-)

diff --git 
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
 
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
index 608b7f78..c39997da 100644
--- 
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
+++ 
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
@@ -20,6 +20,8 @@ package org.apache.flink.agents.api.configuration;
 import org.apache.flink.agents.api.logger.EventLogLevel;
 import org.apache.flink.agents.api.logger.LoggerType;
 
+import java.util.List;
+
 /** The set of configuration options for agents parameters. */
 public class AgentConfigOptions {
 
@@ -135,4 +137,9 @@ public class AgentConfigOptions {
      */
     public static final ConfigOption<Integer> EVENT_LOG_MAX_DEPTH =
             new ConfigOption<>("event-log.standard.max-depth", Integer.class, 
5);
+
+    /** The config parameter specifies the list of event listener class names. 
*/
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public static final ConfigOption<List<String>> EVENT_LISTENERS =
+            (ConfigOption) new ConfigOption<>("event-listeners", List.class, 
null);
 }
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/listener/EventListener.java 
b/api/src/main/java/org/apache/flink/agents/api/listener/EventListener.java
index 3668fcf6..6c5b9fb2 100644
--- a/api/src/main/java/org/apache/flink/agents/api/listener/EventListener.java
+++ b/api/src/main/java/org/apache/flink/agents/api/listener/EventListener.java
@@ -22,30 +22,33 @@ import org.apache.flink.agents.api.Event;
 import org.apache.flink.agents.api.EventContext;
 
 /**
- * Interface for event listeners that are notified when events are processed.
+ * Interface for event listeners that are notified when events are received 
for processing.
  *
- * <p>EventListener provides a callback mechanism triggered after event 
processing completes. This
- * is useful for monitoring, metrics collection, debugging, or triggering side 
effects based on
- * event processing.
+ * <p>EventListener provides a callback mechanism triggered at the beginning 
of event processing.
+ * This is useful for monitoring, metrics collection, debugging, or triggering 
side effects based on
+ * event reception.
  *
- * <p>Event listeners are executed synchronously after the main event 
processing is complete but
- * before the next event is processed. Implementations should be lightweight 
and avoid blocking
- * operations to prevent impacting agent performance.
+ * <p>Event listeners are executed synchronously when an event is received, 
before any actions are
+ * triggered. Implementations should be lightweight and avoid blocking 
operations to prevent
+ * impacting agent performance.
+ *
+ * <p><strong>Note:</strong> Implementing classes must provide a public 
no-argument constructor to
+ * allow for dynamic instantiation by the agent.
  */
 public interface EventListener {
     /**
-     * Called when an event has been processed.
+     * Called when an event is being processed.
      *
-     * <p>This method is invoked after the event has been processed by the 
agent's actions. The
-     * listener can inspect the event and its context to perform additional 
processing such as
-     * logging, metrics collection, or triggering external notifications.
+     * <p>This method is invoked when an event is received by the agent, 
before it is processed by
+     * any actions. The listener can inspect the event and its context to 
perform additional
+     * processing such as logging, metrics collection, or triggering external 
notifications.
      *
      * <p><strong>Important:</strong> This method should not throw exceptions 
as they will be caught
      * and logged but will not affect the main event processing flow. 
Implementations should handle
      * their own error recovery.
      *
-     * @param context The context associated with the event processing
-     * @param event The event that was processed
+     * @param context The context associated with the event
+     * @param event The event that is being processed
      */
     void onEventProcessed(EventContext context, Event event);
 }
diff --git a/docs/content/docs/operations/configuration.md 
b/docs/content/docs/operations/configuration.md
index 82e08f7a..ac9fab55 100644
--- a/docs/content/docs/operations/configuration.md
+++ b/docs/content/docs/operations/configuration.md
@@ -72,6 +72,9 @@ Configuration config = agentsEnv.getConfig();
 // Set custom configuration using key (direct string key)
 config.setInt("kafkaActionStateTopicNumPartitions", 128);  // Kafka topic 
partitions count
 
+// Set the list of event listeners
+config.set(AgentConfigOptions.EVENT_LISTENERS, 
List.of(MyCustomListener.class.getName()));
+
 // Set framework configuration using ConfigOption (predefined option class)
 config.set(AgentExecutionOptions.ERROR_HANDLING_STRATEGY, 
ErrorHandlingStrategy.RETRY);
 ```
@@ -129,6 +132,7 @@ Here is the list of all built-in core configuration options.
 | `eventLoggerType`         | `SLF4J`                    | LoggerType          
  | Which built-in event logger to use. Valid values: `SLF4J` (writes JSON 
through a dedicated SLF4J logger so events show up in Flink's Web UI **Logs** 
tab) and `FILE` (writes per-subtask `.log` files under `baseLogDir`). Setting 
`baseLogDir` overrides this and forces `FILE`. |
 | `baseLogDir`              | (none)                     | String              
  | Base directory for file-based event logs. If not set, uses 
`java.io.tmpdir/flink-agents`. Setting this value also implicitly switches 
`eventLoggerType` to `file`.                                                    
                                          |
 | `prettyPrint`             | false                      | boolean             
  | Whether to enable pretty-printed JSON format for event logs. When set to 
`true`, each event is written as formatted multi-line JSON instead of JSONL 
(JSON Lines) format. {{< hint info >}}Note: enabling this option makes the log 
file no longer valid JSONL format.  {{< /hint >}} |
+| `event-listeners`         | (none)                     | `List<String>`      
  | The list of event listener class names. Each class must implement the 
EventListener interface and provide a public no-argument constructor. {{< hint 
warning >}} Note: Currently, custom event listeners are only supported in Java. 
{{< /hint >}} |
 | `error-handling-strategy` | ErrorHandlingStrategy.FAIL | 
ErrorHandlingStrategy | Strategy for handling errors during model requests, 
include timeout and unexpected output schema. <br/>The option value could 
be:<br/> <ul><li>`ErrorHandlingStrategy.FAIL`</li> 
<li>`ErrorHandlingStrategy.RETRY`</li> <li>`ErrorHandlingStrategy.IGNORE`</li> |
 | `max-retries`             | 3                          | int                 
  | Number of retries when using `ErrorHandlingStrategy.RETRY`.                 
                                                                                
                                                                                
                    |
 | `retry-wait-interval`     | 1                          | int                 
  | Base wait interval in seconds between retries when using 
`ErrorHandlingStrategy.RETRY`. Uses exponential backoff: the actual wait time 
for the Nth retry is `retry-wait-interval * 2^(N-1)` seconds. For example, with 
default 1s, waits are 1s, 2s, 4s, etc. Retry count and total wait time are 
reported in `ChatResponseEvent` and recorded as metrics (`retryCount`, 
`retryWaitSec`) under the connection name. |
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 719db92d..957798f8 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -182,6 +182,9 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         // Initialize the event logger if it is set.
         eventRouter.initEventLogger(getRuntimeContext());
 
+        // Initialize user event listeners from configuration
+        eventRouter.initEventListeners(getRuntimeContext());
+
         // Since an operator restart may change the key range it manages due 
to changes in
         // parallelism,
         // and {@link tryProcessActionTaskForKey} mails might be lost,
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
index 1283c4c6..bc2aef6f 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
@@ -41,6 +41,8 @@ import 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
 
 import javax.annotation.Nullable;
 
@@ -48,6 +50,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.BASE_LOG_DIR;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.EVENT_LISTENERS;
 import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.EVENT_LOGGER_TYPE;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -83,6 +86,7 @@ class EventRouter<IN, OUT> implements AutoCloseable {
     private final boolean inputIsJava;
     private final EventLogger eventLogger;
     private final List<EventListener> eventListeners;
+    private final AgentPlan agentPlan;
     private StreamRecord<OUT> reusedStreamRecord;
     private SegmentedQueue keySegmentQueue;
     private BuiltInMetrics builtInMetrics;
@@ -93,6 +97,7 @@ class EventRouter<IN, OUT> implements AutoCloseable {
 
     @VisibleForTesting
     EventRouter(AgentPlan agentPlan, boolean inputIsJava, EventLogger 
eventLogger) {
+        this.agentPlan = agentPlan;
         this.inputIsJava = inputIsJava;
         this.eventLogger = eventLogger;
         this.eventListeners = new ArrayList<>();
@@ -127,6 +132,32 @@ class EventRouter<IN, OUT> implements AutoCloseable {
         }
     }
 
+    /**
+     * Initializes the {@link EventListener}s configured for this agent.
+     *
+     * @throws RuntimeException if any listener class fails to instantiate.
+     */
+    void initEventListeners(StreamingRuntimeContext runtimeContext) {
+        final List<String> eventListenerClassList = 
agentPlan.getConfig().get(EVENT_LISTENERS);
+        if (eventListenerClassList == null || 
eventListenerClassList.isEmpty()) {
+            return;
+        }
+
+        final ClassLoader userCodeClassLoader = 
runtimeContext.getUserCodeClassLoader();
+        final List<EventListener> eventListeners = new ArrayList<>();
+        for (String listenerClassName : eventListenerClassList) {
+            try {
+                eventListeners.add(
+                        InstantiationUtil.instantiate(
+                                listenerClassName, EventListener.class, 
userCodeClassLoader));
+            } catch (FlinkException e) {
+                throw new RuntimeException(
+                        "Failed to instantiate EventListener: " + 
listenerClassName, e);
+            }
+        }
+        this.eventListeners.addAll(eventListeners);
+    }
+
     /**
      * Wraps an incoming record into an {@link Event} suitable for action 
dispatch.
      *
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
index 2b2f5a29..41cb5109 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
@@ -19,12 +19,14 @@ package org.apache.flink.agents.runtime.operator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.EventContext;
 import org.apache.flink.agents.api.InputEvent;
 import org.apache.flink.agents.api.OutputEvent;
 import org.apache.flink.agents.api.configuration.AgentConfigOptions;
 import org.apache.flink.agents.api.context.DurableCallable;
 import org.apache.flink.agents.api.context.MemoryObject;
 import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.listener.EventListener;
 import org.apache.flink.agents.api.logger.EventLoggerConfig;
 import org.apache.flink.agents.api.logger.LoggerType;
 import org.apache.flink.agents.plan.AgentConfiguration;
@@ -308,6 +310,55 @@ public class ActionExecutionOperatorTest {
         }
     }
 
+    /** An {@link EventListener} used by the unit tests. */
+    public static class TestEventListener implements EventListener {
+        public boolean called = false;
+
+        @Override
+        public void onEventProcessed(EventContext context, Event event) {
+            this.called = true;
+        }
+    }
+
+    @Test
+    void testEventListenersFromAgentConfig() throws Exception {
+        final AgentConfiguration config = new AgentConfiguration();
+        config.set(AgentConfigOptions.EVENT_LISTENERS, 
List.of(TestEventListener.class.getName()));
+        final AgentPlan agentPlan = TestAgent.getAgentPlanWithConfig(config);
+
+        try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Object> 
testHarness =
+                new KeyedOneInputStreamOperatorTestHarness<>(
+                        new ActionExecutionOperatorFactory(agentPlan, true),
+                        (KeySelector<Long, Long>) value -> value,
+                        TypeInformation.of(Long.class))) {
+            testHarness.open();
+            final ActionExecutionOperator<Long, Object> operator =
+                    (ActionExecutionOperator<Long, Object>) 
testHarness.getOperator();
+            final Field eventListenersField = 
EventRouter.class.getDeclaredField("eventListeners");
+            eventListenersField.setAccessible(true);
+            final Object obj = 
eventListenersField.get(operator.getEventRouter());
+            assertThat(obj).isNotNull();
+            assertThat(obj).isInstanceOf(List.class);
+
+            final List eventListeners = (List) obj;
+            assertThat(eventListeners.size()).isEqualTo(1);
+
+            final Object listener = eventListeners.get(0);
+            assertThat(listener).isInstanceOf(TestEventListener.class);
+
+            // listener should not have been triggered yet
+            boolean called = ((TestEventListener) listener).called;
+            assertThat(called).isFalse();
+
+            // process an element to trigger the operator logic
+            testHarness.processElement(new StreamRecord<>(1L));
+
+            // listener should have been invoked after element processing
+            called = ((TestEventListener) listener).called;
+            assertThat(called).isTrue();
+        }
+    }
+
     @Test
     void testDoesNotPruneBeforeCheckpointComplete() throws Exception {
         AgentPlan agentPlanWithStateStore = TestAgent.getAgentPlan(false);

Reply via email to