This is an automated email from the ASF dual-hosted git repository.
wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 61490ea5 [feature] Enable EventLog display in WebUI by default (#638)
61490ea5 is described below
commit 61490ea513ed81e4fde316dd1094e55b6bbc9838
Author: Eugene <[email protected]>
AuthorDate: Wed May 13 11:35:21 2026 +0800
[feature] Enable EventLog display in WebUI by default (#638)
* [hotfix] Add log4j when running the end-to-end test in Flink 1.20
* [runtime] Consolidate event-log settings into single agent-config map
* [feature] Enable EventLog display in WebUI by default
---
.../api/configuration/AgentConfigOptions.java | 9 +
.../flink/agents/api/logger/EventLoggerConfig.java | 82 +++---
.../agents/api/logger/EventLoggerFactory.java | 104 +++----
.../apache/flink/agents/api/logger/LoggerType.java | 71 +++++
docs/content/docs/operations/configuration.md | 3 +-
docs/content/docs/operations/monitoring.md | 22 +-
.../pom.xml | 22 ++
python/flink_agents/api/core_options.py | 18 ++
.../agents/runtime/eventlog/FileEventLogger.java | 38 +--
.../agents/runtime/eventlog/Slf4jEventLogger.java | 276 +++++++++++++++++++
.../flink/agents/runtime/operator/EventRouter.java | 29 +-
.../runtime/eventlog/FileEventLoggerTest.java | 65 ++---
.../runtime/eventlog/Slf4jEventLoggerTest.java | 302 +++++++++++++++++++++
.../operator/ActionExecutionOperatorTest.java | 11 +-
14 files changed, 862 insertions(+), 190 deletions(-)
diff --git
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
index 9cdda8bd..100894f7 100644
---
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
+++
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
@@ -18,10 +18,19 @@
package org.apache.flink.agents.api.configuration;
import org.apache.flink.agents.api.logger.EventLogLevel;
+import org.apache.flink.agents.api.logger.LoggerType;
/** The set of configuration options for agents parameters. */
public class AgentConfigOptions {
+ /**
+ * The config parameter specifies which event logger implementation to
use. Defaults to {@link
+ * LoggerType#SLF4J}, which surfaces events in Flink's Web UI; setting
{@link LoggerType#FILE}
+ * (or configuring {@link #BASE_LOG_DIR}) routes events to per-subtask log
files instead.
+ */
+ public static final ConfigOption<LoggerType> EVENT_LOGGER_TYPE =
+ new ConfigOption<>("eventLoggerType", LoggerType.class,
LoggerType.SLF4J);
+
/** The config parameter specifies the directory for the FileEvent file. */
public static final ConfigOption<String> BASE_LOG_DIR =
new ConfigOption<>("baseLogDir", String.class, null);
diff --git
a/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java
b/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java
index 1dedeef0..870e8108 100644
---
a/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java
+++
b/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java
@@ -26,27 +26,42 @@ import java.util.Objects;
/**
* Unified configuration for event loggers with a fluent builder API.
*
- * <p>This class provides a unified approach to configuring different types of
event loggers using
- * string-based logger type identification and a flexible property map for
implementation-specific
- * parameters.
+ * <p>A config selects an implementation via a {@link LoggerType} and carries
a property map. The
+ * full agent configuration (e.g., {@code AgentConfiguration.getConfData()})
is passed through under
+ * {@link #AGENT_CONFIG_PROPERTY_KEY}; logger settings such as {@code
baseLogDir}, {@code
+ * prettyPrint}, and the {@code event-log.*} keys live inside that map rather
than as top-level
+ * properties.
*
* <h3>Usage Examples</h3>
*
* <pre>{@code
- * // Enable default file-based event logging with custom properties
+ * // Enable file-based event logging with custom log directory.
+ * Map<String, Object> agentConfig = new HashMap<>();
+ * agentConfig.put("baseLogDir", "/tmp/logs");
* EventLoggerConfig fileConfig = EventLoggerConfig.builder()
- * .loggerType("file")
- * .property("baseLogDir", "/tmp/logs")
+ * .loggerType(LoggerType.FILE)
+ * .property(EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY, agentConfig)
* .build();
* }</pre>
*/
public final class EventLoggerConfig {
- private final String loggerType;
+ /**
+ * Property key used to pass the full agent config data map (e.g., {@code
+ * AgentConfiguration.getConfData()}) into a logger via {@link
Builder#property(String,
+ * Object)}.
+ *
+ * <p>Built-in loggers read this property to initialize per-event-type log
level resolution and
+ * STANDARD-level truncation. Custom loggers may use the same property to
access agent-level
+ * configuration without taking a hard dependency on the runtime module.
+ */
+ public static final String AGENT_CONFIG_PROPERTY_KEY = "agentConfig";
+
+ private final LoggerType loggerType;
private final Map<String, Object> properties;
/** Private constructor - use {@link #builder()} to create instances. */
- private EventLoggerConfig(String loggerType, Map<String, Object>
properties) {
+ private EventLoggerConfig(LoggerType loggerType, Map<String, Object>
properties) {
this.loggerType = Objects.requireNonNull(loggerType, "Logger type
cannot be null");
this.properties = Collections.unmodifiableMap(new
HashMap<>(properties));
}
@@ -61,31 +76,22 @@ public final class EventLoggerConfig {
}
/**
- * Gets the logger type identifier.
+ * Gets the logger type. Built-in types include {@link LoggerType#SLF4J}
(default, outputs to
+ * Flink Web UI via log4j2) and {@link LoggerType#FILE} (writes to
per-subtask log files).
*
- * <p>This string identifier is used to determine which EventLogger
implementation should be
- * instantiated. Built-in logger types include:
- *
- * <ul>
- * <li>"file" - File-based event logger (default)
- * </ul>
- *
- * @return the logger type identifier (e.g., "file", "database", "kafka")
+ * @return the logger type, never null
*/
- public String getLoggerType() {
+ public LoggerType getLoggerType() {
return loggerType;
}
/**
- * Gets the implementation-specific properties for this logger
configuration.
- *
- * <p>These properties contain logger-specific configuration parameters
that are not common
- * across all logger implementations. For example:
+ * Gets the properties carried by this configuration.
*
- * <ul>
- * <li>File logger: "baseLogDir", "maxFileSize", "compression"
- * <li>Database logger: "connectionUrl", "tableName", "batchSize"
- * </ul>
+ * <p>Currently the only well-known key is {@link
#AGENT_CONFIG_PROPERTY_KEY}, which holds the
+ * full agent config map; logger-specific settings (e.g., {@code
baseLogDir}, {@code
+ * prettyPrint}, {@code event-log.*}) are looked up inside that map by the
logger implementation
+ * rather than from this top-level map.
*
* @return an immutable map of property names to values, never null
*/
@@ -109,13 +115,7 @@ public final class EventLoggerConfig {
@Override
public String toString() {
- return "EventLoggerConfig{"
- + "loggerType='"
- + loggerType
- + '\''
- + ", properties="
- + properties
- + '}';
+ return "EventLoggerConfig{loggerType=" + loggerType + ", properties="
+ properties + '}';
}
/**
@@ -125,23 +125,23 @@ public final class EventLoggerConfig {
* validation and sensible defaults.
*/
public static final class Builder {
- private String loggerType = "file"; // Default to file logger
+ private LoggerType loggerType = LoggerType.SLF4J;
private final Map<String, Object> properties = new HashMap<>();
private Builder() {}
/**
- * Sets the logger type identifier.
+ * Sets the logger type.
*
- * @param loggerType the logger type (e.g., "file", "database",
"kafka")
+ * @param loggerType the built-in logger type
* @return this Builder instance for method chaining
- * @throws IllegalArgumentException if loggerType is null or empty
+ * @throws IllegalArgumentException if loggerType is null
*/
- public Builder loggerType(String loggerType) {
- if (loggerType == null || loggerType.trim().isEmpty()) {
- throw new IllegalArgumentException("Logger type cannot be null
or empty");
+ public Builder loggerType(LoggerType loggerType) {
+ if (loggerType == null) {
+ throw new IllegalArgumentException("Logger type cannot be
null");
}
- this.loggerType = loggerType.trim();
+ this.loggerType = loggerType;
return this;
}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerFactory.java
b/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerFactory.java
index 48f23e57..7f7e12e0 100644
---
a/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerFactory.java
+++
b/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerFactory.java
@@ -24,16 +24,11 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
/**
- * Factory for creating EventLogger instances based on logger type identifiers.
+ * Factory for creating EventLogger instances based on {@link LoggerType}.
*
- * <p>This factory uses a string-based registry approach that allows both
built-in and custom
- * EventLogger implementations to be created in a flexible manner. Built-in
loggers are
- * automatically registered, while custom loggers can be registered using the
{@link
- * #registerFactory(String, Function)} method.
- *
- * <p>Logger types are identified by simple string identifiers (e.g., "file",
"database", "kafka"),
- * making the factory easy to use with configuration files and providing a
clean abstraction for
- * different logger implementations.
+ * <p>Built-in loggers (see {@link LoggerType}) are auto-registered.
Additional implementations for
+ * a given {@link LoggerType} can replace the built-in factory via {@link
+ * #registerFactory(LoggerType, Function)}.
*
* <h3>Thread Safety</h3>
*
@@ -44,31 +39,21 @@ import java.util.function.Function;
* <h3>Usage Examples</h3>
*
* <pre>{@code
- * // Create built-in file logger
* EventLoggerConfig fileConfig = EventLoggerConfig.builder()
- * .loggerType("file")
+ * .loggerType(LoggerType.FILE)
* .property("baseLogDir", "/tmp/flink-agents")
* .build();
* EventLogger fileLogger = EventLoggerFactory.createLogger(fileConfig);
- *
- * // Register and use custom logger
- * EventLoggerFactory.registerFactory("database",
- * config -> new DatabaseEventLogger(config));
- *
- * EventLoggerConfig dbConfig = EventLoggerConfig.builder()
- * .loggerType("database")
- * .property("jdbcUrl", "jdbc:mysql://localhost/logs")
- * .build();
- * EventLogger customLogger = EventLoggerFactory.createLogger(dbConfig);
* }</pre>
*
* @see EventLogger
* @see EventLoggerConfig
+ * @see LoggerType
*/
public final class EventLoggerFactory {
- /** Thread-safe registry of factory functions keyed by logger type
identifier. */
- private static final Map<String, Function<EventLoggerConfig, EventLogger>>
FACTORIES =
+ /** Thread-safe registry of factory functions keyed by {@link LoggerType}.
*/
+ private static final Map<LoggerType, Function<EventLoggerConfig,
EventLogger>> FACTORIES =
new ConcurrentHashMap<>();
private EventLoggerFactory() {}
@@ -80,9 +65,6 @@ public final class EventLoggerFactory {
/**
* Creates an EventLogger instance based on the provided configuration.
*
- * <p>This method looks up the appropriate factory function for the logger
type specified in the
- * configuration and uses it to create the EventLogger instance.
- *
* @param config the EventLogger configuration
* @return a new EventLogger instance configured according to the provided
config
* @throws IllegalArgumentException if config is null or no factory is
registered for the logger
@@ -94,19 +76,13 @@ public final class EventLoggerFactory {
throw new IllegalArgumentException("EventLoggerConfig cannot be
null");
}
- String loggerType = config.getLoggerType();
- if (loggerType == null || loggerType.trim().isEmpty()) {
- throw new IllegalArgumentException("Logger type cannot be null or
empty");
- }
-
+ LoggerType loggerType = config.getLoggerType();
Function<EventLoggerConfig, EventLogger> factory =
FACTORIES.get(loggerType);
if (factory == null) {
throw new IllegalArgumentException(
String.format(
- "No factory registered for logger type: '%s'. "
- + "Available types: %s. "
- + "Use
EventLoggerFactory.registerFactory() to register custom loggers.",
+ "No factory registered for logger type: %s.
Available types: %s.",
loggerType, FACTORIES.keySet()));
}
@@ -119,67 +95,51 @@ public final class EventLoggerFactory {
}
/**
- * Registers a factory function for a specific logger type.
- *
- * <p>This method allows custom EventLogger implementations to be
registered with the factory.
- * Once registered, the factory can create instances of the custom logger
using the {@link
- * #createLogger(EventLoggerConfig)} method.
+ * Registers a factory function for a specific {@link LoggerType},
replacing any previously
+ * registered factory for that type.
*
- * <p>If a factory is already registered for the given logger type, it
will be replaced with the
- * new factory function.
- *
- * @param loggerType the logger type identifier (e.g., "file", "database",
"kafka")
+ * @param loggerType the logger type
* @param factory the factory function that creates EventLogger instances
- * @throws IllegalArgumentException if loggerType or factory is null or if
loggerType is empty
+ * @throws IllegalArgumentException if loggerType or factory is null
*/
public static void registerFactory(
- String loggerType, Function<EventLoggerConfig, EventLogger>
factory) {
- if (loggerType == null || loggerType.trim().isEmpty()) {
- throw new IllegalArgumentException("Logger type cannot be null or
empty");
- }
+ LoggerType loggerType, Function<EventLoggerConfig, EventLogger>
factory) {
+ Objects.requireNonNull(loggerType, "Logger type cannot be null");
Objects.requireNonNull(factory, "Factory function cannot be null");
-
- FACTORIES.put(loggerType.trim(), factory);
+ FACTORIES.put(loggerType, factory);
}
- /**
- * Registers built-in EventLogger factories.
- *
- * <p>This method is called during class initialization to register
factories for all built-in
- * EventLogger implementations.
- */
+ /** Registers built-in EventLogger factories during class initialization.
*/
private static void registerBuiltInFactories() {
- registerFileEventLoggerFactory();
+ registerByReflection(
+ LoggerType.FILE,
"org.apache.flink.agents.runtime.eventlog.FileEventLogger");
+ registerByReflection(
+ LoggerType.SLF4J,
"org.apache.flink.agents.runtime.eventlog.Slf4jEventLogger");
}
/**
- * Registers the built-in file event logger factory.
- *
- * <p>This uses reflection to avoid hard dependencies on the runtime
module, allowing the API
- * module to be used independently.
+ * Registers a built-in logger factory backed by a runtime-module class
loaded reflectively, so
+ * the api module does not need a compile-time dependency on the runtime
module. Silently skips
+ * registration when the runtime class is not on the classpath.
*/
- private static void registerFileEventLoggerFactory() {
+ private static void registerByReflection(LoggerType loggerType, String
className) {
try {
- // Try to load FileEventLogger class
- Class<?> fileLoggerClass =
-
Class.forName("org.apache.flink.agents.runtime.eventlog.FileEventLogger");
-
+ Class<?> loggerClass = Class.forName(className);
registerFactory(
- "file",
+ loggerType,
config -> {
try {
- // FileEventLogger now takes unified
EventLoggerConfig directly
return (EventLogger)
- fileLoggerClass
+ loggerClass
.getConstructor(EventLoggerConfig.class)
.newInstance(config);
} catch (Exception e) {
- throw new RuntimeException("Failed to create
FileEventLogger", e);
+ throw new RuntimeException(
+ "Failed to create " + loggerType + " event
logger", e);
}
});
} catch (ClassNotFoundException e) {
- // FileEventLogger not found, skip registration
- // This is expected if the runtime module is not on the classpath
+ // The runtime module is not on the classpath; the built-in logger
is unavailable.
}
}
}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/logger/LoggerType.java
b/api/src/main/java/org/apache/flink/agents/api/logger/LoggerType.java
new file mode 100644
index 00000000..4668ec89
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/logger/LoggerType.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.logger;
+
+/**
+ * Enumeration of built-in EventLogger types.
+ *
+ * <p>Each {@code LoggerType} corresponds to a built-in {@link EventLogger}
implementation
+ * registered with the {@link EventLoggerFactory}. The factory registry is
keyed by the enum value
+ * itself; the {@link #getType()} string is the identifier used by the
configuration system and
+ * {@link #fromType(String)}.
+ */
+public enum LoggerType {
+ /** SLF4J-based event logger; outputs to Flink Web UI via log4j2. */
+ SLF4J("slf4j"),
+
+ /** File-based event logger; writes events to per-subtask log files. */
+ FILE("file");
+
+ private final String type;
+
+ LoggerType(String type) {
+ this.type = type;
+ }
+
+ /**
+ * Gets the string identifier used to register and look up this logger
type in {@link
+ * EventLoggerFactory}.
+ *
+ * @return the logger type identifier
+ */
+ public String getType() {
+ return type;
+ }
+
+ /**
+ * Resolves a {@link LoggerType} from its string identifier.
+ *
+ * @param type the logger type identifier (e.g., "slf4j", "file")
+ * @return the matching {@link LoggerType}
+ * @throws IllegalArgumentException if no matching type exists
+ */
+ public static LoggerType fromType(String type) {
+ if (type == null) {
+ throw new IllegalArgumentException("Logger type cannot be null");
+ }
+ String trimmed = type.trim();
+ for (LoggerType value : values()) {
+ if (value.type.equalsIgnoreCase(trimmed)) {
+ return value;
+ }
+ }
+ throw new IllegalArgumentException("Unknown logger type: " + type);
+ }
+}
diff --git a/docs/content/docs/operations/configuration.md
b/docs/content/docs/operations/configuration.md
index 9b325837..8a387def 100644
--- a/docs/content/docs/operations/configuration.md
+++ b/docs/content/docs/operations/configuration.md
@@ -126,7 +126,8 @@ Here is the list of all built-in core configuration options.
| Key | Default | Type
| Description
|
|---------------------------|----------------------------|-----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `baseLogDir` | (none) | String
| Base directory for file-based event logs. If not set, uses
`java.io.tmpdir/flink-agents`.
|
+| `eventLoggerType` | `SLF4J` | LoggerType
| Which built-in event logger to use. Valid values: `SLF4J` (writes JSON
through a dedicated SLF4J logger so events show up in Flink's Web UI **Logs**
tab) and `FILE` (writes per-subtask `.log` files under `baseLogDir`). Setting
`baseLogDir` overrides this and forces `FILE`. |
+| `baseLogDir` | (none) | String
| Base directory for file-based event logs. If not set, uses
`java.io.tmpdir/flink-agents`. Setting this value also implicitly switches
`eventLoggerType` to `FILE`.
|
| `prettyPrint` | false | boolean
| Whether to enable pretty-printed JSON format for event logs. When set to
`true`, each event is written as formatted multi-line JSON instead of JSONL
(JSON Lines) format. {{< hint info >}}Note: enabling this option makes the log
file no longer valid JSONL format. {{< /hint >}} |
| `error-handling-strategy` | ErrorHandlingStrategy.FAIL |
ErrorHandlingStrategy | Strategy for handling errors during model requests,
include timeout and unexpected output schema. <br/>The option value could
be:<br/> <ul><li>`ErrorHandlingStrategy.FAIL`</li>
<li>`ErrorHandlingStrategy.RETRY`</li> <li>`ErrorHandlingStrategy.IGNORE`</li> |
| `max-retries` | 3 | int
| Number of retries when using `ErrorHandlingStrategy.RETRY`.
|
diff --git a/docs/content/docs/operations/monitoring.md
b/docs/content/docs/operations/monitoring.md
index 12c94840..1925c7f6 100644
--- a/docs/content/docs/operations/monitoring.md
+++ b/docs/content/docs/operations/monitoring.md
@@ -143,11 +143,19 @@ We can check the log result in the WebUI of Flink Job:
## Event Log
-Currently, the system supports **File-based Event Log** as the default
implementation. Future releases will introduce support for additional types of
event logs and provide configuration options to let users choose their
preferred logging mechanism.
+The system supports two types of event loggers: **SLF4J Event Log** (default)
and **File Event Log**.
+
+By default, the SLF4J Event Log is used. If `baseLogDir` is configured, the
system automatically switches to the File Event Log.
+
+### SLF4J Event Log (Default)
+
+The **SLF4J Event Log** outputs events through a dedicated SLF4J logger
(`org.apache.flink.agents.EventLog`). On startup, the logger **automatically
configures** log4j2 to write events to a separate file
(`{log.file}.event-log.log`) in Flink's log directory, making them visible in
Flink's Web UI **Logs** tab. **No manual log4j2 configuration is required.**
+
+Because all subtasks on a TaskManager share the same log destination, each
record additionally carries `jobId`, `taskName`, and `subtaskId` top-level
fields so consumers can still distinguish events from different subtasks. The
rest of the record follows the common [JSON Format](#json-format) described
below.
### File Event Log
-The **File Event Log** is a file-based event logging system that stores events
in structured files within a flat directory.
+The **File Event Log** is a file-based event logging system that stores events
in structured files within a flat directory. To use it, configure `baseLogDir`
in your Flink `config.yaml`.
By default, each event is recorded in **JSON Lines (JSONL)** format, with one
JSON object per line. When [`prettyPrint`]({{< ref
"docs/operations/configuration#core-options" >}}) is enabled, each event is
written as formatted multi-line JSON instead, and the log file is no longer in
valid JSONL format.
@@ -164,7 +172,9 @@ The log files follow a naming convention consistent with
Flink's logging standar
By default, all File-based Event Logs are stored in the `flink-agents`
subdirectory under the system temporary directory (`java.io.tmpdir`). You can
override the base log directory with the `agent.baseLogDir` setting in Flink
`config.yaml`.
-#### JSON Format
+### JSON Format
+
+The JSON record format described here applies to both the SLF4J and File event
loggers. The SLF4J logger adds `jobId`, `taskName`, and `subtaskId` fields on
top (see [SLF4J Event Log](#slf4j-event-log-default)); the File logger encodes
those values in the file path instead.
Each record contains a top-level `timestamp`, the resolved `logLevel`, and a
top-level `eventType` routing key (mirrors `event.eventType`), followed by the
full event object. The top-level `eventType` makes it easy for downstream tools
(e.g. `grep`, `jq`, log shippers) to filter by event type without parsing
nested JSON:
@@ -180,7 +190,7 @@ Each record contains a top-level `timestamp`, the resolved
`logLevel`, and a top
}
```
-#### Event Log Levels
+### Event Log Levels
Each event type is logged at a configurable verbosity. Three levels are
supported:
@@ -226,7 +236,7 @@ Example record at `STANDARD` with a long string and a large
array truncated:
}
```
-#### Per-event-type log levels
+### Per-event-type log levels
You can override the level for individual event types using the
`event-log.type.<EVENT_TYPE>.level` config key, where `<EVENT_TYPE>` is the
event's routing type string (the same string that appears as `eventType` in the
JSON log). Built-in events use short snake-cased names such as:
@@ -276,7 +286,7 @@ flink run ... \
Other per-type levels from `config.yaml` are preserved — the `-D` flag only
overrides the one key it names.
-#### Compatibility Notes
+### Compatibility Notes
- **Default behavior changed.** Before this feature, every event was logged in
full. The new default is `STANDARD`, which truncates large payloads. To restore
the previous behavior either globally or per type, set the level to `VERBOSE`.
- **Old log records still parse.** Records written before this feature have no
`logLevel` or top-level `eventType`. They deserialize correctly and are treated
as `VERBOSE` (their payloads were never truncated).
diff --git a/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
b/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
index 06db5577..44d76579 100644
--- a/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
+++ b/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
@@ -139,6 +139,28 @@ under the License.
<flink.version>${flink.1.20.version}</flink.version>
<flink.agents.dist.artifactId>flink-agents-dist-flink-1.20</flink.agents.dist.artifactId>
</properties>
+ <dependencies>
+ <!-- Flink 1.20 does not transitively provide log4j-core in
test classpath,
+ needed for Slf4jEventLogger which uses log4j2 core APIs
-->
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>${log4j2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>${log4j2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>${log4j2.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</profile>
<!-- Flink 2.0 Profile -->
diff --git a/python/flink_agents/api/core_options.py
b/python/flink_agents/api/core_options.py
index 98b8b317..5b575c3f 100644
--- a/python/flink_agents/api/core_options.py
+++ b/python/flink_agents/api/core_options.py
@@ -82,6 +82,18 @@ class ErrorHandlingStrategy(Enum):
IGNORE = "ignore"
+class LoggerType(Enum):
+ """Built-in event logger types.
+
+ Mirrors the Java ``LoggerType`` enum so Python users can configure the
+ logger type via ``AgentConfigOptions.EVENT_LOGGER_TYPE`` without using
+ raw strings.
+ """
+
+ SLF4J = "slf4j"
+ FILE = "file"
+
+
class AgentConfigOptions(metaclass=AgentConfigOptionsMeta):
"""CoreOptions to manage core configuration parameters for Flink Agents."""
@@ -91,6 +103,12 @@ class AgentConfigOptions(metaclass=AgentConfigOptionsMeta):
default=None,
)
+ EVENT_LOGGER_TYPE = ConfigOption(
+ key="eventLoggerType",
+ config_type=LoggerType,
+ default=LoggerType.SLF4J,
+ )
+
# Event log level config options
EVENT_LOG_LEVEL = ConfigOption(
key="event-log.level",
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
index 7b42eb7a..9cedb632 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
@@ -81,15 +81,10 @@ import java.util.Map;
* </pre>
*/
public class FileEventLogger implements EventLogger {
- public static final String BASE_LOG_DIR_PROPERTY_KEY = "baseLogDir";
- public static final String PRETTY_PRINT_PROPERTY_KEY = "prettyPrint";
// The default base log directory if not specified in the configuration
private static final String DEFAULT_BASE_LOG_DIR =
Paths.get(System.getProperty("java.io.tmpdir"),
"flink-agents").toString();
- /** Property key for passing the full agent config data map into the
logger. */
- public static final String AGENT_CONFIG_PROPERTY_KEY = "agentConfig";
-
private static final ObjectMapper MAPPER = new ObjectMapper();
private final EventLoggerConfig config;
@@ -105,7 +100,20 @@ public class FileEventLogger implements EventLogger {
@Override
public void open(EventLoggerOpenParams params) throws Exception {
- String logFilePath = generateSubTaskLogFilePath(params);
+ // The full agent config is the single source of truth for all logger
settings.
+ @SuppressWarnings("unchecked")
+ Map<String, Object> agentConfig =
+ (Map<String, Object>)
+ config.getProperties()
+ .getOrDefault(
+
EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY,
+ Collections.emptyMap());
+
+ String baseLogDir =
+ (String)
+ agentConfig.getOrDefault(
+ AgentConfigOptions.BASE_LOG_DIR.getKey(),
DEFAULT_BASE_LOG_DIR);
+ String logFilePath = generateSubTaskLogFilePath(params, baseLogDir);
// Create base directory if it doesn't exist
Path logPath = Paths.get(logFilePath).getParent();
if (!Files.exists(logPath)) {
@@ -114,14 +122,11 @@ public class FileEventLogger implements EventLogger {
// Create writer in append mode
writer = new PrintWriter(new BufferedWriter(new
FileWriter(logFilePath, true)));
prettyPrint =
- (Boolean)
config.getProperties().getOrDefault(PRETTY_PRINT_PROPERTY_KEY, false);
+ (Boolean)
+ agentConfig.getOrDefault(
+ AgentConfigOptions.PRETTY_PRINT.getKey(),
+
AgentConfigOptions.PRETTY_PRINT.getDefaultValue());
- // Initialize level resolver and truncator from agent config
- @SuppressWarnings("unchecked")
- Map<String, Object> agentConfig =
- (Map<String, Object>)
- config.getProperties()
- .getOrDefault(AGENT_CONFIG_PROPERTY_KEY,
Collections.emptyMap());
this.levelResolver = new EventLogLevelResolver(agentConfig);
int maxStringLength =
getIntFromConfig(
@@ -156,12 +161,7 @@ public class FileEventLogger implements EventLogger {
}
}
- private String generateSubTaskLogFilePath(EventLoggerOpenParams params) {
- // Get base log directory from properties
- String baseLogDir =
- (String)
- config.getProperties()
- .getOrDefault(BASE_LOG_DIR_PROPERTY_KEY,
DEFAULT_BASE_LOG_DIR);
+ private String generateSubTaskLogFilePath(EventLoggerOpenParams params,
String baseLogDir) {
String jobId =
params.getRuntimeContext().getJobInfo().getJobId().toString();
String taskName =
params.getRuntimeContext()
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/Slf4jEventLogger.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/Slf4jEventLogger.java
new file mode 100644
index 00000000..d5f00f87
--- /dev/null
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/Slf4jEventLogger.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.eventlog;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.EventContext;
+import org.apache.flink.agents.api.configuration.AgentConfigOptions;
+import org.apache.flink.agents.api.logger.EventLogLevel;
+import org.apache.flink.agents.api.logger.EventLogger;
+import org.apache.flink.agents.api.logger.EventLoggerConfig;
+import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
+import org.apache.flink.metrics.Counter;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.FileAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * An SLF4J-based event logger that outputs events through a dedicated SLF4J
logger.
+ *
+ * <p>This logger writes event log records as JSON to a dedicated SLF4J logger
named {@value
+ * #EVENT_LOGGER_NAME}. Events are automatically routed to a separate file in
Flink's log directory,
+ * making them visible in Flink's Web UI "Logs" tab.
+ *
+ * <p>On {@link #open}, the logger automatically configures log4j2 to write
event logs to a separate
+ * file (derived from Flink's {@code log.file} system property). No manual
log4j2 configuration is
+ * required.
+ *
+ * <p>Unlike {@link FileEventLogger}, which creates a separate log file per
subtask, this logger
+ * writes all events from a TaskManager to the same log destination. To
distinguish events from
+ * different subtasks, each JSON record includes {@code jobId}, {@code
taskName}, and {@code
+ * subtaskId} fields.
+ *
+ * <p>This logger honors the per-event-type log level configuration resolved
by {@link
+ * EventLogLevelResolver}; events resolved to {@link EventLogLevel#OFF} are
skipped, and events at
+ * {@link EventLogLevel#STANDARD} have their payloads truncated by a {@link
JsonTruncator}.
+ *
+ * <h3>Thread Safety</h3>
+ *
+ * <p>This class is <strong>thread-safe at the Flink subtask level</strong>,
following the same
+ * guarantees as {@link FileEventLogger}. Each subtask instance gets its own
logger instance with
+ * its own subtask context fields.
+ */
public class Slf4jEventLogger implements EventLogger {
    /** Dedicated logger name for event log output. */
    public static final String EVENT_LOGGER_NAME = "org.apache.flink.agents.EventLog";

    /** Name of the log4j2 file appender registered by {@link #ensureLog4j2AppenderConfigured}. */
    private static final String EVENT_LOG_APPENDER_NAME = "FlinkAgentsEventLogAppender";

    /** SLF4J logger that every event-log JSON record is written to (one line per record). */
    private static final Logger EVENT_LOG = LoggerFactory.getLogger(EVENT_LOGGER_NAME);

    /** Shared JSON mapper; ObjectMapper is thread-safe, so a static instance is fine. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final EventLoggerConfig config;
    // Resolved from the agent config in open(); defaults to AgentConfigOptions.PRETTY_PRINT.
    private boolean prettyPrint;
    // Subtask identity captured in open() and embedded into every JSON record so events from
    // different subtasks sharing one log destination can be told apart.
    private String jobId;
    private String taskName;
    private int subtaskId;
    // Resolves per-event-type log levels from the agent config; initialized in open().
    private EventLogLevelResolver levelResolver;
    // Truncates STANDARD-level event payloads; initialized in open().
    private JsonTruncator truncator;
    // Optional metric counter, injected by the operator via setTruncatedEventsCounter().
    private Counter truncatedEventsCounter;

    /**
     * Creates a new SLF4J-based event logger.
     *
     * @param config the logger configuration; settings are read from its agent-config map in
     *     {@link #open}
     */
    public Slf4jEventLogger(EventLoggerConfig config) {
        this.config = config;
    }

    /**
     * Initializes the logger: captures subtask identity, reads all settings (pretty-print, log
     * levels, truncation limits) from the agent-config map, and auto-configures the log4j2
     * appender.
     *
     * @param params open parameters carrying the Flink runtime context
     * @throws Exception if initialization fails
     */
    @Override
    public void open(EventLoggerOpenParams params) throws Exception {
        jobId = params.getRuntimeContext().getJobInfo().getJobId().toString();
        taskName = params.getRuntimeContext().getTaskInfo().getTaskName();
        subtaskId = params.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();

        // The full agent config is the single source of truth for all logger settings
        // (mirrors FileEventLogger).
        @SuppressWarnings("unchecked")
        Map<String, Object> agentConfig =
                (Map<String, Object>)
                        config.getProperties()
                                .getOrDefault(
                                        EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY,
                                        Collections.emptyMap());
        prettyPrint =
                (Boolean)
                        agentConfig.getOrDefault(
                                AgentConfigOptions.PRETTY_PRINT.getKey(),
                                AgentConfigOptions.PRETTY_PRINT.getDefaultValue());
        this.levelResolver = new EventLogLevelResolver(agentConfig);
        int maxStringLength =
                getIntFromConfig(
                        agentConfig,
                        AgentConfigOptions.EVENT_LOG_MAX_STRING_LENGTH.getKey(),
                        AgentConfigOptions.EVENT_LOG_MAX_STRING_LENGTH.getDefaultValue());
        int maxArrayElements =
                getIntFromConfig(
                        agentConfig,
                        AgentConfigOptions.EVENT_LOG_MAX_ARRAY_ELEMENTS.getKey(),
                        AgentConfigOptions.EVENT_LOG_MAX_ARRAY_ELEMENTS.getDefaultValue());
        int maxDepth =
                getIntFromConfig(
                        agentConfig,
                        AgentConfigOptions.EVENT_LOG_MAX_DEPTH.getKey(),
                        AgentConfigOptions.EVENT_LOG_MAX_DEPTH.getDefaultValue());
        this.truncator = new JsonTruncator(maxStringLength, maxArrayElements, maxDepth);

        ensureLog4j2AppenderConfigured();
    }

    /**
     * Reads an int setting from the config map, tolerating both numeric and string values.
     *
     * @param config the agent-config map
     * @param key the config key to read
     * @param defaultValue returned when the key is absent or the value is not parseable
     * @return the configured int, or {@code defaultValue}
     */
    private static int getIntFromConfig(Map<String, Object> config, String key, int defaultValue) {
        Object value = config.get(key);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        try {
            return Integer.parseInt(value.toString());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    /**
     * Serializes the event to a single JSON record and writes it to the dedicated SLF4J logger.
     *
     * <p>Events whose resolved level is {@link EventLogLevel#OFF} are skipped; events at {@link
     * EventLogLevel#STANDARD} have their {@code event} subtree truncated first. The output object
     * is rebuilt so that fields appear in a stable order: timestamp, logLevel, eventType, subtask
     * context (jobId/taskName/subtaskId), event.
     *
     * @param context the event context
     * @param event the event to log
     * @throws IllegalStateException if the record does not serialize to a JSON object
     * @throws Exception if serialization fails
     */
    @Override
    public void append(EventContext context, Event event) throws Exception {
        // Resolve log level and skip OFF events. If open() was never called, levelResolver is
        // null and we default to VERBOSE (log everything, untruncated).
        EventLogLevel level =
                levelResolver != null
                        ? levelResolver.resolve(event.getType())
                        : EventLogLevel.VERBOSE;
        if (level == EventLogLevel.OFF) {
            return;
        }

        EventLogRecord record = new EventLogRecord(context, event);
        JsonNode tree = MAPPER.valueToTree(record);
        if (!(tree instanceof ObjectNode)) {
            throw new IllegalStateException(
                    "EventLogRecord must serialize to a JSON object, but was: "
                            + tree.getNodeType());
        }
        ObjectNode rootNode = (ObjectNode) tree;

        // Truncate the event subtree at STANDARD level. Truncation mutates eventNode in place;
        // the metric counter is only bumped when something was actually cut.
        if (level == EventLogLevel.STANDARD && truncator != null) {
            JsonNode eventNode = rootNode.get("event");
            if (eventNode instanceof ObjectNode) {
                boolean truncated = truncator.truncate((ObjectNode) eventNode);
                if (truncated && truncatedEventsCounter != null) {
                    truncatedEventsCounter.inc();
                }
            }
        }

        // Rebuild the top-level object so logLevel/subtask context sit alongside the documented
        // JSON layout: timestamp, logLevel, eventType, subtask context, event.
        ObjectNode ordered = MAPPER.createObjectNode();
        ordered.set("timestamp", rootNode.get("timestamp"));
        ordered.put("logLevel", level.name());
        ordered.set("eventType", rootNode.get("eventType"));
        ordered.put("jobId", jobId);
        ordered.put("taskName", taskName);
        ordered.put("subtaskId", subtaskId);
        ordered.set("event", rootNode.get("event"));

        String json =
                prettyPrint
                        ? MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(ordered)
                        : MAPPER.writeValueAsString(ordered);
        EVENT_LOG.info(json);
    }

    @Override
    public void flush() throws Exception {
        // No-op: SLF4J/log4j2 handles flushing
    }

    /**
     * Sets the counter for tracking truncated events. Called by the operator after metrics are
     * initialized.
     *
     * @param counter the counter to increment when events are truncated
     */
    public void setTruncatedEventsCounter(Counter counter) {
        this.truncatedEventsCounter = counter;
    }

    @Override
    public void close() throws Exception {
        // No-op: SLF4J/log4j2 manages logger lifecycle
    }

    /**
     * Configures the log4j2 event log appender programmatically.
     *
     * <p>This method creates a dedicated file appender that writes to {@code
     * {log.file}.event-log.log} in the same directory as Flink's main log file. If the appender has
     * already been configured (e.g., by a previous subtask on the same TaskManager), this method is
     * a no-op.
     */
    private static synchronized void ensureLog4j2AppenderConfigured() {
        try {
            LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false);
            Configuration configuration = loggerContext.getConfiguration();
            LoggerConfig loggerConfig = configuration.getLoggerConfig(EVENT_LOGGER_NAME);

            // getLoggerConfig() returns the nearest ancestor config when no exact match exists,
            // so an exact name match means a dedicated config for EVENT_LOGGER_NAME is already
            // present — either added by a previous call in this JVM or supplied by the user's
            // own log4j2 configuration. Either way, skip.
            if (loggerConfig.getName().equals(EVENT_LOGGER_NAME)) {
                return;
            }

            // Derive event log file path from Flink's log.file system property
            String logFile = System.getProperty("log.file");
            if (logFile == null || logFile.isEmpty()) {
                // Not running in a Flink environment with log.file set,
                // fall back to root logger (events will go to main log)
                return;
            }

            String eventLogFile = logFile + ".event-log.log";

            // Create a file appender with %msg%n pattern (JSON only, no log metadata)
            PatternLayout layout = PatternLayout.newBuilder().withPattern("%msg%n").build();

            FileAppender appender =
                    FileAppender.newBuilder()
                            .setName(EVENT_LOG_APPENDER_NAME)
                            .withFileName(eventLogFile)
                            .withAppend(true)
                            .setLayout(layout)
                            .build();
            appender.start();
            configuration.addAppender(appender);

            // Create a dedicated logger config with additivity=false so event records do not
            // also propagate into the main TaskManager log.
            LoggerConfig eventLoggerConfig = new LoggerConfig(EVENT_LOGGER_NAME, Level.INFO, false);
            eventLoggerConfig.addAppender(appender, Level.INFO, null);
            configuration.addLogger(EVENT_LOGGER_NAME, eventLoggerConfig);

            loggerContext.updateLoggers();
        } catch (Exception e) {
            // If programmatic configuration fails (e.g., not using log4j2),
            // fall back silently — events will go to the root logger.
            LoggerFactory.getLogger(Slf4jEventLogger.class)
                    .warn(
                            "Failed to auto-configure event log appender, "
                                    + "events will be logged to the root logger.",
                            e);
        }
    }
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
index a79f227d..096ac20d 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
@@ -27,9 +27,11 @@ import org.apache.flink.agents.api.logger.EventLogger;
import org.apache.flink.agents.api.logger.EventLoggerConfig;
import org.apache.flink.agents.api.logger.EventLoggerFactory;
import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
+import org.apache.flink.agents.api.logger.LoggerType;
import org.apache.flink.agents.plan.AgentPlan;
import org.apache.flink.agents.plan.actions.Action;
import org.apache.flink.agents.runtime.eventlog.FileEventLogger;
+import org.apache.flink.agents.runtime.eventlog.Slf4jEventLogger;
import org.apache.flink.agents.runtime.metrics.BuiltInMetrics;
import org.apache.flink.agents.runtime.operator.queue.SegmentedQueue;
import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
@@ -46,7 +48,7 @@ import java.util.ArrayList;
import java.util.List;
import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.BASE_LOG_DIR;
-import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.PRETTY_PRINT;
+import static
org.apache.flink.agents.api.configuration.AgentConfigOptions.EVENT_LOGGER_TYPE;
import static org.apache.flink.util.Preconditions.checkState;
/**
@@ -114,6 +116,9 @@ class EventRouter<IN, OUT> implements AutoCloseable {
if (eventLogger instanceof FileEventLogger) {
((FileEventLogger) eventLogger)
.setTruncatedEventsCounter(builtInMetrics.getEventLogTruncatedEventsCounter());
+ } else if (eventLogger instanceof Slf4jEventLogger) {
+ ((Slf4jEventLogger) eventLogger)
+
.setTruncatedEventsCounter(builtInMetrics.getEventLogTruncatedEventsCounter());
}
}
@@ -238,16 +243,24 @@ class EventRouter<IN, OUT> implements AutoCloseable {
}
private EventLogger createEventLogger(AgentPlan agentPlan) {
- EventLoggerConfig.Builder loggerConfigBuilder =
EventLoggerConfig.builder();
+ // Honor the EVENT_LOGGER_TYPE config, defaulting to SLF4J so events
surface in the Flink
+ // Web UI by default. An explicit baseLogDir forces the file logger
for backward
+ // compatibility with the existing file-based logging path.
+ LoggerType loggerType = agentPlan.getConfig().get(EVENT_LOGGER_TYPE);
String baseLogDir = agentPlan.getConfig().get(BASE_LOG_DIR);
if (baseLogDir != null && !baseLogDir.trim().isEmpty()) {
-
loggerConfigBuilder.property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY,
baseLogDir);
+ loggerType = LoggerType.FILE;
}
- loggerConfigBuilder.property(
- FileEventLogger.PRETTY_PRINT_PROPERTY_KEY,
agentPlan.getConfig().get(PRETTY_PRINT));
- loggerConfigBuilder.property(
- FileEventLogger.AGENT_CONFIG_PROPERTY_KEY,
agentPlan.getConfig().getConfData());
- return EventLoggerFactory.createLogger(loggerConfigBuilder.build());
+ // The full agent config is the single source of truth for logger
settings (baseLogDir,
+ // prettyPrint, event-log levels, truncation limits). Each logger
pulls what it needs.
+ EventLoggerConfig config =
+ EventLoggerConfig.builder()
+ .loggerType(loggerType)
+ .property(
+ EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY,
+ agentPlan.getConfig().getConfData())
+ .build();
+ return EventLoggerFactory.createLogger(config);
}
@Override
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
index 831fba4f..9156aee7 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.agents.api.OutputEvent;
import org.apache.flink.agents.api.configuration.AgentConfigOptions;
import org.apache.flink.agents.api.logger.EventLoggerConfig;
import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
+import org.apache.flink.agents.api.logger.LoggerType;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobInfo;
import org.apache.flink.api.common.TaskInfo;
@@ -81,15 +82,25 @@ class FileEventLoggerTest {
when(taskInfo.getIndexOfThisSubtask()).thenReturn(testSubTaskId);
// Create config and logger
- config =
- EventLoggerConfig.builder()
- .loggerType("file")
- .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY,
tempDir.toString())
- .build();
+ config = buildConfig(new HashMap<>());
logger = new FileEventLogger(config);
openParams = new EventLoggerOpenParams(runtimeContext);
}
+ /**
+ * Builds an EventLoggerConfig for the file logger, seeding the
agent-config map with {@code
+ * baseLogDir} so tests can drop their per-test boilerplate and only
specify the keys they
+ * actually care about.
+ */
+ private EventLoggerConfig buildConfig(Map<String, Object>
extraAgentConfig) {
+ Map<String, Object> agentConfig = new HashMap<>(extraAgentConfig);
+ agentConfig.putIfAbsent(AgentConfigOptions.BASE_LOG_DIR.getKey(),
tempDir.toString());
+ return EventLoggerConfig.builder()
+ .loggerType(LoggerType.FILE)
+ .property(EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY,
agentConfig)
+ .build();
+ }
+
@AfterEach
void tearDown() throws Exception {
if (logger != null) {
@@ -302,12 +313,9 @@ class FileEventLoggerTest {
@Test
void testPrettyPrintOutputsFormattedJson() throws Exception {
// Given - config with prettyPrint enabled
- config =
- EventLoggerConfig.builder()
- .loggerType("file")
- .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY,
tempDir.toString())
- .property(AgentConfigOptions.PRETTY_PRINT.getKey(),
true)
- .build();
+ Map<String, Object> agentConfig = new HashMap<>();
+ agentConfig.put(AgentConfigOptions.PRETTY_PRINT.getKey(), true);
+ config = buildConfig(agentConfig);
logger = new FileEventLogger(config);
logger.open(openParams);
@@ -340,12 +348,7 @@ class FileEventLoggerTest {
agentConfig.put("event-log.standard.max-array-elements", 20);
agentConfig.put("event-log.standard.max-depth", 5);
- config =
- EventLoggerConfig.builder()
- .loggerType("file")
- .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY,
tempDir.toString())
- .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY,
agentConfig)
- .build();
+ config = buildConfig(agentConfig);
logger = new FileEventLogger(config);
logger.open(openParams);
@@ -380,12 +383,7 @@ class FileEventLoggerTest {
agentConfig.put("event-log.level", "VERBOSE");
agentConfig.put("event-log.standard.max-string-length", 10);
- config =
- EventLoggerConfig.builder()
- .loggerType("file")
- .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY,
tempDir.toString())
- .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY,
agentConfig)
- .build();
+ config = buildConfig(agentConfig);
logger = new FileEventLogger(config);
logger.open(openParams);
@@ -418,12 +416,7 @@ class FileEventLoggerTest {
Map<String, Object> agentConfig = new HashMap<>();
agentConfig.put("event-log.level", "OFF");
- config =
- EventLoggerConfig.builder()
- .loggerType("file")
- .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY,
tempDir.toString())
- .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY,
agentConfig)
- .build();
+ config = buildConfig(agentConfig);
logger = new FileEventLogger(config);
logger.open(openParams);
@@ -446,12 +439,7 @@ class FileEventLoggerTest {
agentConfig.put("event-log.standard.max-string-length", 10);
agentConfig.put("event-log.type." + InputEvent.EVENT_TYPE + ".level",
"VERBOSE");
- config =
- EventLoggerConfig.builder()
- .loggerType("file")
- .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY,
tempDir.toString())
- .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY,
agentConfig)
- .build();
+ config = buildConfig(agentConfig);
logger = new FileEventLogger(config);
logger.open(openParams);
@@ -533,12 +521,7 @@ class FileEventLoggerTest {
agentConfig.put("event-log.type.com.example.events.level", "OFF");
agentConfig.put("event-log.type." + TestNamespacedEventA.EVENT_TYPE +
".level", "VERBOSE");
- config =
- EventLoggerConfig.builder()
- .loggerType("file")
- .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY,
tempDir.toString())
- .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY,
agentConfig)
- .build();
+ config = buildConfig(agentConfig);
logger = new FileEventLogger(config);
logger.open(openParams);
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/Slf4jEventLoggerTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/Slf4jEventLoggerTest.java
new file mode 100644
index 00000000..d3dc51f5
--- /dev/null
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/Slf4jEventLoggerTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.eventlog;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.EventContext;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.configuration.AgentConfigOptions;
+import org.apache.flink.agents.api.logger.EventLoggerConfig;
+import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
+import org.apache.flink.agents.api.logger.LoggerType;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobInfo;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.AbstractAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.when;
+
/**
 * Unit tests for {@link Slf4jEventLogger}.
 *
 * <p>A custom log4j2 {@link TestAppender} is attached to the dedicated event logger in {@code
 * setUp} so each test can capture and assert on the JSON records the logger emits, without
 * touching the file system.
 */
class Slf4jEventLoggerTest {

    @Mock private StreamingRuntimeContext runtimeContext;

    @Mock private JobInfo jobInfo;

    @Mock private TaskInfo taskInfo;

    private Slf4jEventLogger logger;
    private EventLoggerOpenParams openParams;
    private ObjectMapper objectMapper;
    private TestAppender testAppender;

    // Fixed subtask identity returned by the mocked runtime context; asserted on in the tests.
    private final JobID testJobId = JobID.generate();
    private final String testTaskName = "action-execute-operator";
    private final int testSubTaskId = 0;

    @BeforeEach
    void setUp() {
        MockitoAnnotations.openMocks(this);
        objectMapper = new ObjectMapper();

        // Configure mocks
        when(runtimeContext.getJobInfo()).thenReturn(jobInfo);
        when(runtimeContext.getTaskInfo()).thenReturn(taskInfo);
        when(jobInfo.getJobId()).thenReturn(testJobId);
        when(taskInfo.getTaskName()).thenReturn(testTaskName);
        when(taskInfo.getIndexOfThisSubtask()).thenReturn(testSubTaskId);

        openParams = new EventLoggerOpenParams(runtimeContext);

        // Set up log4j2 test appender to capture event log output
        testAppender = new TestAppender("TestSlf4jAppender");
        testAppender.start();

        LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false);
        Configuration config = loggerContext.getConfiguration();
        config.addAppender(testAppender);

        // Reuse the dedicated event-logger config if one already exists (e.g., created by a
        // previous test or by Slf4jEventLogger itself); otherwise register a fresh one with
        // additivity=false so captured records do not also reach parent appenders.
        LoggerConfig loggerConfig = config.getLoggerConfig(Slf4jEventLogger.EVENT_LOGGER_NAME);
        if (!loggerConfig.getName().equals(Slf4jEventLogger.EVENT_LOGGER_NAME)) {
            loggerConfig =
                    new LoggerConfig(
                            Slf4jEventLogger.EVENT_LOGGER_NAME,
                            org.apache.logging.log4j.Level.INFO,
                            false);
            config.addLogger(Slf4jEventLogger.EVENT_LOGGER_NAME, loggerConfig);
        }
        loggerConfig.addAppender(testAppender, org.apache.logging.log4j.Level.INFO, null);
        loggerContext.updateLoggers();
    }

    @AfterEach
    void tearDown() throws Exception {
        if (logger != null) {
            logger.close();
        }
        // Detach the capturing appender so it does not leak into subsequent tests.
        if (testAppender != null) {
            testAppender.stop();
            LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false);
            Configuration config = loggerContext.getConfiguration();
            LoggerConfig loggerConfig = config.getLoggerConfig(Slf4jEventLogger.EVENT_LOGGER_NAME);
            loggerConfig.removeAppender(testAppender.getName());
            loggerContext.updateLoggers();
        }
    }

    /** Verifies each record carries jobId/taskName/subtaskId plus the standard event fields. */
    @Test
    void testAppendWritesJsonWithSubtaskContext() throws Exception {
        EventLoggerConfig config = EventLoggerConfig.builder().loggerType(LoggerType.SLF4J).build();
        logger = new Slf4jEventLogger(config);
        logger.open(openParams);

        InputEvent inputEvent = new InputEvent("test input");
        EventContext context = new EventContext(inputEvent);

        logger.append(context, inputEvent);

        List<String> messages = testAppender.getMessages();
        assertEquals(1, messages.size(), "Should have logged one message");

        JsonNode jsonNode = objectMapper.readTree(messages.get(0));
        // Verify subtask context fields
        assertEquals(testJobId.toString(), jsonNode.get("jobId").asText());
        assertEquals(testTaskName, jsonNode.get("taskName").asText());
        assertEquals(testSubTaskId, jsonNode.get("subtaskId").asInt());
        // Verify event content
        assertNotNull(jsonNode.get("timestamp"));
        assertNotNull(jsonNode.get("event"));
        assertEquals(InputEvent.EVENT_TYPE, jsonNode.get("eventType").asText());
    }

    /** Verifies events are logged one record per append, in order, with their payloads intact. */
    @Test
    void testAppendMultipleEvents() throws Exception {
        EventLoggerConfig config = EventLoggerConfig.builder().loggerType(LoggerType.SLF4J).build();
        logger = new Slf4jEventLogger(config);
        logger.open(openParams);

        InputEvent inputEvent = new InputEvent("input data");
        OutputEvent outputEvent = new OutputEvent("output data");

        logger.append(new EventContext(inputEvent), inputEvent);
        logger.append(new EventContext(outputEvent), outputEvent);

        List<String> messages = testAppender.getMessages();
        assertEquals(2, messages.size(), "Should have logged two messages");

        JsonNode inputJson = objectMapper.readTree(messages.get(0));
        assertEquals("input data", inputJson.get("event").get("attributes").get("input").asText());

        JsonNode outputJson = objectMapper.readTree(messages.get(1));
        assertEquals(
                "output data", outputJson.get("event").get("attributes").get("output").asText());
    }

    /** Verifies a root-level OFF setting suppresses every event. */
    @Test
    void testRootLevelOffSkipsAllEvents() throws Exception {
        Map<String, Object> agentConfig = new HashMap<>();
        agentConfig.put(AgentConfigOptions.EVENT_LOG_LEVEL.getKey(), "OFF");
        EventLoggerConfig config =
                EventLoggerConfig.builder()
                        .loggerType(LoggerType.SLF4J)
                        .property(EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY, agentConfig)
                        .build();
        logger = new Slf4jEventLogger(config);
        logger.open(openParams);

        InputEvent inputEvent = new InputEvent("input data");
        logger.append(new EventContext(inputEvent), inputEvent);

        List<String> messages = testAppender.getMessages();
        assertTrue(messages.isEmpty(), "No events should be logged when root level is OFF");
    }

    /** Verifies a per-event-type OFF setting suppresses only the matching event type. */
    @Test
    void testPerEventTypeOffSkipsMatchingEvents() throws Exception {
        Map<String, Object> agentConfig = new HashMap<>();
        agentConfig.put("event-log.type." + InputEvent.EVENT_TYPE + ".level", "OFF");
        EventLoggerConfig config =
                EventLoggerConfig.builder()
                        .loggerType(LoggerType.SLF4J)
                        .property(EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY, agentConfig)
                        .build();
        logger = new Slf4jEventLogger(config);
        logger.open(openParams);

        InputEvent inputEvent = new InputEvent("input data");
        OutputEvent outputEvent = new OutputEvent("output data");

        logger.append(new EventContext(inputEvent), inputEvent);
        logger.append(new EventContext(outputEvent), outputEvent);

        List<String> messages = testAppender.getMessages();
        assertEquals(
                1, messages.size(), "Only OutputEvent should be logged when InputEvent is OFF");

        JsonNode jsonNode = objectMapper.readTree(messages.get(0));
        assertEquals("output data", jsonNode.get("event").get("attributes").get("output").asText());
    }

    /** Verifies the resolved log level is emitted as a field in the JSON record. */
    @Test
    void testLogLevelAppearsInJson() throws Exception {
        EventLoggerConfig config = EventLoggerConfig.builder().loggerType(LoggerType.SLF4J).build();
        logger = new Slf4jEventLogger(config);
        logger.open(openParams);

        InputEvent inputEvent = new InputEvent("test input");
        logger.append(new EventContext(inputEvent), inputEvent);

        List<String> messages = testAppender.getMessages();
        assertEquals(1, messages.size());
        JsonNode jsonNode = objectMapper.readTree(messages.get(0));
        assertNotNull(jsonNode.get("logLevel"), "logLevel field should be present in JSON output");
    }

    @Test
    void testDefaultIsNotPrettyPrinted() throws Exception {
        // Default config should produce single-line JSON, matching AgentConfigOptions.PRETTY_PRINT
        // default (false).
        EventLoggerConfig config = EventLoggerConfig.builder().loggerType(LoggerType.SLF4J).build();
        logger = new Slf4jEventLogger(config);
        logger.open(openParams);

        InputEvent inputEvent = new InputEvent("test input");
        logger.append(new EventContext(inputEvent), inputEvent);

        List<String> messages = testAppender.getMessages();
        assertEquals(1, messages.size());

        String json = messages.get(0);
        assertFalse(json.contains("\n"), "Default output should be single line");
        assertDoesNotThrow(() -> objectMapper.readTree(json), "Output should be valid JSON");
    }

    /** Verifies prettyPrint=true produces multi-line (but still parseable) JSON output. */
    @Test
    void testEnablePrettyPrint() throws Exception {
        Map<String, Object> agentConfig = new HashMap<>();
        agentConfig.put(AgentConfigOptions.PRETTY_PRINT.getKey(), true);
        EventLoggerConfig config =
                EventLoggerConfig.builder()
                        .loggerType(LoggerType.SLF4J)
                        .property(EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY, agentConfig)
                        .build();
        logger = new Slf4jEventLogger(config);
        logger.open(openParams);

        InputEvent inputEvent = new InputEvent("test input");
        logger.append(new EventContext(inputEvent), inputEvent);

        List<String> messages = testAppender.getMessages();
        assertEquals(1, messages.size());

        String json = messages.get(0);
        assertTrue(json.contains("\n"), "Pretty-printed output should span multiple lines");
        assertDoesNotThrow(
                () -> objectMapper.readTree(json), "Pretty-printed output should be valid JSON");
    }

    /** Verifies lifecycle methods that delegate to log4j2 are safe no-ops. */
    @Test
    void testFlushAndCloseAreNoOps() throws Exception {
        EventLoggerConfig config = EventLoggerConfig.builder().loggerType(LoggerType.SLF4J).build();
        logger = new Slf4jEventLogger(config);
        logger.open(openParams);

        assertDoesNotThrow(() -> logger.flush(), "flush() should not throw");
        assertDoesNotThrow(() -> logger.close(), "close() should not throw");
    }

    /** A log4j2 appender that captures log messages for testing. */
    private static class TestAppender extends AbstractAppender {

        // Synchronized list: log4j2 may deliver events from multiple threads.
        private final List<String> messages = Collections.synchronizedList(new ArrayList<>());

        protected TestAppender(String name) {
            super(name, null, PatternLayout.newBuilder().withPattern("%msg").build(), true, null);
        }

        @Override
        public void append(LogEvent event) {
            messages.add(event.getMessage().getFormattedMessage());
        }

        public List<String> getMessages() {
            return messages;
        }
    }
}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
index 45c46f14..2b2f5a29 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
@@ -25,6 +25,8 @@ import
org.apache.flink.agents.api.configuration.AgentConfigOptions;
import org.apache.flink.agents.api.context.DurableCallable;
import org.apache.flink.agents.api.context.MemoryObject;
import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.logger.EventLoggerConfig;
+import org.apache.flink.agents.api.logger.LoggerType;
import org.apache.flink.agents.plan.AgentConfiguration;
import org.apache.flink.agents.plan.AgentPlan;
import org.apache.flink.agents.plan.JavaFunction;
@@ -366,6 +368,7 @@ public class ActionExecutionOperatorTest {
void testEventLogBaseDirFromAgentConfig() throws Exception {
String baseLogDir = "/tmp/flink-agents-test";
AgentConfiguration config = new AgentConfiguration();
+ config.set(AgentConfigOptions.EVENT_LOGGER_TYPE, LoggerType.FILE);
config.set(AgentConfigOptions.BASE_LOG_DIR, baseLogDir);
config.set(AgentConfigOptions.PRETTY_PRINT, true);
AgentPlan agentPlan = TestAgent.getAgentPlanWithConfig(config);
@@ -389,9 +392,13 @@ public class ActionExecutionOperatorTest {
@SuppressWarnings("unchecked")
Map<String, Object> properties =
(Map<String, Object>) propertiesField.get(loggerConfig);
-
assertThat(properties.get(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY))
+ @SuppressWarnings("unchecked")
+ Map<String, Object> agentConfig =
+ (Map<String, Object>)
+
properties.get(EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY);
+
assertThat(agentConfig.get(AgentConfigOptions.BASE_LOG_DIR.getKey()))
.isEqualTo(baseLogDir);
-
assertThat(properties.get(FileEventLogger.PRETTY_PRINT_PROPERTY_KEY)).isEqualTo(true);
+
assertThat(agentConfig.get(AgentConfigOptions.PRETTY_PRINT.getKey())).isEqualTo(true);
}
}