This is an automated email from the ASF dual-hosted git repository.

wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 61490ea5 [feature] Enable EventLog display in WebUI by default (#638)
61490ea5 is described below

commit 61490ea513ed81e4fde316dd1094e55b6bbc9838
Author: Eugene <[email protected]>
AuthorDate: Wed May 13 11:35:21 2026 +0800

    [feature] Enable EventLog display in WebUI by default (#638)
    
    * [hotfix] Add log4j when running the end-to-end test in Flink 1.20
    
    * [runtime] Consolidate event-log settings into single agent-config map
    
    * [feature] Enable EventLog display in WebUI by default
---
 .../api/configuration/AgentConfigOptions.java      |   9 +
 .../flink/agents/api/logger/EventLoggerConfig.java |  82 +++---
 .../agents/api/logger/EventLoggerFactory.java      | 104 +++----
 .../apache/flink/agents/api/logger/LoggerType.java |  71 +++++
 docs/content/docs/operations/configuration.md      |   3 +-
 docs/content/docs/operations/monitoring.md         |  22 +-
 .../pom.xml                                        |  22 ++
 python/flink_agents/api/core_options.py            |  18 ++
 .../agents/runtime/eventlog/FileEventLogger.java   |  38 +--
 .../agents/runtime/eventlog/Slf4jEventLogger.java  | 276 +++++++++++++++++++
 .../flink/agents/runtime/operator/EventRouter.java |  29 +-
 .../runtime/eventlog/FileEventLoggerTest.java      |  65 ++---
 .../runtime/eventlog/Slf4jEventLoggerTest.java     | 302 +++++++++++++++++++++
 .../operator/ActionExecutionOperatorTest.java      |  11 +-
 14 files changed, 862 insertions(+), 190 deletions(-)

diff --git 
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
 
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
index 9cdda8bd..100894f7 100644
--- 
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
+++ 
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
@@ -18,10 +18,19 @@
 package org.apache.flink.agents.api.configuration;
 
 import org.apache.flink.agents.api.logger.EventLogLevel;
+import org.apache.flink.agents.api.logger.LoggerType;
 
 /** The set of configuration options for agents parameters. */
 public class AgentConfigOptions {
 
+    /**
+     * The config parameter specifies which event logger implementation to 
use. Defaults to {@link
+     * LoggerType#SLF4J}, which surfaces events in Flink's Web UI; setting 
{@link LoggerType#FILE}
+     * (or configuring {@link #BASE_LOG_DIR}) routes events to per-subtask log 
files instead.
+     */
+    public static final ConfigOption<LoggerType> EVENT_LOGGER_TYPE =
+            new ConfigOption<>("eventLoggerType", LoggerType.class, 
LoggerType.SLF4J);
+
     /** The config parameter specifies the directory for the FileEvent file. */
     public static final ConfigOption<String> BASE_LOG_DIR =
             new ConfigOption<>("baseLogDir", String.class, null);
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java 
b/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java
index 1dedeef0..870e8108 100644
--- 
a/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java
+++ 
b/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java
@@ -26,27 +26,42 @@ import java.util.Objects;
 /**
  * Unified configuration for event loggers with a fluent builder API.
  *
- * <p>This class provides a unified approach to configuring different types of 
event loggers using
- * string-based logger type identification and a flexible property map for 
implementation-specific
- * parameters.
+ * <p>A config selects an implementation via a {@link LoggerType} and carries 
a property map. The
+ * full agent configuration (e.g., {@code AgentConfiguration.getConfData()}) 
is passed through under
+ * {@link #AGENT_CONFIG_PROPERTY_KEY}; logger settings such as {@code 
baseLogDir}, {@code
+ * prettyPrint}, and the {@code event-log.*} keys live inside that map rather 
than as top-level
+ * properties.
  *
  * <h3>Usage Examples</h3>
  *
  * <pre>{@code
- * // Enable default file-based event logging with custom properties
+ * // Enable file-based event logging with custom log directory.
+ * Map<String, Object> agentConfig = new HashMap<>();
+ * agentConfig.put("baseLogDir", "/tmp/logs");
  * EventLoggerConfig fileConfig = EventLoggerConfig.builder()
- *     .loggerType("file")
- *     .property("baseLogDir", "/tmp/logs")
+ *     .loggerType(LoggerType.FILE)
+ *     .property(EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY, agentConfig)
  *     .build();
  * }</pre>
  */
 public final class EventLoggerConfig {
 
-    private final String loggerType;
+    /**
+     * Property key used to pass the full agent config data map (e.g., {@code
+     * AgentConfiguration.getConfData()}) into a logger via {@link 
Builder#property(String,
+     * Object)}.
+     *
+     * <p>Built-in loggers read this property to initialize per-event-type log 
level resolution and
+     * STANDARD-level truncation. Custom loggers may use the same property to 
access agent-level
+     * configuration without taking a hard dependency on the runtime module.
+     */
+    public static final String AGENT_CONFIG_PROPERTY_KEY = "agentConfig";
+
+    private final LoggerType loggerType;
     private final Map<String, Object> properties;
 
     /** Private constructor - use {@link #builder()} to create instances. */
-    private EventLoggerConfig(String loggerType, Map<String, Object> 
properties) {
+    private EventLoggerConfig(LoggerType loggerType, Map<String, Object> 
properties) {
         this.loggerType = Objects.requireNonNull(loggerType, "Logger type 
cannot be null");
         this.properties = Collections.unmodifiableMap(new 
HashMap<>(properties));
     }
@@ -61,31 +76,22 @@ public final class EventLoggerConfig {
     }
 
     /**
-     * Gets the logger type identifier.
+     * Gets the logger type. Built-in types include {@link LoggerType#SLF4J} 
(default, outputs to
+     * Flink Web UI via log4j2) and {@link LoggerType#FILE} (writes to 
per-subtask log files).
      *
-     * <p>This string identifier is used to determine which EventLogger 
implementation should be
-     * instantiated. Built-in logger types include:
-     *
-     * <ul>
-     *   <li>"file" - File-based event logger (default)
-     * </ul>
-     *
-     * @return the logger type identifier (e.g., "file", "database", "kafka")
+     * @return the logger type, never null
      */
-    public String getLoggerType() {
+    public LoggerType getLoggerType() {
         return loggerType;
     }
 
     /**
-     * Gets the implementation-specific properties for this logger 
configuration.
-     *
-     * <p>These properties contain logger-specific configuration parameters 
that are not common
-     * across all logger implementations. For example:
+     * Gets the properties carried by this configuration.
      *
-     * <ul>
-     *   <li>File logger: "baseLogDir", "maxFileSize", "compression"
-     *   <li>Database logger: "connectionUrl", "tableName", "batchSize"
-     * </ul>
+     * <p>Currently the only well-known key is {@link 
#AGENT_CONFIG_PROPERTY_KEY}, which holds the
+     * full agent config map; logger-specific settings (e.g., {@code 
baseLogDir}, {@code
+     * prettyPrint}, {@code event-log.*}) are looked up inside that map by the 
logger implementation
+     * rather than from this top-level map.
      *
      * @return an immutable map of property names to values, never null
      */
@@ -109,13 +115,7 @@ public final class EventLoggerConfig {
 
     @Override
     public String toString() {
-        return "EventLoggerConfig{"
-                + "loggerType='"
-                + loggerType
-                + '\''
-                + ", properties="
-                + properties
-                + '}';
+        return "EventLoggerConfig{loggerType=" + loggerType + ", properties=" 
+ properties + '}';
     }
 
     /**
@@ -125,23 +125,23 @@ public final class EventLoggerConfig {
      * validation and sensible defaults.
      */
     public static final class Builder {
-        private String loggerType = "file"; // Default to file logger
+        private LoggerType loggerType = LoggerType.SLF4J;
         private final Map<String, Object> properties = new HashMap<>();
 
         private Builder() {}
 
         /**
-         * Sets the logger type identifier.
+         * Sets the logger type.
          *
-         * @param loggerType the logger type (e.g., "file", "database", 
"kafka")
+         * @param loggerType the built-in logger type
          * @return this Builder instance for method chaining
-         * @throws IllegalArgumentException if loggerType is null or empty
+         * @throws IllegalArgumentException if loggerType is null
          */
-        public Builder loggerType(String loggerType) {
-            if (loggerType == null || loggerType.trim().isEmpty()) {
-                throw new IllegalArgumentException("Logger type cannot be null 
or empty");
+        public Builder loggerType(LoggerType loggerType) {
+            if (loggerType == null) {
+                throw new IllegalArgumentException("Logger type cannot be 
null");
             }
-            this.loggerType = loggerType.trim();
+            this.loggerType = loggerType;
             return this;
         }
 
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerFactory.java 
b/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerFactory.java
index 48f23e57..7f7e12e0 100644
--- 
a/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerFactory.java
+++ 
b/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerFactory.java
@@ -24,16 +24,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 
 /**
- * Factory for creating EventLogger instances based on logger type identifiers.
+ * Factory for creating EventLogger instances based on {@link LoggerType}.
  *
- * <p>This factory uses a string-based registry approach that allows both 
built-in and custom
- * EventLogger implementations to be created in a flexible manner. Built-in 
loggers are
- * automatically registered, while custom loggers can be registered using the 
{@link
- * #registerFactory(String, Function)} method.
- *
- * <p>Logger types are identified by simple string identifiers (e.g., "file", 
"database", "kafka"),
- * making the factory easy to use with configuration files and providing a 
clean abstraction for
- * different logger implementations.
+ * <p>Built-in loggers (see {@link LoggerType}) are auto-registered. 
Additional implementations for
+ * a given {@link LoggerType} can replace the built-in factory via {@link
+ * #registerFactory(LoggerType, Function)}.
  *
  * <h3>Thread Safety</h3>
  *
@@ -44,31 +39,21 @@ import java.util.function.Function;
  * <h3>Usage Examples</h3>
  *
  * <pre>{@code
- * // Create built-in file logger
  * EventLoggerConfig fileConfig = EventLoggerConfig.builder()
- *     .loggerType("file")
+ *     .loggerType(LoggerType.FILE)
  *     .property("baseLogDir", "/tmp/flink-agents")
  *     .build();
  * EventLogger fileLogger = EventLoggerFactory.createLogger(fileConfig);
- *
- * // Register and use custom logger
- * EventLoggerFactory.registerFactory("database",
- *     config -> new DatabaseEventLogger(config));
- *
- * EventLoggerConfig dbConfig = EventLoggerConfig.builder()
- *     .loggerType("database")
- *     .property("jdbcUrl", "jdbc:mysql://localhost/logs")
- *     .build();
- * EventLogger customLogger = EventLoggerFactory.createLogger(dbConfig);
  * }</pre>
  *
  * @see EventLogger
  * @see EventLoggerConfig
+ * @see LoggerType
  */
 public final class EventLoggerFactory {
 
-    /** Thread-safe registry of factory functions keyed by logger type 
identifier. */
-    private static final Map<String, Function<EventLoggerConfig, EventLogger>> 
FACTORIES =
+    /** Thread-safe registry of factory functions keyed by {@link LoggerType}. 
*/
+    private static final Map<LoggerType, Function<EventLoggerConfig, 
EventLogger>> FACTORIES =
             new ConcurrentHashMap<>();
 
     private EventLoggerFactory() {}
@@ -80,9 +65,6 @@ public final class EventLoggerFactory {
     /**
      * Creates an EventLogger instance based on the provided configuration.
      *
-     * <p>This method looks up the appropriate factory function for the logger 
type specified in the
-     * configuration and uses it to create the EventLogger instance.
-     *
      * @param config the EventLogger configuration
      * @return a new EventLogger instance configured according to the provided 
config
      * @throws IllegalArgumentException if config is null or no factory is 
registered for the logger
@@ -94,19 +76,13 @@ public final class EventLoggerFactory {
             throw new IllegalArgumentException("EventLoggerConfig cannot be 
null");
         }
 
-        String loggerType = config.getLoggerType();
-        if (loggerType == null || loggerType.trim().isEmpty()) {
-            throw new IllegalArgumentException("Logger type cannot be null or 
empty");
-        }
-
+        LoggerType loggerType = config.getLoggerType();
         Function<EventLoggerConfig, EventLogger> factory = 
FACTORIES.get(loggerType);
 
         if (factory == null) {
             throw new IllegalArgumentException(
                     String.format(
-                            "No factory registered for logger type: '%s'. "
-                                    + "Available types: %s. "
-                                    + "Use 
EventLoggerFactory.registerFactory() to register custom loggers.",
+                            "No factory registered for logger type: %s. 
Available types: %s.",
                             loggerType, FACTORIES.keySet()));
         }
 
@@ -119,67 +95,51 @@ public final class EventLoggerFactory {
     }
 
     /**
-     * Registers a factory function for a specific logger type.
-     *
-     * <p>This method allows custom EventLogger implementations to be 
registered with the factory.
-     * Once registered, the factory can create instances of the custom logger 
using the {@link
-     * #createLogger(EventLoggerConfig)} method.
+     * Registers a factory function for a specific {@link LoggerType}, 
replacing any previously
+     * registered factory for that type.
      *
-     * <p>If a factory is already registered for the given logger type, it 
will be replaced with the
-     * new factory function.
-     *
-     * @param loggerType the logger type identifier (e.g., "file", "database", 
"kafka")
+     * @param loggerType the logger type
      * @param factory the factory function that creates EventLogger instances
-     * @throws IllegalArgumentException if loggerType or factory is null or if 
loggerType is empty
+     * @throws IllegalArgumentException if loggerType or factory is null
      */
     public static void registerFactory(
-            String loggerType, Function<EventLoggerConfig, EventLogger> 
factory) {
-        if (loggerType == null || loggerType.trim().isEmpty()) {
-            throw new IllegalArgumentException("Logger type cannot be null or 
empty");
-        }
+            LoggerType loggerType, Function<EventLoggerConfig, EventLogger> 
factory) {
+        Objects.requireNonNull(loggerType, "Logger type cannot be null");
         Objects.requireNonNull(factory, "Factory function cannot be null");
-
-        FACTORIES.put(loggerType.trim(), factory);
+        FACTORIES.put(loggerType, factory);
     }
 
-    /**
-     * Registers built-in EventLogger factories.
-     *
-     * <p>This method is called during class initialization to register 
factories for all built-in
-     * EventLogger implementations.
-     */
+    /** Registers built-in EventLogger factories during class initialization. 
*/
     private static void registerBuiltInFactories() {
-        registerFileEventLoggerFactory();
+        registerByReflection(
+                LoggerType.FILE, 
"org.apache.flink.agents.runtime.eventlog.FileEventLogger");
+        registerByReflection(
+                LoggerType.SLF4J, 
"org.apache.flink.agents.runtime.eventlog.Slf4jEventLogger");
     }
 
     /**
-     * Registers the built-in file event logger factory.
-     *
-     * <p>This uses reflection to avoid hard dependencies on the runtime 
module, allowing the API
-     * module to be used independently.
+     * Registers a built-in logger factory backed by a runtime-module class 
loaded reflectively, so
+     * the api module does not need a compile-time dependency on the runtime 
module. Silently skips
+     * registration when the runtime class is not on the classpath.
      */
-    private static void registerFileEventLoggerFactory() {
+    private static void registerByReflection(LoggerType loggerType, String 
className) {
         try {
-            // Try to load FileEventLogger class
-            Class<?> fileLoggerClass =
-                    
Class.forName("org.apache.flink.agents.runtime.eventlog.FileEventLogger");
-
+            Class<?> loggerClass = Class.forName(className);
             registerFactory(
-                    "file",
+                    loggerType,
                     config -> {
                         try {
-                            // FileEventLogger now takes unified 
EventLoggerConfig directly
                             return (EventLogger)
-                                    fileLoggerClass
+                                    loggerClass
                                             
.getConstructor(EventLoggerConfig.class)
                                             .newInstance(config);
                         } catch (Exception e) {
-                            throw new RuntimeException("Failed to create 
FileEventLogger", e);
+                            throw new RuntimeException(
+                                    "Failed to create " + loggerType + " event 
logger", e);
                         }
                     });
         } catch (ClassNotFoundException e) {
-            // FileEventLogger not found, skip registration
-            // This is expected if the runtime module is not on the classpath
+            // The runtime module is not on the classpath; the built-in logger 
is unavailable.
         }
     }
 }
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/logger/LoggerType.java 
b/api/src/main/java/org/apache/flink/agents/api/logger/LoggerType.java
new file mode 100644
index 00000000..4668ec89
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/logger/LoggerType.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.logger;
+
+/**
+ * Enumeration of built-in EventLogger types.
+ *
+ * <p>Each {@code LoggerType} corresponds to a built-in {@link EventLogger} 
implementation
+ * registered with the {@link EventLoggerFactory}. The {@link #getType()} 
string is the identifier
+ * used by the factory registry, the configuration system, and {@link
+ * EventLoggerConfig#getLoggerType()}.
+ */
+public enum LoggerType {
+    /** SLF4J-based event logger; outputs to Flink Web UI via log4j2. */
+    SLF4J("slf4j"),
+
+    /** File-based event logger; writes events to per-subtask log files. */
+    FILE("file");
+
+    private final String type;
+
+    LoggerType(String type) {
+        this.type = type;
+    }
+
+    /**
+     * Gets the string identifier used to register and look up this logger 
type in {@link
+     * EventLoggerFactory}.
+     *
+     * @return the logger type identifier
+     */
+    public String getType() {
+        return type;
+    }
+
+    /**
+     * Resolves a {@link LoggerType} from its string identifier.
+     *
+     * @param type the logger type identifier (e.g., "slf4j", "file")
+     * @return the matching {@link LoggerType}
+     * @throws IllegalArgumentException if no matching type exists
+     */
+    public static LoggerType fromType(String type) {
+        if (type == null) {
+            throw new IllegalArgumentException("Logger type cannot be null");
+        }
+        String trimmed = type.trim();
+        for (LoggerType value : values()) {
+            if (value.type.equalsIgnoreCase(trimmed)) {
+                return value;
+            }
+        }
+        throw new IllegalArgumentException("Unknown logger type: " + type);
+    }
+}
diff --git a/docs/content/docs/operations/configuration.md 
b/docs/content/docs/operations/configuration.md
index 9b325837..8a387def 100644
--- a/docs/content/docs/operations/configuration.md
+++ b/docs/content/docs/operations/configuration.md
@@ -126,7 +126,8 @@ Here is the list of all built-in core configuration options.
 
 | Key                       | Default                    | Type                
  | Description                                                                 
                                                                                
                                                                                
                    |
 
|---------------------------|----------------------------|-----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `baseLogDir`              | (none)                     | String              
  | Base directory for file-based event logs. If not set, uses 
`java.io.tmpdir/flink-agents`.                                                  
                                                                                
                                     |
+| `eventLoggerType`         | `SLF4J`                    | LoggerType          
  | Which built-in event logger to use. Valid values: `SLF4J` (writes JSON 
through a dedicated SLF4J logger so events show up in Flink's Web UI **Logs** 
tab) and `FILE` (writes per-subtask `.log` files under `baseLogDir`). Setting 
`baseLogDir` overrides this and forces `FILE`. |
+| `baseLogDir`              | (none)                     | String              
  | Base directory for file-based event logs. If not set, uses 
`java.io.tmpdir/flink-agents`. Setting this value also implicitly switches 
`eventLoggerType` to `file`.                                                    
                                          |
 | `prettyPrint`             | false                      | boolean             
  | Whether to enable pretty-printed JSON format for event logs. When set to 
`true`, each event is written as formatted multi-line JSON instead of JSONL 
(JSON Lines) format. {{< hint info >}}Note: enabling this option makes the log 
file no longer valid JSONL format.  {{< /hint >}} |
 | `error-handling-strategy` | ErrorHandlingStrategy.FAIL | 
ErrorHandlingStrategy | Strategy for handling errors during model requests, 
include timeout and unexpected output schema. <br/>The option value could 
be:<br/> <ul><li>`ErrorHandlingStrategy.FAIL`</li> 
<li>`ErrorHandlingStrategy.RETRY`</li> <li>`ErrorHandlingStrategy.IGNORE`</li> |
 | `max-retries`             | 3                          | int                 
  | Number of retries when using `ErrorHandlingStrategy.RETRY`.                 
                                                                                
                                                                                
                    |
diff --git a/docs/content/docs/operations/monitoring.md 
b/docs/content/docs/operations/monitoring.md
index 12c94840..1925c7f6 100644
--- a/docs/content/docs/operations/monitoring.md
+++ b/docs/content/docs/operations/monitoring.md
@@ -143,11 +143,19 @@ We can check the log result in the WebUI of Flink Job:
 
 ## Event Log
 
-Currently, the system supports **File-based Event Log** as the default 
implementation. Future releases will introduce support for additional types of 
event logs and provide configuration options to let users choose their 
preferred logging mechanism.
+The system supports two types of event loggers: **SLF4J Event Log** (default) 
and **File Event Log**.
+
+By default, the SLF4J Event Log is used. If `baseLogDir` is configured, the 
system automatically switches to the File Event Log.
+
+### SLF4J Event Log (Default)
+
+The **SLF4J Event Log** outputs events through a dedicated SLF4J logger 
(`org.apache.flink.agents.EventLog`). On startup, the logger **automatically 
configures** log4j2 to write events to a separate file 
(`{log.file}.event-log.log`) in Flink's log directory, making them visible in 
Flink's Web UI **Logs** tab. **No manual log4j2 configuration is required.**
+
+Because all subtasks on a TaskManager share the same log destination, each 
record additionally carries `jobId`, `taskName`, and `subtaskId` top-level 
fields so consumers can still distinguish events from different subtasks. The 
rest of the record follows the common [JSON Format](#json-format) described 
below.
 
 ### File Event Log
 
-The **File Event Log** is a file-based event logging system that stores events 
in structured files within a flat directory. 
+The **File Event Log** is a file-based event logging system that stores events 
in structured files within a flat directory. To use it, configure `baseLogDir` 
in your Flink `config.yaml`.
 
 By default, each event is recorded in **JSON Lines (JSONL)** format, with one 
JSON object per line. When [`prettyPrint`]({{< ref 
"docs/operations/configuration#core-options" >}}) is enabled, each event is 
written as formatted multi-line JSON instead, and the log file is no longer in 
valid JSONL format.
 
@@ -164,7 +172,9 @@ The log files follow a naming convention consistent with 
Flink's logging standar
 
 By default, all File-based Event Logs are stored in the `flink-agents` 
subdirectory under the system temporary directory (`java.io.tmpdir`). You can 
override the base log directory with the `agent.baseLogDir` setting in Flink 
`config.yaml`.
 
-#### JSON Format
+### JSON Format
+
+The JSON record format described here applies to both the SLF4J and File event 
loggers. The SLF4J logger adds `jobId`, `taskName`, and `subtaskId` fields on 
top (see [SLF4J Event Log](#slf4j-event-log-default)); the File logger encodes 
those values in the file path instead.
 
 Each record contains a top-level `timestamp`, the resolved `logLevel`, and a 
top-level `eventType` routing key (mirrors `event.eventType`), followed by the 
full event object. The top-level `eventType` makes it easy for downstream tools 
(e.g. `grep`, `jq`, log shippers) to filter by event type without parsing 
nested JSON:
 
@@ -180,7 +190,7 @@ Each record contains a top-level `timestamp`, the resolved 
`logLevel`, and a top
 }
 ```
 
-#### Event Log Levels
+### Event Log Levels
 
 Each event type is logged at a configurable verbosity. Three levels are 
supported:
 
@@ -226,7 +236,7 @@ Example record at `STANDARD` with a long string and a large 
array truncated:
 }
 ```
 
-#### Per-event-type log levels
+### Per-event-type log levels
 
 You can override the level for individual event types using the 
`event-log.type.<EVENT_TYPE>.level` config key, where `<EVENT_TYPE>` is the 
event's routing type string (the same string that appears as `eventType` in the 
JSON log). Built-in events use short snake-cased names such as:
 
@@ -276,7 +286,7 @@ flink run ... \
 
 Other per-type levels from `config.yaml` are preserved — the `-D` flag only 
overrides the one key it names.
 
-#### Compatibility Notes
+### Compatibility Notes
 
 - **Default behavior changed.** Before this feature, every event was logged in 
full. The new default is `STANDARD`, which truncates large payloads. To restore 
the previous behavior either globally or per type, set the level to `VERBOSE`.
 - **Old log records still parse.** Records written before this feature have no 
`logLevel` or top-level `eventType`. They deserialize correctly and are treated 
as `VERBOSE` (their payloads were never truncated).
diff --git a/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml 
b/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
index 06db5577..44d76579 100644
--- a/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
+++ b/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
@@ -139,6 +139,28 @@ under the License.
                 <flink.version>${flink.1.20.version}</flink.version>
                 
<flink.agents.dist.artifactId>flink-agents-dist-flink-1.20</flink.agents.dist.artifactId>
             </properties>
+            <dependencies>
+                <!-- Flink 1.20 does not transitively provide log4j-core in 
test classpath,
+                     needed for Slf4jEventLogger which uses log4j2 core APIs 
-->
+                <dependency>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-api</artifactId>
+                    <version>${log4j2.version}</version>
+                    <scope>test</scope>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                    <version>${log4j2.version}</version>
+                    <scope>test</scope>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                    <version>${log4j2.version}</version>
+                    <scope>test</scope>
+                </dependency>
+            </dependencies>
         </profile>
 
         <!-- Flink 2.0 Profile -->
diff --git a/python/flink_agents/api/core_options.py 
b/python/flink_agents/api/core_options.py
index 98b8b317..5b575c3f 100644
--- a/python/flink_agents/api/core_options.py
+++ b/python/flink_agents/api/core_options.py
@@ -82,6 +82,18 @@ class ErrorHandlingStrategy(Enum):
     IGNORE = "ignore"
 
 
+class LoggerType(Enum):
+    """Built-in event logger types.
+
+    Mirrors the Java ``LoggerType`` enum so Python users can configure the
+    logger type via ``AgentConfigOptions.EVENT_LOGGER_TYPE`` without using
+    raw strings.
+    """
+
+    SLF4J = "slf4j"
+    FILE = "file"
+
+
 class AgentConfigOptions(metaclass=AgentConfigOptionsMeta):
     """CoreOptions to manage core configuration parameters for Flink Agents."""
 
@@ -91,6 +103,12 @@ class AgentConfigOptions(metaclass=AgentConfigOptionsMeta):
         default=None,
     )
 
+    EVENT_LOGGER_TYPE = ConfigOption(
+        key="eventLoggerType",
+        config_type=LoggerType,
+        default=LoggerType.SLF4J,
+    )
+
     # Event log level config options
     EVENT_LOG_LEVEL = ConfigOption(
         key="event-log.level",
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
index 7b42eb7a..9cedb632 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
@@ -81,15 +81,10 @@ import java.util.Map;
  * </pre>
  */
 public class FileEventLogger implements EventLogger {
-    public static final String BASE_LOG_DIR_PROPERTY_KEY = "baseLogDir";
-    public static final String PRETTY_PRINT_PROPERTY_KEY = "prettyPrint";
     // The default base log directory if not specified in the configuration
     private static final String DEFAULT_BASE_LOG_DIR =
             Paths.get(System.getProperty("java.io.tmpdir"), 
"flink-agents").toString();
 
-    /** Property key for passing the full agent config data map into the 
logger. */
-    public static final String AGENT_CONFIG_PROPERTY_KEY = "agentConfig";
-
     private static final ObjectMapper MAPPER = new ObjectMapper();
 
     private final EventLoggerConfig config;
@@ -105,7 +100,20 @@ public class FileEventLogger implements EventLogger {
 
     @Override
     public void open(EventLoggerOpenParams params) throws Exception {
-        String logFilePath = generateSubTaskLogFilePath(params);
+        // The full agent config is the single source of truth for all logger 
settings.
+        @SuppressWarnings("unchecked")
+        Map<String, Object> agentConfig =
+                (Map<String, Object>)
+                        config.getProperties()
+                                .getOrDefault(
+                                        
EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY,
+                                        Collections.emptyMap());
+
+        String baseLogDir =
+                (String)
+                        agentConfig.getOrDefault(
+                                AgentConfigOptions.BASE_LOG_DIR.getKey(), 
DEFAULT_BASE_LOG_DIR);
+        String logFilePath = generateSubTaskLogFilePath(params, baseLogDir);
         // Create base directory if it doesn't exist
         Path logPath = Paths.get(logFilePath).getParent();
         if (!Files.exists(logPath)) {
@@ -114,14 +122,11 @@ public class FileEventLogger implements EventLogger {
         // Create writer in append mode
         writer = new PrintWriter(new BufferedWriter(new 
FileWriter(logFilePath, true)));
         prettyPrint =
-                (Boolean) 
config.getProperties().getOrDefault(PRETTY_PRINT_PROPERTY_KEY, false);
+                (Boolean)
+                        agentConfig.getOrDefault(
+                                AgentConfigOptions.PRETTY_PRINT.getKey(),
+                                
AgentConfigOptions.PRETTY_PRINT.getDefaultValue());
 
-        // Initialize level resolver and truncator from agent config
-        @SuppressWarnings("unchecked")
-        Map<String, Object> agentConfig =
-                (Map<String, Object>)
-                        config.getProperties()
-                                .getOrDefault(AGENT_CONFIG_PROPERTY_KEY, 
Collections.emptyMap());
         this.levelResolver = new EventLogLevelResolver(agentConfig);
         int maxStringLength =
                 getIntFromConfig(
@@ -156,12 +161,7 @@ public class FileEventLogger implements EventLogger {
         }
     }
 
-    private String generateSubTaskLogFilePath(EventLoggerOpenParams params) {
-        // Get base log directory from properties
-        String baseLogDir =
-                (String)
-                        config.getProperties()
-                                .getOrDefault(BASE_LOG_DIR_PROPERTY_KEY, 
DEFAULT_BASE_LOG_DIR);
+    private String generateSubTaskLogFilePath(EventLoggerOpenParams params, 
String baseLogDir) {
         String jobId = 
params.getRuntimeContext().getJobInfo().getJobId().toString();
         String taskName =
                 params.getRuntimeContext()
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/Slf4jEventLogger.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/Slf4jEventLogger.java
new file mode 100644
index 00000000..d5f00f87
--- /dev/null
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/Slf4jEventLogger.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.eventlog;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.EventContext;
+import org.apache.flink.agents.api.configuration.AgentConfigOptions;
+import org.apache.flink.agents.api.logger.EventLogLevel;
+import org.apache.flink.agents.api.logger.EventLogger;
+import org.apache.flink.agents.api.logger.EventLoggerConfig;
+import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
+import org.apache.flink.metrics.Counter;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.FileAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * An SLF4J-based event logger that outputs events through a dedicated SLF4J 
logger.
+ *
+ * <p>This logger writes event log records as JSON to a dedicated SLF4J logger 
named {@value
+ * #EVENT_LOGGER_NAME}. Events are automatically routed to a separate file in 
Flink's log directory,
+ * making them visible in Flink's Web UI "Logs" tab.
+ *
+ * <p>On {@link #open}, the logger automatically configures log4j2 to write 
event logs to a separate
+ * file (derived from Flink's {@code log.file} system property). No manual 
log4j2 configuration is
+ * required.
+ *
+ * <p>Unlike {@link FileEventLogger}, which creates a separate log file per 
subtask, this logger
+ * writes all events from a TaskManager to the same log destination. To 
distinguish events from
+ * different subtasks, each JSON record includes {@code jobId}, {@code 
taskName}, and {@code
+ * subtaskId} fields.
+ *
+ * <p>This logger honors the per-event-type log level configuration resolved 
by {@link
+ * EventLogLevelResolver}; events resolved to {@link EventLogLevel#OFF} are 
skipped, and events at
+ * {@link EventLogLevel#STANDARD} have their payloads truncated by a {@link 
JsonTruncator}.
+ *
+ * <h3>Thread Safety</h3>
+ *
+ * <p>This class is <strong>thread-safe at the Flink subtask level</strong>, 
following the same
+ * guarantees as {@link FileEventLogger}. Each subtask instance gets its own 
logger instance with
+ * its own subtask context fields.
+ */
+public class Slf4jEventLogger implements EventLogger {
+    /** Dedicated logger name for event log output. */
+    public static final String EVENT_LOGGER_NAME = 
"org.apache.flink.agents.EventLog";
+
+    private static final String EVENT_LOG_APPENDER_NAME = 
"FlinkAgentsEventLogAppender";
+
+    private static final Logger EVENT_LOG = 
LoggerFactory.getLogger(EVENT_LOGGER_NAME);
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    private final EventLoggerConfig config;
+    private boolean prettyPrint;
+    private String jobId;
+    private String taskName;
+    private int subtaskId;
+    private EventLogLevelResolver levelResolver;
+    private JsonTruncator truncator;
+    private Counter truncatedEventsCounter;
+
+    public Slf4jEventLogger(EventLoggerConfig config) {
+        this.config = config;
+    }
+
+    @Override
+    public void open(EventLoggerOpenParams params) throws Exception {
+        jobId = params.getRuntimeContext().getJobInfo().getJobId().toString();
+        taskName = params.getRuntimeContext().getTaskInfo().getTaskName();
+        subtaskId = 
params.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+
+        // The full agent config is the single source of truth for all logger 
settings
+        // (mirrors FileEventLogger).
+        @SuppressWarnings("unchecked")
+        Map<String, Object> agentConfig =
+                (Map<String, Object>)
+                        config.getProperties()
+                                .getOrDefault(
+                                        
EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY,
+                                        Collections.emptyMap());
+        prettyPrint =
+                (Boolean)
+                        agentConfig.getOrDefault(
+                                AgentConfigOptions.PRETTY_PRINT.getKey(),
+                                
AgentConfigOptions.PRETTY_PRINT.getDefaultValue());
+        this.levelResolver = new EventLogLevelResolver(agentConfig);
+        int maxStringLength =
+                getIntFromConfig(
+                        agentConfig,
+                        
AgentConfigOptions.EVENT_LOG_MAX_STRING_LENGTH.getKey(),
+                        
AgentConfigOptions.EVENT_LOG_MAX_STRING_LENGTH.getDefaultValue());
+        int maxArrayElements =
+                getIntFromConfig(
+                        agentConfig,
+                        
AgentConfigOptions.EVENT_LOG_MAX_ARRAY_ELEMENTS.getKey(),
+                        
AgentConfigOptions.EVENT_LOG_MAX_ARRAY_ELEMENTS.getDefaultValue());
+        int maxDepth =
+                getIntFromConfig(
+                        agentConfig,
+                        AgentConfigOptions.EVENT_LOG_MAX_DEPTH.getKey(),
+                        
AgentConfigOptions.EVENT_LOG_MAX_DEPTH.getDefaultValue());
+        this.truncator = new JsonTruncator(maxStringLength, maxArrayElements, 
maxDepth);
+
+        ensureLog4j2AppenderConfigured();
+    }
+
+    private static int getIntFromConfig(Map<String, Object> config, String 
key, int defaultValue) {
+        Object value = config.get(key);
+        if (value == null) {
+            return defaultValue;
+        }
+        if (value instanceof Number) {
+            return ((Number) value).intValue();
+        }
+        try {
+            return Integer.parseInt(value.toString());
+        } catch (NumberFormatException e) {
+            return defaultValue;
+        }
+    }
+
+    @Override
+    public void append(EventContext context, Event event) throws Exception {
+        // Resolve log level and skip OFF events.
+        EventLogLevel level =
+                levelResolver != null
+                        ? levelResolver.resolve(event.getType())
+                        : EventLogLevel.VERBOSE;
+        if (level == EventLogLevel.OFF) {
+            return;
+        }
+
+        EventLogRecord record = new EventLogRecord(context, event);
+        JsonNode tree = MAPPER.valueToTree(record);
+        if (!(tree instanceof ObjectNode)) {
+            throw new IllegalStateException(
+                    "EventLogRecord must serialize to a JSON object, but was: "
+                            + tree.getNodeType());
+        }
+        ObjectNode rootNode = (ObjectNode) tree;
+
+        // Truncate the event subtree at STANDARD level.
+        if (level == EventLogLevel.STANDARD && truncator != null) {
+            JsonNode eventNode = rootNode.get("event");
+            if (eventNode instanceof ObjectNode) {
+                boolean truncated = truncator.truncate((ObjectNode) eventNode);
+                if (truncated && truncatedEventsCounter != null) {
+                    truncatedEventsCounter.inc();
+                }
+            }
+        }
+
+        // Rebuild the top-level object so logLevel/subtask context sit 
alongside the documented
+        // JSON layout: timestamp, logLevel, eventType, subtask context, event.
+        ObjectNode ordered = MAPPER.createObjectNode();
+        ordered.set("timestamp", rootNode.get("timestamp"));
+        ordered.put("logLevel", level.name());
+        ordered.set("eventType", rootNode.get("eventType"));
+        ordered.put("jobId", jobId);
+        ordered.put("taskName", taskName);
+        ordered.put("subtaskId", subtaskId);
+        ordered.set("event", rootNode.get("event"));
+
+        String json =
+                prettyPrint
+                        ? 
MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(ordered)
+                        : MAPPER.writeValueAsString(ordered);
+        EVENT_LOG.info(json);
+    }
+
+    @Override
+    public void flush() throws Exception {
+        // No-op: SLF4J/log4j2 handles flushing
+    }
+
+    /**
+     * Sets the counter for tracking truncated events. Called by the operator 
after metrics are
+     * initialized.
+     *
+     * @param counter the counter to increment when events are truncated
+     */
+    public void setTruncatedEventsCounter(Counter counter) {
+        this.truncatedEventsCounter = counter;
+    }
+
+    @Override
+    public void close() throws Exception {
+        // No-op: SLF4J/log4j2 manages logger lifecycle
+    }
+
+    /**
+     * Configures the log4j2 event log appender programmatically.
+     *
+     * <p>This method creates a dedicated file appender that writes to {@code
+     * {log.file}.event-log.log} in the same directory as Flink's main log 
file. If the appender has
+     * already been configured (e.g., by a previous subtask on the same 
TaskManager), this method is
+     * a no-op.
+     */
+    private static synchronized void ensureLog4j2AppenderConfigured() {
+        try {
+            LoggerContext loggerContext = (LoggerContext) 
LogManager.getContext(false);
+            Configuration configuration = loggerContext.getConfiguration();
+            LoggerConfig loggerConfig = 
configuration.getLoggerConfig(EVENT_LOGGER_NAME);
+
+            // If the appender has already been configured, skip.
+            if (loggerConfig.getName().equals(EVENT_LOGGER_NAME)) {
+                return;
+            }
+
+            // Derive event log file path from Flink's log.file system property
+            String logFile = System.getProperty("log.file");
+            if (logFile == null || logFile.isEmpty()) {
+                // Not running in a Flink environment with log.file set,
+                // fall back to root logger (events will go to main log)
+                return;
+            }
+
+            String eventLogFile = logFile + ".event-log.log";
+
+            // Create a file appender with %msg%n pattern (JSON only, no log 
metadata)
+            PatternLayout layout = 
PatternLayout.newBuilder().withPattern("%msg%n").build();
+
+            FileAppender appender =
+                    FileAppender.newBuilder()
+                            .setName(EVENT_LOG_APPENDER_NAME)
+                            .withFileName(eventLogFile)
+                            .withAppend(true)
+                            .setLayout(layout)
+                            .build();
+            appender.start();
+            configuration.addAppender(appender);
+
+            // Create a dedicated logger config with additivity=false
+            LoggerConfig eventLoggerConfig = new 
LoggerConfig(EVENT_LOGGER_NAME, Level.INFO, false);
+            eventLoggerConfig.addAppender(appender, Level.INFO, null);
+            configuration.addLogger(EVENT_LOGGER_NAME, eventLoggerConfig);
+
+            loggerContext.updateLoggers();
+        } catch (Exception e) {
+            // If programmatic configuration fails (e.g., not using log4j2),
+            // fall back silently — events will go to the root logger.
+            LoggerFactory.getLogger(Slf4jEventLogger.class)
+                    .warn(
+                            "Failed to auto-configure event log appender, "
+                                    + "events will be logged to the root 
logger.",
+                            e);
+        }
+    }
+}
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
index a79f227d..096ac20d 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
@@ -27,9 +27,11 @@ import org.apache.flink.agents.api.logger.EventLogger;
 import org.apache.flink.agents.api.logger.EventLoggerConfig;
 import org.apache.flink.agents.api.logger.EventLoggerFactory;
 import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
+import org.apache.flink.agents.api.logger.LoggerType;
 import org.apache.flink.agents.plan.AgentPlan;
 import org.apache.flink.agents.plan.actions.Action;
 import org.apache.flink.agents.runtime.eventlog.FileEventLogger;
+import org.apache.flink.agents.runtime.eventlog.Slf4jEventLogger;
 import org.apache.flink.agents.runtime.metrics.BuiltInMetrics;
 import org.apache.flink.agents.runtime.operator.queue.SegmentedQueue;
 import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
@@ -46,7 +48,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.BASE_LOG_DIR;
-import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.PRETTY_PRINT;
+import static 
org.apache.flink.agents.api.configuration.AgentConfigOptions.EVENT_LOGGER_TYPE;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
@@ -114,6 +116,9 @@ class EventRouter<IN, OUT> implements AutoCloseable {
         if (eventLogger instanceof FileEventLogger) {
             ((FileEventLogger) eventLogger)
                     
.setTruncatedEventsCounter(builtInMetrics.getEventLogTruncatedEventsCounter());
+        } else if (eventLogger instanceof Slf4jEventLogger) {
+            ((Slf4jEventLogger) eventLogger)
+                    
.setTruncatedEventsCounter(builtInMetrics.getEventLogTruncatedEventsCounter());
         }
     }
 
@@ -238,16 +243,24 @@ class EventRouter<IN, OUT> implements AutoCloseable {
     }
 
     private EventLogger createEventLogger(AgentPlan agentPlan) {
-        EventLoggerConfig.Builder loggerConfigBuilder = 
EventLoggerConfig.builder();
+        // Honor the EVENT_LOGGER_TYPE config, defaulting to SLF4J so events 
surface in the Flink
+        // Web UI by default. An explicit baseLogDir forces the file logger 
for backward
+        // compatibility with the existing file-based logging path.
+        LoggerType loggerType = agentPlan.getConfig().get(EVENT_LOGGER_TYPE);
         String baseLogDir = agentPlan.getConfig().get(BASE_LOG_DIR);
         if (baseLogDir != null && !baseLogDir.trim().isEmpty()) {
-            
loggerConfigBuilder.property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, 
baseLogDir);
+            loggerType = LoggerType.FILE;
         }
-        loggerConfigBuilder.property(
-                FileEventLogger.PRETTY_PRINT_PROPERTY_KEY, 
agentPlan.getConfig().get(PRETTY_PRINT));
-        loggerConfigBuilder.property(
-                FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, 
agentPlan.getConfig().getConfData());
-        return EventLoggerFactory.createLogger(loggerConfigBuilder.build());
+        // The full agent config is the single source of truth for logger 
settings (baseLogDir,
+        // prettyPrint, event-log levels, truncation limits). Each logger 
pulls what it needs.
+        EventLoggerConfig config =
+                EventLoggerConfig.builder()
+                        .loggerType(loggerType)
+                        .property(
+                                EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY,
+                                agentPlan.getConfig().getConfData())
+                        .build();
+        return EventLoggerFactory.createLogger(config);
     }
 
     @Override
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
index 831fba4f..9156aee7 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.agents.api.OutputEvent;
 import org.apache.flink.agents.api.configuration.AgentConfigOptions;
 import org.apache.flink.agents.api.logger.EventLoggerConfig;
 import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
+import org.apache.flink.agents.api.logger.LoggerType;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobInfo;
 import org.apache.flink.api.common.TaskInfo;
@@ -81,15 +82,25 @@ class FileEventLoggerTest {
         when(taskInfo.getIndexOfThisSubtask()).thenReturn(testSubTaskId);
 
         // Create config and logger
-        config =
-                EventLoggerConfig.builder()
-                        .loggerType("file")
-                        .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, 
tempDir.toString())
-                        .build();
+        config = buildConfig(new HashMap<>());
         logger = new FileEventLogger(config);
         openParams = new EventLoggerOpenParams(runtimeContext);
     }
 
+    /**
+     * Builds an EventLoggerConfig for the file logger, seeding the 
agent-config map with {@code
+     * baseLogDir} so tests can drop their per-test boilerplate and only 
specify the keys they
+     * actually care about.
+     */
+    private EventLoggerConfig buildConfig(Map<String, Object> 
extraAgentConfig) {
+        Map<String, Object> agentConfig = new HashMap<>(extraAgentConfig);
+        agentConfig.putIfAbsent(AgentConfigOptions.BASE_LOG_DIR.getKey(), 
tempDir.toString());
+        return EventLoggerConfig.builder()
+                .loggerType(LoggerType.FILE)
+                .property(EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY, 
agentConfig)
+                .build();
+    }
+
     @AfterEach
     void tearDown() throws Exception {
         if (logger != null) {
@@ -302,12 +313,9 @@ class FileEventLoggerTest {
     @Test
     void testPrettyPrintOutputsFormattedJson() throws Exception {
         // Given - config with prettyPrint enabled
-        config =
-                EventLoggerConfig.builder()
-                        .loggerType("file")
-                        .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, 
tempDir.toString())
-                        .property(AgentConfigOptions.PRETTY_PRINT.getKey(), 
true)
-                        .build();
+        Map<String, Object> agentConfig = new HashMap<>();
+        agentConfig.put(AgentConfigOptions.PRETTY_PRINT.getKey(), true);
+        config = buildConfig(agentConfig);
         logger = new FileEventLogger(config);
 
         logger.open(openParams);
@@ -340,12 +348,7 @@ class FileEventLoggerTest {
         agentConfig.put("event-log.standard.max-array-elements", 20);
         agentConfig.put("event-log.standard.max-depth", 5);
 
-        config =
-                EventLoggerConfig.builder()
-                        .loggerType("file")
-                        .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, 
tempDir.toString())
-                        .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, 
agentConfig)
-                        .build();
+        config = buildConfig(agentConfig);
         logger = new FileEventLogger(config);
         logger.open(openParams);
 
@@ -380,12 +383,7 @@ class FileEventLoggerTest {
         agentConfig.put("event-log.level", "VERBOSE");
         agentConfig.put("event-log.standard.max-string-length", 10);
 
-        config =
-                EventLoggerConfig.builder()
-                        .loggerType("file")
-                        .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, 
tempDir.toString())
-                        .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, 
agentConfig)
-                        .build();
+        config = buildConfig(agentConfig);
         logger = new FileEventLogger(config);
         logger.open(openParams);
 
@@ -418,12 +416,7 @@ class FileEventLoggerTest {
         Map<String, Object> agentConfig = new HashMap<>();
         agentConfig.put("event-log.level", "OFF");
 
-        config =
-                EventLoggerConfig.builder()
-                        .loggerType("file")
-                        .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, 
tempDir.toString())
-                        .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, 
agentConfig)
-                        .build();
+        config = buildConfig(agentConfig);
         logger = new FileEventLogger(config);
         logger.open(openParams);
 
@@ -446,12 +439,7 @@ class FileEventLoggerTest {
         agentConfig.put("event-log.standard.max-string-length", 10);
         agentConfig.put("event-log.type." + InputEvent.EVENT_TYPE + ".level", 
"VERBOSE");
 
-        config =
-                EventLoggerConfig.builder()
-                        .loggerType("file")
-                        .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, 
tempDir.toString())
-                        .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, 
agentConfig)
-                        .build();
+        config = buildConfig(agentConfig);
         logger = new FileEventLogger(config);
         logger.open(openParams);
 
@@ -533,12 +521,7 @@ class FileEventLoggerTest {
         agentConfig.put("event-log.type.com.example.events.level", "OFF");
         agentConfig.put("event-log.type." + TestNamespacedEventA.EVENT_TYPE + 
".level", "VERBOSE");
 
-        config =
-                EventLoggerConfig.builder()
-                        .loggerType("file")
-                        .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, 
tempDir.toString())
-                        .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, 
agentConfig)
-                        .build();
+        config = buildConfig(agentConfig);
         logger = new FileEventLogger(config);
         logger.open(openParams);
 
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/Slf4jEventLoggerTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/Slf4jEventLoggerTest.java
new file mode 100644
index 00000000..d3dc51f5
--- /dev/null
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/Slf4jEventLoggerTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.eventlog;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.EventContext;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.configuration.AgentConfigOptions;
+import org.apache.flink.agents.api.logger.EventLoggerConfig;
+import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
+import org.apache.flink.agents.api.logger.LoggerType;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobInfo;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.when;
+
+class Slf4jEventLoggerTest {
+
+    @Mock private StreamingRuntimeContext runtimeContext;
+
+    @Mock private JobInfo jobInfo;
+
+    @Mock private TaskInfo taskInfo;
+
+    private Slf4jEventLogger logger;
+    private EventLoggerOpenParams openParams;
+    private ObjectMapper objectMapper;
+    private TestAppender testAppender;
+
+    private final JobID testJobId = JobID.generate();
+    private final String testTaskName = "action-execute-operator";
+    private final int testSubTaskId = 0;
+
+    @BeforeEach
+    void setUp() {
+        MockitoAnnotations.openMocks(this);
+        objectMapper = new ObjectMapper();
+
+        // Configure mocks
+        when(runtimeContext.getJobInfo()).thenReturn(jobInfo);
+        when(runtimeContext.getTaskInfo()).thenReturn(taskInfo);
+        when(jobInfo.getJobId()).thenReturn(testJobId);
+        when(taskInfo.getTaskName()).thenReturn(testTaskName);
+        when(taskInfo.getIndexOfThisSubtask()).thenReturn(testSubTaskId);
+
+        openParams = new EventLoggerOpenParams(runtimeContext);
+
+        // Set up log4j2 test appender to capture event log output
+        testAppender = new TestAppender("TestSlf4jAppender");
+        testAppender.start();
+
+        LoggerContext loggerContext = (LoggerContext) 
LogManager.getContext(false);
+        Configuration config = loggerContext.getConfiguration();
+        config.addAppender(testAppender);
+
+        LoggerConfig loggerConfig = 
config.getLoggerConfig(Slf4jEventLogger.EVENT_LOGGER_NAME);
+        if 
(!loggerConfig.getName().equals(Slf4jEventLogger.EVENT_LOGGER_NAME)) {
+            loggerConfig =
+                    new LoggerConfig(
+                            Slf4jEventLogger.EVENT_LOGGER_NAME,
+                            org.apache.logging.log4j.Level.INFO,
+                            false);
+            config.addLogger(Slf4jEventLogger.EVENT_LOGGER_NAME, loggerConfig);
+        }
+        loggerConfig.addAppender(testAppender, 
org.apache.logging.log4j.Level.INFO, null);
+        loggerContext.updateLoggers();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        if (logger != null) {
+            logger.close();
+        }
+        if (testAppender != null) {
+            testAppender.stop();
+            LoggerContext loggerContext = (LoggerContext) 
LogManager.getContext(false);
+            Configuration config = loggerContext.getConfiguration();
+            LoggerConfig loggerConfig = 
config.getLoggerConfig(Slf4jEventLogger.EVENT_LOGGER_NAME);
+            loggerConfig.removeAppender(testAppender.getName());
+            loggerContext.updateLoggers();
+        }
+    }
+
+    @Test
+    void testAppendWritesJsonWithSubtaskContext() throws Exception {
+        EventLoggerConfig config = 
EventLoggerConfig.builder().loggerType(LoggerType.SLF4J).build();
+        logger = new Slf4jEventLogger(config);
+        logger.open(openParams);
+
+        InputEvent inputEvent = new InputEvent("test input");
+        EventContext context = new EventContext(inputEvent);
+
+        logger.append(context, inputEvent);
+
+        List<String> messages = testAppender.getMessages();
+        assertEquals(1, messages.size(), "Should have logged one message");
+
+        JsonNode jsonNode = objectMapper.readTree(messages.get(0));
+        // Verify subtask context fields
+        assertEquals(testJobId.toString(), jsonNode.get("jobId").asText());
+        assertEquals(testTaskName, jsonNode.get("taskName").asText());
+        assertEquals(testSubTaskId, jsonNode.get("subtaskId").asInt());
+        // Verify event content
+        assertNotNull(jsonNode.get("timestamp"));
+        assertNotNull(jsonNode.get("event"));
+        assertEquals(InputEvent.EVENT_TYPE, 
jsonNode.get("eventType").asText());
+    }
+
+    @Test
+    void testAppendMultipleEvents() throws Exception {
+        EventLoggerConfig config = 
EventLoggerConfig.builder().loggerType(LoggerType.SLF4J).build();
+        logger = new Slf4jEventLogger(config);
+        logger.open(openParams);
+
+        InputEvent inputEvent = new InputEvent("input data");
+        OutputEvent outputEvent = new OutputEvent("output data");
+
+        logger.append(new EventContext(inputEvent), inputEvent);
+        logger.append(new EventContext(outputEvent), outputEvent);
+
+        List<String> messages = testAppender.getMessages();
+        assertEquals(2, messages.size(), "Should have logged two messages");
+
+        JsonNode inputJson = objectMapper.readTree(messages.get(0));
+        assertEquals("input data", 
inputJson.get("event").get("attributes").get("input").asText());
+
+        JsonNode outputJson = objectMapper.readTree(messages.get(1));
+        assertEquals(
+                "output data", 
outputJson.get("event").get("attributes").get("output").asText());
+    }
+
+    @Test
+    void testRootLevelOffSkipsAllEvents() throws Exception {
+        Map<String, Object> agentConfig = new HashMap<>();
+        agentConfig.put(AgentConfigOptions.EVENT_LOG_LEVEL.getKey(), "OFF");
+        EventLoggerConfig config =
+                EventLoggerConfig.builder()
+                        .loggerType(LoggerType.SLF4J)
+                        .property(EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY, 
agentConfig)
+                        .build();
+        logger = new Slf4jEventLogger(config);
+        logger.open(openParams);
+
+        InputEvent inputEvent = new InputEvent("input data");
+        logger.append(new EventContext(inputEvent), inputEvent);
+
+        List<String> messages = testAppender.getMessages();
+        assertTrue(messages.isEmpty(), "No events should be logged when root 
level is OFF");
+    }
+
+    @Test
+    void testPerEventTypeOffSkipsMatchingEvents() throws Exception {
+        Map<String, Object> agentConfig = new HashMap<>();
+        agentConfig.put("event-log.type." + InputEvent.EVENT_TYPE + ".level", 
"OFF");
+        EventLoggerConfig config =
+                EventLoggerConfig.builder()
+                        .loggerType(LoggerType.SLF4J)
+                        .property(EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY, 
agentConfig)
+                        .build();
+        logger = new Slf4jEventLogger(config);
+        logger.open(openParams);
+
+        InputEvent inputEvent = new InputEvent("input data");
+        OutputEvent outputEvent = new OutputEvent("output data");
+
+        logger.append(new EventContext(inputEvent), inputEvent);
+        logger.append(new EventContext(outputEvent), outputEvent);
+
+        List<String> messages = testAppender.getMessages();
+        assertEquals(
+                1, messages.size(), "Only OutputEvent should be logged when 
InputEvent is OFF");
+
+        JsonNode jsonNode = objectMapper.readTree(messages.get(0));
+        assertEquals("output data", 
jsonNode.get("event").get("attributes").get("output").asText());
+    }
+
+    @Test
+    void testLogLevelAppearsInJson() throws Exception {
+        EventLoggerConfig config = 
EventLoggerConfig.builder().loggerType(LoggerType.SLF4J).build();
+        logger = new Slf4jEventLogger(config);
+        logger.open(openParams);
+
+        InputEvent inputEvent = new InputEvent("test input");
+        logger.append(new EventContext(inputEvent), inputEvent);
+
+        List<String> messages = testAppender.getMessages();
+        assertEquals(1, messages.size());
+        JsonNode jsonNode = objectMapper.readTree(messages.get(0));
+        assertNotNull(jsonNode.get("logLevel"), "logLevel field should be 
present in JSON output");
+    }
+
+    @Test
+    void testDefaultIsNotPrettyPrinted() throws Exception {
+        // Default config should produce single-line JSON, matching 
AgentConfigOptions.PRETTY_PRINT
+        // default (false).
+        EventLoggerConfig config = 
EventLoggerConfig.builder().loggerType(LoggerType.SLF4J).build();
+        logger = new Slf4jEventLogger(config);
+        logger.open(openParams);
+
+        InputEvent inputEvent = new InputEvent("test input");
+        logger.append(new EventContext(inputEvent), inputEvent);
+
+        List<String> messages = testAppender.getMessages();
+        assertEquals(1, messages.size());
+
+        String json = messages.get(0);
+        assertFalse(json.contains("\n"), "Default output should be single 
line");
+        assertDoesNotThrow(() -> objectMapper.readTree(json), "Output should 
be valid JSON");
+    }
+
+    @Test
+    void testEnablePrettyPrint() throws Exception {
+        Map<String, Object> agentConfig = new HashMap<>();
+        agentConfig.put(AgentConfigOptions.PRETTY_PRINT.getKey(), true);
+        EventLoggerConfig config =
+                EventLoggerConfig.builder()
+                        .loggerType(LoggerType.SLF4J)
+                        .property(EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY, 
agentConfig)
+                        .build();
+        logger = new Slf4jEventLogger(config);
+        logger.open(openParams);
+
+        InputEvent inputEvent = new InputEvent("test input");
+        logger.append(new EventContext(inputEvent), inputEvent);
+
+        List<String> messages = testAppender.getMessages();
+        assertEquals(1, messages.size());
+
+        String json = messages.get(0);
+        assertTrue(json.contains("\n"), "Pretty-printed output should span 
multiple lines");
+        assertDoesNotThrow(
+                () -> objectMapper.readTree(json), "Pretty-printed output 
should be valid JSON");
+    }
+
+    @Test
+    void testFlushAndCloseAreNoOps() throws Exception {
+        EventLoggerConfig config = 
EventLoggerConfig.builder().loggerType(LoggerType.SLF4J).build();
+        logger = new Slf4jEventLogger(config);
+        logger.open(openParams);
+
+        assertDoesNotThrow(() -> logger.flush(), "flush() should not throw");
+        assertDoesNotThrow(() -> logger.close(), "close() should not throw");
+    }
+
+    /** A log4j2 appender that captures log messages for testing. */
+    private static class TestAppender extends AbstractAppender {
+
+        private final List<String> messages = Collections.synchronizedList(new 
ArrayList<>());
+
+        protected TestAppender(String name) {
+            super(name, null, 
PatternLayout.newBuilder().withPattern("%msg").build(), true, null);
+        }
+
+        @Override
+        public void append(LogEvent event) {
+            messages.add(event.getMessage().getFormattedMessage());
+        }
+
+        public List<String> getMessages() {
+            return messages;
+        }
+    }
+}
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
index 45c46f14..2b2f5a29 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
@@ -25,6 +25,8 @@ import 
org.apache.flink.agents.api.configuration.AgentConfigOptions;
 import org.apache.flink.agents.api.context.DurableCallable;
 import org.apache.flink.agents.api.context.MemoryObject;
 import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.logger.EventLoggerConfig;
+import org.apache.flink.agents.api.logger.LoggerType;
 import org.apache.flink.agents.plan.AgentConfiguration;
 import org.apache.flink.agents.plan.AgentPlan;
 import org.apache.flink.agents.plan.JavaFunction;
@@ -366,6 +368,7 @@ public class ActionExecutionOperatorTest {
     void testEventLogBaseDirFromAgentConfig() throws Exception {
         String baseLogDir = "/tmp/flink-agents-test";
         AgentConfiguration config = new AgentConfiguration();
+        config.set(AgentConfigOptions.EVENT_LOGGER_TYPE, LoggerType.FILE);
         config.set(AgentConfigOptions.BASE_LOG_DIR, baseLogDir);
         config.set(AgentConfigOptions.PRETTY_PRINT, true);
         AgentPlan agentPlan = TestAgent.getAgentPlanWithConfig(config);
@@ -389,9 +392,13 @@ public class ActionExecutionOperatorTest {
             @SuppressWarnings("unchecked")
             Map<String, Object> properties =
                     (Map<String, Object>) propertiesField.get(loggerConfig);
-            
assertThat(properties.get(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY))
+            @SuppressWarnings("unchecked")
+            Map<String, Object> agentConfig =
+                    (Map<String, Object>)
+                            
properties.get(EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY);
+            
assertThat(agentConfig.get(AgentConfigOptions.BASE_LOG_DIR.getKey()))
                     .isEqualTo(baseLogDir);
-            
assertThat(properties.get(FileEventLogger.PRETTY_PRINT_PROPERTY_KEY)).isEqualTo(true);
+            
assertThat(agentConfig.get(AgentConfigOptions.PRETTY_PRINT.getKey())).isEqualTo(true);
         }
     }
 

Reply via email to