This is an automated email from the ASF dual-hosted git repository.

wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new c72f8f6f [runtime] Support per-event-type configurable log levels for event log (#609)
c72f8f6f is described below

commit c72f8f6f03d1e7891bb6af2d692a4f8f163bb595
Author: Weiqing Yang <[email protected]>
AuthorDate: Mon May 11 19:13:50 2026 -0700

    [runtime] Support per-event-type configurable log levels for event log (#609)
---
 .../org/apache/flink/agents/api/EventFilter.java   |  61 ----
 .../api/configuration/AgentConfigOptions.java      |  30 ++
 .../flink/agents/api/logger/EventLogLevel.java     |  61 ++++
 .../flink/agents/api/logger/EventLoggerConfig.java |  36 +--
 python/flink_agents/api/core_options.py            |  25 ++
 .../python_event_logging_test.py                   | 124 ++++++++
 .../runtime/eventlog/EventLogLevelResolver.java    | 140 +++++++++
 .../agents/runtime/eventlog/EventLogRecord.java    |   2 +
 .../eventlog/EventLogRecordJsonDeserializer.java   |   3 +-
 .../eventlog/EventLogRecordJsonSerializer.java     |   3 +
 .../agents/runtime/eventlog/FileEventLogger.java   | 110 ++++++-
 .../agents/runtime/eventlog/JsonTruncator.java     | 285 ++++++++++++++++++
 .../agents/runtime/metrics/BuiltInMetrics.java     |   9 +
 .../flink/agents/runtime/operator/EventRouter.java |   6 +
 .../eventlog/EventLogLevelResolverTest.java        | 153 ++++++++++
 .../runtime/eventlog/FileEventLoggerTest.java      | 318 ++++++++++++---------
 .../agents/runtime/eventlog/JsonTruncatorTest.java | 251 ++++++++++++++++
 17 files changed, 1376 insertions(+), 241 deletions(-)

diff --git a/api/src/main/java/org/apache/flink/agents/api/EventFilter.java b/api/src/main/java/org/apache/flink/agents/api/EventFilter.java
deleted file mode 100644
index ce97cc59..00000000
--- a/api/src/main/java/org/apache/flink/agents/api/EventFilter.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.agents.api;
-
-/**
- * Interface for filtering events in event logging and listening.
- *
- * <p>EventFilter allows fine-grained control over which events are processed by event logs and
- * event listeners. Implementations can filter based on event type, attributes, or custom logic.
- */
-@FunctionalInterface
-public interface EventFilter {
-    /**
-     * Determines whether an event should be processed.
-     *
-     * @param event The event to evaluate
-     * @param context The context associated with the event
-     * @return true if the event should be processed, false otherwise
-     */
-    boolean accept(Event event, EventContext context);
-
-    /**
-     * Creates a filter that accepts events of the specified types.
-     *
-     * @param eventTypes The event types to accept
-     * @return An EventFilter that accepts only the specified event types
-     */
-    @SafeVarargs
-    static EventFilter byEventType(Class<? extends Event>... eventTypes) {
-        return (event, context) -> {
-            for (Class<? extends Event> eventType : eventTypes) {
-                if (eventType.isInstance(event)) {
-                    return true;
-                }
-            }
-            return false;
-        };
-    }
-
-    /** A filter that accepts all events. */
-    EventFilter ACCEPT_ALL = (event, context) -> true;
-
-    /** A filter that rejects all events. */
-    EventFilter REJECT_ALL = (event, context) -> false;
-}
diff --git a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
index 72424745..9cdda8bd 100644
--- a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
+++ b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.agents.api.configuration;
 
+import org.apache.flink.agents.api.logger.EventLogLevel;
+
 /** The set of configuration options for agents parameters. */
 public class AgentConfigOptions {
 
@@ -54,4 +56,32 @@ public class AgentConfigOptions {
     /** The config parameter specifies the unique identifier of job. */
     public static final ConfigOption<String> JOB_IDENTIFIER =
             new ConfigOption<>("job-identifier", String.class, null);
+
+    /**
+     * The global event log level controlling the default verbosity for all event types. Valid
+     * values are "OFF", "STANDARD", and "VERBOSE". Defaults to "STANDARD".
+     */
+    public static final ConfigOption<EventLogLevel> EVENT_LOG_LEVEL =
+            new ConfigOption<>("event-log.level", EventLogLevel.class, EventLogLevel.STANDARD);
+
+    /**
+     * The maximum string length for event payloads when logging at STANDARD level. Strings
+     * exceeding this length will be truncated. Defaults to 2000.
+     */
+    public static final ConfigOption<Integer> EVENT_LOG_MAX_STRING_LENGTH =
+            new ConfigOption<>("event-log.standard.max-string-length", Integer.class, 2000);
+
+    /**
+     * The maximum number of array elements to include in event payloads when logging at STANDARD
+     * level. Arrays exceeding this size will be truncated. Defaults to 20.
+     */
+    public static final ConfigOption<Integer> EVENT_LOG_MAX_ARRAY_ELEMENTS =
+            new ConfigOption<>("event-log.standard.max-array-elements", Integer.class, 20);
+
+    /**
+     * The maximum nesting depth for event payloads when logging at STANDARD level. Objects deeper
+     * than this level will be summarized. Defaults to 5.
+     */
+    public static final ConfigOption<Integer> EVENT_LOG_MAX_DEPTH =
+            new ConfigOption<>("event-log.standard.max-depth", Integer.class, 5);
 }
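
For reference, the options above map onto flat configuration keys; together with the per-type override key handled by EventLogLevelResolver further down, an agent configuration might contain entries like the following (values and the example event type are illustrative, not part of this patch):

    event-log.level = STANDARD
    event-log.standard.max-string-length = 2000
    event-log.standard.max-array-elements = 20
    event-log.standard.max-depth = 5
    event-log.type.org.apache.flink.agents.api.event.ChatRequestEvent.level = OFF
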
diff --git a/api/src/main/java/org/apache/flink/agents/api/logger/EventLogLevel.java b/api/src/main/java/org/apache/flink/agents/api/logger/EventLogLevel.java
new file mode 100644
index 00000000..118e265c
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/logger/EventLogLevel.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.logger;
+
+/**
+ * Log level for event logging, controlling the verbosity of event log output.
+ *
+ * <ul>
+ *   <li>{@link #OFF} - No events of this type are logged.
+ *   <li>{@link #STANDARD} - Events are logged with truncated/summarized payloads (default).
+ *   <li>{@link #VERBOSE} - Events are logged with full, untruncated payloads.
+ * </ul>
+ */
+public enum EventLogLevel {
+
+    /** No events of this type are logged. */
+    OFF,
+
+    /** Events are logged with truncated/summarized payloads. This is the default level. */
+    STANDARD,
+
+    /** Events are logged with full, untruncated payloads. */
+    VERBOSE;
+
+    /**
+     * Parses a string value into an {@link EventLogLevel}, case-insensitively.
+     *
+     * @param value the string representation of the log level (e.g., "off", "STANDARD", "Verbose")
+     * @return the corresponding {@link EventLogLevel}
+     * @throws IllegalArgumentException if the value does not match any log level
+     */
+    public static EventLogLevel fromString(String value) {
+        if (value == null) {
+            throw new IllegalArgumentException("EventLogLevel value cannot be null");
+        }
+        }
+        try {
+            return valueOf(value.toUpperCase());
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException(
+                    "Invalid EventLogLevel: '"
+                            + value
+                            + "'. Valid values are: OFF, STANDARD, VERBOSE");
+        }
+    }
+}
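
A quick sketch of the parsing contract above (hypothetical call sites, not code from this patch):

    // Parsing is case-insensitive.
    EventLogLevel verbose = EventLogLevel.fromString("Verbose");   // VERBOSE
    EventLogLevel standard = EventLogLevel.fromString("standard"); // STANDARD
    // Null or unrecognized values throw IllegalArgumentException,
    // listing the valid values OFF, STANDARD, VERBOSE.
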
diff --git a/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java b/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java
index f57adef6..1dedeef0 100644
--- a/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java
+++ b/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.agents.api.logger;
 
-import org.apache.flink.agents.api.EventFilter;
-
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -45,14 +43,11 @@ import java.util.Objects;
 public final class EventLoggerConfig {
 
     private final String loggerType;
-    private final EventFilter eventFilter;
     private final Map<String, Object> properties;
 
     /** Private constructor - use {@link #builder()} to create instances. */
-    private EventLoggerConfig(
-            String loggerType, EventFilter eventFilter, Map<String, Object> properties) {
+    private EventLoggerConfig(String loggerType, Map<String, Object> properties) {
         this.loggerType = Objects.requireNonNull(loggerType, "Logger type cannot be null");
-        this.eventFilter = eventFilter == null ? EventFilter.ACCEPT_ALL : eventFilter;
         this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
     }
 
@@ -81,15 +76,6 @@ public final class EventLoggerConfig {
         return loggerType;
     }
 
-    /**
-     * Gets the event filter for this logger configuration.
-     *
-     * @return the EventFilter to apply, never null
-     */
-    public EventFilter getEventFilter() {
-        return eventFilter;
-    }
-
     /**
      * Gets the implementation-specific properties for this logger configuration.
      *
@@ -113,13 +99,12 @@ public final class EventLoggerConfig {
         if (o == null || getClass() != o.getClass()) return false;
         EventLoggerConfig that = (EventLoggerConfig) o;
         return Objects.equals(loggerType, that.loggerType)
-                && Objects.equals(eventFilter, that.eventFilter)
                 && Objects.equals(properties, that.properties);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(loggerType, eventFilter, properties);
+        return Objects.hash(loggerType, properties);
     }
 
     @Override
@@ -128,8 +113,6 @@ public final class EventLoggerConfig {
                 + "loggerType='"
                 + loggerType
                 + '\''
-                + ", eventFilter="
-                + eventFilter
                 + ", properties="
                 + properties
                 + '}';
@@ -143,7 +126,6 @@ public final class EventLoggerConfig {
      */
     public static final class Builder {
         private String loggerType = "file"; // Default to file logger
-        private EventFilter eventFilter = EventFilter.ACCEPT_ALL; // Default to accept all
         private final Map<String, Object> properties = new HashMap<>();
 
         private Builder() {}
@@ -163,18 +145,6 @@ public final class EventLoggerConfig {
             return this;
         }
 
-        /**
-         * Sets the event filter for this configuration.
-         *
-         * @param eventFilter the EventFilter to apply
-         * @return this Builder instance for method chaining
-         * @throws IllegalArgumentException if eventFilter is null
-         */
-        public Builder eventFilter(EventFilter eventFilter) {
-            this.eventFilter = Objects.requireNonNull(eventFilter, "Event filter cannot be null");
-            return this;
-        }
-
         /**
          * Adds a property to the configuration.
          *
@@ -213,7 +183,7 @@ public final class EventLoggerConfig {
          * @return a new EventLoggerConfig instance
          */
         public EventLoggerConfig build() {
-            return new EventLoggerConfig(loggerType, eventFilter, properties);
+            return new EventLoggerConfig(loggerType, properties);
         }
     }
 }
diff --git a/python/flink_agents/api/core_options.py b/python/flink_agents/api/core_options.py
index 9245e78c..98b8b317 100644
--- a/python/flink_agents/api/core_options.py
+++ b/python/flink_agents/api/core_options.py
@@ -91,6 +91,31 @@ class AgentConfigOptions(metaclass=AgentConfigOptionsMeta):
         default=None,
     )
 
+    # Event log level config options
+    EVENT_LOG_LEVEL = ConfigOption(
+        key="event-log.level",
+        config_type=str,
+        default="STANDARD",
+    )
+
+    EVENT_LOG_MAX_STRING_LENGTH = ConfigOption(
+        key="event-log.standard.max-string-length",
+        config_type=int,
+        default=2000,
+    )
+
+    EVENT_LOG_MAX_ARRAY_ELEMENTS = ConfigOption(
+        key="event-log.standard.max-array-elements",
+        config_type=int,
+        default=20,
+    )
+
+    EVENT_LOG_MAX_DEPTH = ConfigOption(
+        key="event-log.standard.max-depth",
+        config_type=int,
+        default=5,
+    )
+
 
 class AgentExecutionOptions:
     """Execution options for Flink Agents."""
diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py b/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py
index 7ea4f3d4..89807927 100644
--- a/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py
+++ b/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py
@@ -126,6 +126,8 @@ def test_python_event_logging(tmp_path: Path) -> None:
     assert record is not None, "Event log file is empty."
     assert record_line is not None, "Event log file is empty."
     assert "timestamp" in record
+    assert "logLevel" in record
+    assert "eventType" in record
     assert "event" in record
     assert "eventType" in record["event"]
     assert has_processed_review, "Log should contain processed review content"
@@ -137,3 +139,125 @@ def test_python_event_logging(tmp_path: Path) -> None:
     assert id_idx != -1
     assert attributes_idx != -1
     assert event_type_idx < id_idx < attributes_idx
+
+
+def _run_event_logging_pipeline(
+    tmp_path: Path, config_overrides: dict[str, str] | None = None
+) -> Path:
+    """Run the event logging pipeline and return the event log directory.
+
+    Args:
+        tmp_path: Temporary directory for log output.
+        config_overrides: Optional dict of config key-value pairs to set.
+
+    Returns:
+        The event log directory path.
+    """
+    event_log_dir = tmp_path / "event_log"
+    default_log_dir = Path(tempfile.gettempdir()) / "flink-agents"
+    shutil.rmtree(default_log_dir, ignore_errors=True)
+
+    config = Configuration()
+    env = StreamExecutionEnvironment.get_execution_environment(config)
+    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+    env.set_parallelism(1)
+
+    agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+    agents_env.get_config().set_str("baseLogDir", str(event_log_dir))
+
+    if config_overrides:
+        for key, value in config_overrides.items():
+            agents_env.get_config().set_str(key, value)
+
+    current_dir = Path(__file__).parent
+    input_datastream = env.from_source(
+        source=FileSource.for_record_stream_format(
+            StreamFormat.text_line_format(),
+            f"file:///{current_dir}/../resources/input/input_data.txt",
+        ).build(),
+        watermark_strategy=WatermarkStrategy.no_watermarks(),
+        source_name="python_event_logging_test",
+    )
+
+    deserialize_datastream = input_datastream.map(lambda x: json.loads(x))
+
+    agents_env.from_datastream(
+        input=deserialize_datastream, key_selector=InputKeySelector()
+    ).apply(PythonEventLoggingAgent()).to_datastream()
+
+    agents_env.execute()
+    return event_log_dir
+
+
+def _read_log_records(event_log_dir: Path) -> list[dict]:
+    """Read all JSON records from event log files.
+
+    Args:
+        event_log_dir: Directory containing event log files.
+
+    Returns:
+        List of parsed JSON records.
+    """
+    records: list[dict] = []
+    for log_file in event_log_dir.glob("events-*.log"):
+        with log_file.open(encoding="utf-8") as handle:
+            records.extend(json.loads(line) for line in handle if line.strip())
+    return records
+
+
+def test_event_log_verbose_level(tmp_path: Path) -> None:
+    """Test that VERBOSE log level writes events without truncation."""
+    event_log_dir = _run_event_logging_pipeline(
+        tmp_path, config_overrides={"event-log.level": "VERBOSE"}
+    )
+
+    records = _read_log_records(event_log_dir)
+    assert len(records) > 0, "VERBOSE level should produce event log records"
+
+    for record in records:
+        assert record.get("logLevel") == "VERBOSE", (
+            f"Expected logLevel VERBOSE, got {record.get('logLevel')}"
+        )
+
+    # VERBOSE should not truncate content (no truncatedString wrappers)
+    raw_content = json.dumps(records)
+    assert "truncatedString" not in raw_content, (
+        "VERBOSE level should not truncate any content"
+    )
+
+
+def test_event_log_off_level(tmp_path: Path) -> None:
+    """Test that OFF log level suppresses all event logging."""
+    event_log_dir = _run_event_logging_pipeline(
+        tmp_path, config_overrides={"event-log.level": "OFF"}
+    )
+
+    records = _read_log_records(event_log_dir)
+    assert len(records) == 0, (
+        f"OFF level should not produce any event log records, but found {len(records)}"
+    )
+
+
+def test_event_log_standard_truncation(tmp_path: Path) -> None:
+    """Test that STANDARD level truncates strings exceeding max-string-length."""
+    event_log_dir = _run_event_logging_pipeline(
+        tmp_path,
+        config_overrides={
+            "event-log.level": "STANDARD",
+            "event-log.standard.max-string-length": "10",
+        },
+    )
+
+    records = _read_log_records(event_log_dir)
+    assert len(records) > 0, "STANDARD level should produce event log records"
+
+    for record in records:
+        assert record.get("logLevel") == "STANDARD", (
+            f"Expected logLevel STANDARD, got {record.get('logLevel')}"
+        )
+
+    # With max-string-length=10, any string longer than 10 chars should be truncated
+    raw_content = json.dumps(records)
+    assert "truncatedString" in raw_content, (
+        "STANDARD level with max-string-length=10 should truncate long strings"
+    )
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolver.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolver.java
new file mode 100644
index 00000000..f2f4b502
--- /dev/null
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolver.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.eventlog;
+
+import org.apache.flink.agents.api.configuration.AgentConfigOptions;
+import org.apache.flink.agents.api.logger.EventLogLevel;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Resolves the effective {@link EventLogLevel} for a given event type string using hierarchical
+ * config key inheritance.
+ *
+ * <p>Resolution order for a given event type (e.g., {@code org.apache.flink.agents.api.event
+ * .ChatRequestEvent}):
+ *
+ * <ol>
+ *   <li>Exact match in explicit per-type configuration
+ *   <li>Walk up dot-separated segments (e.g., {@code org.apache.flink.agents.api.event}, then
+ *       {@code org.apache.flink.agents.api}, etc.)
+ *   <li>Root default from {@code event-log.level} config key
+ *   <li>Built-in default: {@link EventLogLevel#STANDARD}
+ * </ol>
+ *
+ * <p>This mirrors Log4j's hierarchical logger configuration pattern. Resolved levels are cached in
+ * a {@link ConcurrentHashMap} for efficient repeated lookups.
+ * a {@link ConcurrentHashMap} for efficient repeated lookups.
+ *
+ * <p>Config keys are expected in the form:
+ *
+ * <pre>
+ *   event-log.level = STANDARD            (root default)
+ *   event-log.type.&lt;EVENT_TYPE&gt;.level = OFF   (per-type override)
+ * </pre>
+ */
+public class EventLogLevelResolver {
+
+    /** Prefix for per-event-type log level config keys. */
+    static final String TYPE_PREFIX = "event-log.type.";
+
+    /** Suffix for per-event-type log level config keys. */
+    static final String TYPE_SUFFIX = ".level";
+
+    private final EventLogLevel rootDefault;
+    private final Map<String, EventLogLevel> explicitLevels;
+    private final ConcurrentHashMap<String, EventLogLevel> cache;
+
+    /**
+     * Creates a resolver from a configuration data map.
+     *
+     * <p>The map is scanned for keys matching {@code event-log.type.<EVENT_TYPE>.level} to build
+     * the explicit per-type level mappings, and {@code event-log.level} for the root default.
+     *
+     * @param confData the flat configuration key-value map (e.g., from {@code
+     *     AgentConfiguration.getConfData()})
+     */
+    public EventLogLevelResolver(Map<String, Object> confData) {
+        Map<String, Object> data = confData != null ? confData : Collections.emptyMap();
+
+        // Parse root default
+        Object rootValue = data.get(AgentConfigOptions.EVENT_LOG_LEVEL.getKey());
+        this.rootDefault =
+                rootValue != null
+                        ? EventLogLevel.fromString(rootValue.toString())
+                        : AgentConfigOptions.EVENT_LOG_LEVEL.getDefaultValue();
+
+        // Scan for per-type overrides
+        Map<String, EventLogLevel> levels = new HashMap<>();
+        for (Map.Entry<String, Object> entry : data.entrySet()) {
+            String key = entry.getKey();
+            if (key.startsWith(TYPE_PREFIX) && key.endsWith(TYPE_SUFFIX)) {
+                String eventType =
+                        key.substring(TYPE_PREFIX.length(), key.length() - TYPE_SUFFIX.length());
+                if (!eventType.isEmpty()) {
+                    levels.put(eventType, EventLogLevel.fromString(entry.getValue().toString()));
+                }
+            }
+        }
+        this.explicitLevels = Collections.unmodifiableMap(levels);
+        this.cache = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Resolves the effective log level for the given event type string.
+     *
+     * <p>Uses hierarchical inheritance: exact match first, then walks up dot-separated segments,
+     * then falls back to root default, then built-in default ({@link EventLogLevel#STANDARD}).
+     *
+     * @param eventType the fully qualified event type string (e.g., {@code
+     *     org.apache.flink.agents.api.event.ChatRequestEvent})
+     * @return the resolved {@link EventLogLevel}, never null
+     */
+    public EventLogLevel resolve(String eventType) {
+        if (eventType == null || eventType.isEmpty()) {
+            return rootDefault;
+        }
+        return cache.computeIfAbsent(eventType, this::doResolve);
+    }
+
+    private EventLogLevel doResolve(String eventType) {
+        // 1. Exact match
+        EventLogLevel level = explicitLevels.get(eventType);
+        if (level != null) {
+            return level;
+        }
+
+        // 2. Walk up dot-separated segments
+        String current = eventType;
+        int lastDot = current.lastIndexOf('.');
+        while (lastDot > 0) {
+            current = current.substring(0, lastDot);
+            level = explicitLevels.get(current);
+            if (level != null) {
+                return level;
+            }
+            lastDot = current.lastIndexOf('.');
+        }
+
+        // 3. Root default (already parsed in constructor, falls through to built-in if not set)
+        return rootDefault;
+    }
+}
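
A small usage sketch of the resolution order, mirroring the tests added below (the config map, event types, and java.util.HashMap/Map imports are assumed for illustration):

    Map<String, Object> conf = new HashMap<>();
    conf.put("event-log.level", "STANDARD");                                    // root default
    conf.put("event-log.type.org.apache.flink.agents.api.event.level", "OFF");  // parent package
    conf.put("event-log.type.org.apache.flink.agents.api.event.ChatRequestEvent.level", "VERBOSE");

    EventLogLevelResolver resolver = new EventLogLevelResolver(conf);
    resolver.resolve("org.apache.flink.agents.api.event.ChatRequestEvent");  // VERBOSE (exact match)
    resolver.resolve("org.apache.flink.agents.api.event.ChatResponseEvent"); // OFF (inherited)
    resolver.resolve("some.other.EventType");                                 // STANDARD (root default)
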
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java
index 5b83452d..402c3f52 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java
@@ -43,10 +43,12 @@ public class EventLogRecord {
         this.event = event;
     }
 
+    /** Returns the event context. */
     public EventContext getContext() {
         return context;
     }
 
+    /** Returns the event. */
     public Event getEvent() {
         return event;
     }
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
index bfb2f499..ac1e6943 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
@@ -57,7 +57,8 @@ public class EventLogRecordJsonDeserializer extends JsonDeserializer<EventLogRec
             throw new IOException("Missing 'timestamp' field in EventLogRecord JSON");
         }
 
-        // Deserialize event as base Event
+        // Deserialize event as base Event. Any top-level "logLevel" field present in older log
+        // files is silently ignored — it is no longer part of the record.
         JsonNode eventNode = rootNode.get("event");
         if (eventNode == null) {
             throw new IOException("Missing 'event' field in EventLogRecord JSON");
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java
index e453aa6e..0a035603 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java
@@ -38,6 +38,7 @@ import java.util.Map;
  *
  * <ul>
  *   <li>Top-level timestamp
+ *   <li>Top-level eventType (routing key, mirrors {@code event.eventType})
  *   <li>Event data serialized as a standard JSON object
  * </ul>
  *
@@ -46,6 +47,7 @@ import java.util.Map;
  * <pre>{@code
  * {
  *   "timestamp": "2024-01-15T10:30:00Z",
+ *   "eventType": "_input_event",
  *   "event": {
  *     "eventType": "_input_event",
  *     // Event fields serialized normally
@@ -66,6 +68,7 @@ public class EventLogRecordJsonSerializer extends JsonSerializer<EventLogRecord>
 
         gen.writeStartObject();
         gen.writeStringField("timestamp", record.getContext().getTimestamp());
+        gen.writeStringField("eventType", record.getEvent().getType());
 
         gen.writeFieldName("event");
         JsonNode eventNode = buildEventNode(record.getEvent(), mapper);
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
index 8e781727..7b42eb7a 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
@@ -18,13 +18,17 @@
 
 package org.apache.flink.agents.runtime.eventlog;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.agents.api.Event;
 import org.apache.flink.agents.api.EventContext;
-import org.apache.flink.agents.api.EventFilter;
+import org.apache.flink.agents.api.configuration.AgentConfigOptions;
+import org.apache.flink.agents.api.logger.EventLogLevel;
 import org.apache.flink.agents.api.logger.EventLogger;
 import org.apache.flink.agents.api.logger.EventLoggerConfig;
 import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
+import org.apache.flink.metrics.Counter;
 
 import java.io.BufferedWriter;
 import java.io.FileWriter;
@@ -32,6 +36,8 @@ import java.io.PrintWriter;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Map;
 
 /**
  * A file-based event logger that logs events to files with structured names in a flat directory.
@@ -81,16 +87,20 @@ public class FileEventLogger implements EventLogger {
     private static final String DEFAULT_BASE_LOG_DIR =
             Paths.get(System.getProperty("java.io.tmpdir"), "flink-agents").toString();
 
+    /** Property key for passing the full agent config data map into the logger. */
+    public static final String AGENT_CONFIG_PROPERTY_KEY = "agentConfig";
+
     private static final ObjectMapper MAPPER = new ObjectMapper();
 
     private final EventLoggerConfig config;
-    private final EventFilter eventFilter;
     private boolean prettyPrint;
     private PrintWriter writer;
+    private EventLogLevelResolver levelResolver;
+    private JsonTruncator truncator;
+    private Counter truncatedEventsCounter;
 
     public FileEventLogger(EventLoggerConfig config) {
         this.config = config;
-        this.eventFilter = config.getEventFilter();
     }
 
     @Override
@@ -105,6 +115,45 @@ public class FileEventLogger implements EventLogger {
         writer = new PrintWriter(new BufferedWriter(new FileWriter(logFilePath, true)));
         prettyPrint =
                 (Boolean) config.getProperties().getOrDefault(PRETTY_PRINT_PROPERTY_KEY, false);
+
+        // Initialize level resolver and truncator from agent config
+        @SuppressWarnings("unchecked")
+        Map<String, Object> agentConfig =
+                (Map<String, Object>)
+                        config.getProperties()
+                                .getOrDefault(AGENT_CONFIG_PROPERTY_KEY, Collections.emptyMap());
+        this.levelResolver = new EventLogLevelResolver(agentConfig);
+        int maxStringLength =
+                getIntFromConfig(
+                        agentConfig,
+                        AgentConfigOptions.EVENT_LOG_MAX_STRING_LENGTH.getKey(),
+                        AgentConfigOptions.EVENT_LOG_MAX_STRING_LENGTH.getDefaultValue());
+        int maxArrayElements =
+                getIntFromConfig(
+                        agentConfig,
+                        AgentConfigOptions.EVENT_LOG_MAX_ARRAY_ELEMENTS.getKey(),
+                        AgentConfigOptions.EVENT_LOG_MAX_ARRAY_ELEMENTS.getDefaultValue());
+        int maxDepth =
+                getIntFromConfig(
+                        agentConfig,
+                        AgentConfigOptions.EVENT_LOG_MAX_DEPTH.getKey(),
+                        AgentConfigOptions.EVENT_LOG_MAX_DEPTH.getDefaultValue());
+        this.truncator = new JsonTruncator(maxStringLength, maxArrayElements, maxDepth);
+    }
+
+    private static int getIntFromConfig(Map<String, Object> config, String key, int defaultValue) {
+        Object value = config.get(key);
+        if (value == null) {
+            return defaultValue;
+        }
+        if (value instanceof Number) {
+            return ((Number) value).intValue();
+        }
+        try {
+            return Integer.parseInt(value.toString());
+        } catch (NumberFormatException e) {
+            return defaultValue;
+        }
     }
 
     private String generateSubTaskLogFilePath(EventLoggerOpenParams params) {
@@ -130,18 +179,49 @@ public class FileEventLogger implements EventLogger {
             throw new IllegalStateException("FileEventLogger not initialized. Call open() first.");
         }
 
-        // Apply event filter
-        if (!eventFilter.accept(event, context)) {
-            return; // Skip this event
+        // Resolve log level and skip OFF events.
+        EventLogLevel level =
+                levelResolver != null
+                        ? levelResolver.resolve(event.getType())
+                        : EventLogLevel.VERBOSE;
+        if (level == EventLogLevel.OFF) {
+            return;
         }
 
+        // All events should be JSON serializable; we already check this when sending events
+        // to context (RunnerContextImpl.sendEvent).
         EventLogRecord record = new EventLogRecord(context, event);
-        // All events should be JSON serializable, since we check it when sending events to context:
-        // RunnerContextImpl.sendEvent
+        JsonNode tree = MAPPER.valueToTree(record);
+        if (!(tree instanceof ObjectNode)) {
+            throw new IllegalStateException(
+                    "EventLogRecord must serialize to a JSON object, but was: "
+                            + tree.getNodeType());
+        }
+        ObjectNode rootNode = (ObjectNode) tree;
+
+        // Truncate the event subtree at STANDARD level.
+        if (level == EventLogLevel.STANDARD && truncator != null) {
+            JsonNode eventNode = rootNode.get("event");
+            if (eventNode instanceof ObjectNode) {
+                boolean truncated = truncator.truncate((ObjectNode) eventNode);
+                if (truncated && truncatedEventsCounter != null) {
+                    truncatedEventsCounter.inc();
+                }
+            }
+        }
+
+        // Rebuild the top-level object so logLevel sits between timestamp and eventType,
+        // matching the documented JSON layout.
+        ObjectNode ordered = MAPPER.createObjectNode();
+        ordered.set("timestamp", rootNode.get("timestamp"));
+        ordered.put("logLevel", level.name());
+        ordered.set("eventType", rootNode.get("eventType"));
+        ordered.set("event", rootNode.get("event"));
+
         String json =
                 prettyPrint
-                        ? MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(record)
-                        : MAPPER.writeValueAsString(record);
+                        ? MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(ordered)
+                        : MAPPER.writeValueAsString(ordered);
         writer.println(json);
     }
 
@@ -154,6 +234,16 @@ public class FileEventLogger implements EventLogger {
         writer.flush();
     }
 
+    /**
+     * Sets the counter for tracking truncated events. Called by the operator after metrics are
+     * initialized.
+     *
+     * @param counter the counter to increment when events are truncated
+     */
+    public void setTruncatedEventsCounter(Counter counter) {
+        this.truncatedEventsCounter = counter;
+    }
+
     @Override
     public void close() throws Exception {
         if (writer != null) {
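
With the rebuilt top-level object, a logged line at STANDARD level places logLevel between timestamp and eventType, and oversized strings inside the event payload are replaced by the truncation wrapper. An illustrative (not verbatim) record, with the attribute name and counts invented for the example:

    {
      "timestamp": "2024-01-15T10:30:00Z",
      "logLevel": "STANDARD",
      "eventType": "_input_event",
      "event": {
        "eventType": "_input_event",
        "id": "...",
        "attributes": {
          "input": {"truncatedString": "first 2000 chars...", "omittedChars": 3511}
        }
      }
    }
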
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/JsonTruncator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/JsonTruncator.java
new file mode 100644
index 00000000..89e49cee
--- /dev/null
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/JsonTruncator.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.eventlog;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Truncates a Jackson {@link JsonNode} tree per configurable thresholds.
+ *
+ * <p>Three truncation strategies are applied in a single recursive pass:
+ *
+ * <ul>
+ *   <li><b>String truncation</b>: Strings longer than {@code maxStringLength} are replaced with a
+ *       wrapper: {@code {"truncatedString": "first N chars...", "omittedChars": M}}
+ *   <li><b>Array truncation</b>: Arrays larger than {@code maxArrayElements} are replaced with a
+ *       wrapper: {@code {"truncatedList": [first N elements], "omittedElements": M}}
+ *   <li><b>Depth truncation</b>: At max depth, object nodes retain only scalar fields; nested
+ *       objects/arrays are dropped: {@code {"truncatedObject": {scalars...}, "omittedFields": N}}
+ * </ul>
+ *
+ * <p>Setting any threshold to {@code 0} disables that specific truncation strategy. If all
+ * thresholds are {@code 0}, no truncation occurs.
+ *
+ * <p>Protected fields at the top level of the event node ({@code eventType}, {@code
+ * attributes}) are never truncated as structural fields. The {@code attributes} envelope is
+ * additionally traversed so user payload stored inside it is truncated like any other field.
+ */
+public class JsonTruncator {
+
+    private static final Set<String> PROTECTED_FIELDS =
+            new HashSet<>(Arrays.asList("eventType", "id", "attributes"));
+
+    private final int maxStringLength;
+    private final int maxArrayElements;
+    private final int maxDepth;
+
+    /**
+     * Creates a new truncator with the given thresholds.
+     *
+     * @param maxStringLength maximum character length for string values; 0 to disable
+     * @param maxArrayElements maximum number of array elements retained; 0 to disable
+     * @param maxDepth maximum object nesting depth; 0 to disable
+     */
+    public JsonTruncator(int maxStringLength, int maxArrayElements, int maxDepth) {
+        this.maxStringLength = maxStringLength;
+        this.maxArrayElements = maxArrayElements;
+        this.maxDepth = maxDepth;
+    }
+
+    /**
+     * Truncates the given event node in place according to configured thresholds.
+     *
+     * <p>Protected fields ({@code eventType}, {@code id}, {@code attributes}) at the top level of
+     * the event node are never truncated as structural fields. The {@code attributes} envelope is
+     * additionally traversed so user payload stored inside it is truncated like any other field.
+     *
+     * @param eventNode the top-level event JSON object to truncate
+     * @return {@code true} if any field was truncated, {@code false} if the node was unchanged
+     */
+    public boolean truncate(ObjectNode eventNode) {
+        if (eventNode == null) {
+            return false;
+        }
+        boolean truncated = truncateObject(eventNode, 1, true);
+        // Traverse the protected attributes envelope so its user payload still gets truncated.
+        JsonNode attributes = eventNode.get("attributes");
+        if (attributes instanceof ObjectNode) {
+            truncated |= truncateObject((ObjectNode) attributes, 1, false);
+        }
+        return truncated;
+    }
+
+    /**
+     * Recursively truncates an object node.
+     *
+     * @param node the object node to process
+     * @param depth current depth (1 = top-level event node)
+     * @param isTopLevel whether this is the top-level event node (for protected field checks)
+     * @return true if any truncation occurred
+     */
+    private boolean truncateObject(ObjectNode node, int depth, boolean isTopLevel) {
+        boolean truncated = false;
+
+        // At max depth, collapse the entire object to retain only scalars
+        if (maxDepth > 0 && depth >= maxDepth) {
+            return collapseAtMaxDepth(node, isTopLevel);
+        }
+
+        List<String> fieldNames = new ArrayList<>();
+        node.fieldNames().forEachRemaining(fieldNames::add);
+
+        for (String fieldName : fieldNames) {
+            if (isTopLevel && PROTECTED_FIELDS.contains(fieldName)) {
+                continue;
+            }
+
+            JsonNode child = node.get(fieldName);
+            if (child == null) {
+                continue;
+            }
+
+            if (child.isTextual()) {
+                JsonNode replacement = truncateString(child.textValue());
+                if (replacement != null) {
+                    node.set(fieldName, replacement);
+                    truncated = true;
+                }
+            } else if (child.isArray()) {
+                // Truncate the array first so we don't recurse into elements that will be dropped.
+                JsonNode replacement = truncateArray((ArrayNode) child);
+                if (replacement != null) {
+                    ArrayNode retained =
+                            (ArrayNode) ((ObjectNode) replacement).get("truncatedList");
+                    truncateArrayContents(retained, depth + 1);
+                    node.set(fieldName, replacement);
+                    truncated = true;
+                } else {
+                    truncated |= truncateArrayContents((ArrayNode) child, depth + 1);
+                }
+            } else if (child.isObject()) {
+                truncated |= truncateObject((ObjectNode) child, depth + 1, false);
+            }
+        }
+
+        return truncated;
+    }
+
+    /**
+     * Truncates a string if it exceeds maxStringLength.
+     *
+     * @return a wrapper ObjectNode if truncated, or null if no truncation needed
+     */
+    private JsonNode truncateString(String value) {
+        if (maxStringLength <= 0 || value == null || value.length() <= maxStringLength) {
+            return null;
+        }
+        ObjectNode wrapper = JsonNodeFactory.instance.objectNode();
+        wrapper.put("truncatedString", value.substring(0, maxStringLength) + "...");
+        wrapper.put("omittedChars", value.length() - maxStringLength);
+        return wrapper;
+    }
+
+    /**
+     * Truncates an array if it exceeds maxArrayElements.
+     *
+     * @return a wrapper ObjectNode if truncated, or null if no truncation needed
+     */
+    private JsonNode truncateArray(ArrayNode array) {
+        if (maxArrayElements <= 0 || array.size() <= maxArrayElements) {
+            return null;
+        }
+        ObjectNode wrapper = JsonNodeFactory.instance.objectNode();
+        ArrayNode retained = JsonNodeFactory.instance.arrayNode();
+        for (int i = 0; i < maxArrayElements; i++) {
+            retained.add(array.get(i));
+        }
+        wrapper.set("truncatedList", retained);
+        wrapper.put("omittedElements", array.size() - maxArrayElements);
+        return wrapper;
+    }
+
+    /**
+     * Recursively truncates contents within an array (strings and nested structures).
+     *
+     * @return true if any truncation occurred within array elements
+     */
+    private boolean truncateArrayContents(ArrayNode array, int depth) {
+        boolean truncated = false;
+        for (int i = 0; i < array.size(); i++) {
+            JsonNode element = array.get(i);
+            if (element.isTextual()) {
+                JsonNode replacement = truncateString(element.textValue());
+                if (replacement != null) {
+                    array.set(i, replacement);
+                    truncated = true;
+                }
+            } else if (element.isObject()) {
+                truncated |= truncateObject((ObjectNode) element, depth, false);
+            } else if (element.isArray()) {
+                JsonNode replacement = truncateArray((ArrayNode) element);
+                if (replacement != null) {
+                    ArrayNode retained =
+                            (ArrayNode) ((ObjectNode) replacement).get("truncatedList");
+                    truncateArrayContents(retained, depth + 1);
+                    array.set(i, replacement);
+                    truncated = true;
+                } else {
+                    truncated |= truncateArrayContents((ArrayNode) element, depth + 1);
+                }
+            }
+        }
+        return truncated;
+    }
+
+    /**
+     * At max depth, retain only scalar fields and drop nested objects/arrays.
+     *
+     * @return true if any fields were dropped
+     */
+    private boolean collapseAtMaxDepth(ObjectNode node, boolean isTopLevel) {
+        List<String> fieldNames = new ArrayList<>();
+        node.fieldNames().forEachRemaining(fieldNames::add);
+
+        ObjectNode scalarFields = JsonNodeFactory.instance.objectNode();
+        int omittedCount = 0;
+        boolean hasNonScalar = false;
+
+        for (String fieldName : fieldNames) {
+            if (isTopLevel && PROTECTED_FIELDS.contains(fieldName)) {
+                // Protected fields are kept as-is, even if non-scalar
+                scalarFields.set(fieldName, node.get(fieldName));
+                continue;
+            }
+
+            JsonNode child = node.get(fieldName);
+            if (child.isObject() || child.isArray()) {
+                omittedCount++;
+                hasNonScalar = true;
+            } else {
+                // Apply string truncation to scalar string fields even at max depth
+                if (child.isTextual()) {
+                    JsonNode replacement = truncateString(child.textValue());
+                    if (replacement != null) {
+                        scalarFields.set(fieldName, replacement);
+                    } else {
+                        scalarFields.set(fieldName, child);
+                    }
+                } else {
+                    scalarFields.set(fieldName, child);
+                }
+            }
+        }
+
+        if (!hasNonScalar) {
+            // No non-scalar fields to drop; but string truncation may have occurred
+            // Check if any scalar field was replaced
+            boolean stringTruncated = false;
+            for (String fieldName : fieldNames) {
+                if (isTopLevel && PROTECTED_FIELDS.contains(fieldName)) {
+                    continue;
+                }
+                JsonNode original = node.get(fieldName);
+                if (original.isTextual()) {
+                    JsonNode replacement = truncateString(original.textValue());
+                    if (replacement != null) {
+                        node.set(fieldName, replacement);
+                        stringTruncated = true;
+                    }
+                }
+            }
+            return stringTruncated;
+        }
+
+        // Replace the node contents with the wrapped version
+        node.removeAll();
+        node.set("truncatedObject", scalarFields);
+        node.put("omittedFields", omittedCount);
+        return true;
+    }
+}
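
A minimal sketch of the truncator in isolation (thresholds and field names chosen for illustration; assumes Jackson's ObjectMapper and ObjectNode are in scope):

    ObjectMapper mapper = new ObjectMapper();
    ObjectNode event = mapper.createObjectNode();
    event.put("eventType", "_input_event");
    event.put("payload", "x".repeat(50)); // 50-character string

    JsonTruncator truncator = new JsonTruncator(10, 20, 5); // maxStringLength = 10
    boolean changed = truncator.truncate(event);
    // changed == true; "payload" becomes
    //   {"truncatedString": "xxxxxxxxxx...", "omittedChars": 40}
    // while the protected "eventType" field is left untouched.
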
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
index 0e965732..59e0daaa 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
@@ -37,6 +37,8 @@ public class BuiltInMetrics {
 
     private final Meter numOfActionsExecutedPerSec;
 
+    private final Counter eventLogTruncatedEvents;
+
     private final HashMap<String, BuiltInActionMetrics> actionMetricGroups;
 
     public BuiltInMetrics(FlinkAgentsMetricGroupImpl parentMetricGroup, AgentPlan agentPlan) {
@@ -48,6 +50,8 @@ public class BuiltInMetrics {
         this.numOfActionsExecutedPerSec =
                 parentMetricGroup.getMeter("numOfActionsExecutedPerSec", numOfActionsExecuted);
 
+        this.eventLogTruncatedEvents = parentMetricGroup.getCounter("eventLogTruncatedEvents");
+
         this.actionMetricGroups = new HashMap<>();
         for (String actionName : agentPlan.getActions().keySet()) {
             actionMetricGroups.put(
@@ -69,4 +73,9 @@ public class BuiltInMetrics {
         numOfActionsExecutedPerSec.markEvent();
         actionMetricGroups.get(actionName).markActionExecuted();
     }
+
+    /** Returns the counter tracking event log truncation occurrences. */
+    public Counter getEventLogTruncatedEventsCounter() {
+        return eventLogTruncatedEvents;
+    }
 }
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
index cce19585..a79f227d 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
@@ -111,6 +111,10 @@ class EventRouter<IN, OUT> implements AutoCloseable {
             return;
         }
         eventLogger.open(new EventLoggerOpenParams(runtimeContext));
+        if (eventLogger instanceof FileEventLogger) {
+            ((FileEventLogger) eventLogger)
+                    .setTruncatedEventsCounter(builtInMetrics.getEventLogTruncatedEventsCounter());
+        }
     }
 
     /**
@@ -241,6 +245,8 @@ class EventRouter<IN, OUT> implements AutoCloseable {
         }
         loggerConfigBuilder.property(
                 FileEventLogger.PRETTY_PRINT_PROPERTY_KEY, agentPlan.getConfig().get(PRETTY_PRINT));
+        loggerConfigBuilder.property(
+                FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentPlan.getConfig().getConfData());
         return EventLoggerFactory.createLogger(loggerConfigBuilder.build());
     }
 
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolverTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolverTest.java
new file mode 100644
index 00000000..a1d2d26e
--- /dev/null
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolverTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.eventlog;
+
+import org.apache.flink.agents.api.logger.EventLogLevel;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class EventLogLevelResolverTest {
+
+    @Test
+    void testExactMatch() {
+        Map<String, Object> config = new HashMap<>();
+        config.put(
+                "event-log.type.org.apache.flink.agents.api.event.ChatRequestEvent.level", "OFF");
+        EventLogLevelResolver resolver = new EventLogLevelResolver(config);
+
+        assertThat(resolver.resolve("org.apache.flink.agents.api.event.ChatRequestEvent"))
+                .isEqualTo(EventLogLevel.OFF);
+    }
+
+    @Test
+    void testParentPackageInheritance() {
+        Map<String, Object> config = new HashMap<>();
+        config.put("event-log.type.org.apache.flink.agents.api.event.level", "VERBOSE");
+        EventLogLevelResolver resolver = new EventLogLevelResolver(config);
+
+        // ChatRequestEvent is under the configured parent package
+        assertThat(resolver.resolve("org.apache.flink.agents.api.event.ChatRequestEvent"))
+                .isEqualTo(EventLogLevel.VERBOSE);
+    }
+
+    @Test
+    void testGrandparentInheritance() {
+        Map<String, Object> config = new HashMap<>();
+        config.put("event-log.type.org.apache.flink.level", "OFF");
+        EventLogLevelResolver resolver = new EventLogLevelResolver(config);
+
+        // Should walk up multiple levels to find the match
+        assertThat(resolver.resolve("org.apache.flink.agents.api.event.ChatRequestEvent"))
+                .isEqualTo(EventLogLevel.OFF);
+    }
+
+    @Test
+    void testRootDefault() {
+        Map<String, Object> config = new HashMap<>();
+        config.put("event-log.level", "VERBOSE");
+        EventLogLevelResolver resolver = new EventLogLevelResolver(config);
+
+        assertThat(resolver.resolve("some.unknown.EventType")).isEqualTo(EventLogLevel.VERBOSE);
+    }
+
+    @Test
+    void testBuiltInDefault() {
+        EventLogLevelResolver resolver = new EventLogLevelResolver(Collections.emptyMap());
+
+        assertThat(resolver.resolve("some.unknown.EventType")).isEqualTo(EventLogLevel.STANDARD);
+    }
+
+    @Test
+    void testCaseInsensitive() {
+        Map<String, Object> config = new HashMap<>();
+        config.put("event-log.type.my.Event.level", "verbose");
+        EventLogLevelResolver resolver = new EventLogLevelResolver(config);
+
+        assertThat(resolver.resolve("my.Event")).isEqualTo(EventLogLevel.VERBOSE);
+
+        // Also test root level with mixed case
+        Map<String, Object> config2 = new HashMap<>();
+        config2.put("event-log.level", "Off");
+        EventLogLevelResolver resolver2 = new EventLogLevelResolver(config2);
+
+        assertThat(resolver2.resolve("any.Event")).isEqualTo(EventLogLevel.OFF);
+    }
+
+    @Test
+    void testInvalidLevel() {
+        Map<String, Object> config = new HashMap<>();
+        config.put("event-log.type.my.Event.level", "INVALID_LEVEL");
+
+        assertThatThrownBy(() -> new EventLogLevelResolver(config))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("INVALID_LEVEL");
+    }
+
+    @Test
+    void testNullConfigData() {
+        EventLogLevelResolver resolver = new EventLogLevelResolver(null);
+
+        assertThat(resolver.resolve("some.Event")).isEqualTo(EventLogLevel.STANDARD);
+    }
+
+    @Test
+    void testNullOrEmptyEventType() {
+        Map<String, Object> config = new HashMap<>();
+        config.put("event-log.level", "VERBOSE");
+        EventLogLevelResolver resolver = new EventLogLevelResolver(config);
+
+        assertThat(resolver.resolve(null)).isEqualTo(EventLogLevel.VERBOSE);
+        assertThat(resolver.resolve("")).isEqualTo(EventLogLevel.VERBOSE);
+    }
+
+    @Test
+    void testExactMatchTakesPrecedenceOverParent() {
+        Map<String, Object> config = new HashMap<>();
+        config.put("event-log.type.org.apache.flink.agents.api.event.level", "OFF");
+        config.put(
+                "event-log.type.org.apache.flink.agents.api.event.ChatRequestEvent.level",
+                "VERBOSE");
+        EventLogLevelResolver resolver = new EventLogLevelResolver(config);
+
+        // Exact match should win over parent package
+        assertThat(resolver.resolve("org.apache.flink.agents.api.event.ChatRequestEvent"))
+                .isEqualTo(EventLogLevel.VERBOSE);
+        // Sibling should inherit from parent
+        assertThat(resolver.resolve("org.apache.flink.agents.api.event.ChatResponseEvent"))
+                .isEqualTo(EventLogLevel.OFF);
+    }
+
+    @Test
+    void testCachingReturnsSameResult() {
+        Map<String, Object> config = new HashMap<>();
+        config.put("event-log.type.my.Event.level", "OFF");
+        EventLogLevelResolver resolver = new EventLogLevelResolver(config);
+
+        // Call twice — should return cached result
+        EventLogLevel first = resolver.resolve("my.Event");
+        EventLogLevel second = resolver.resolve("my.Event");
+        assertThat(first).isSameAs(second);
+    }
+}
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
index af2dc794..831fba4f 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.agents.api.Event;
 import org.apache.flink.agents.api.EventContext;
-import org.apache.flink.agents.api.EventFilter;
 import org.apache.flink.agents.api.InputEvent;
 import org.apache.flink.agents.api.OutputEvent;
 import org.apache.flink.agents.api.configuration.AgentConfigOptions;
@@ -301,237 +300,264 @@ class FileEventLoggerTest {
     }
 
     @Test
-    void testEventFilterAcceptAll() throws Exception {
-        // Given - config with ACCEPT_ALL filter (default behavior)
+    void testPrettyPrintOutputsFormattedJson() throws Exception {
+        // Given - config with prettyPrint enabled
         config =
                 EventLoggerConfig.builder()
                         .loggerType("file")
-                        .property("baseLogDir", tempDir.toString())
-                        .eventFilter(EventFilter.ACCEPT_ALL)
+                        .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, tempDir.toString())
+                        .property(AgentConfigOptions.PRETTY_PRINT.getKey(), true)
                         .build();
         logger = new FileEventLogger(config);
 
         logger.open(openParams);
-        InputEvent inputEvent = new InputEvent("input data");
-        OutputEvent outputEvent = new OutputEvent("output data");
-
-        // When
+        InputEvent inputEvent = new InputEvent("test input");
         logger.append(new EventContext(inputEvent), inputEvent);
-        logger.append(new EventContext(outputEvent), outputEvent);
         logger.flush();
 
-        // Then - both events should be logged
+        // Then - output should be valid JSON spanning multiple lines (pretty-printed)
         Path logFile = getExpectedLogFilePath();
         List<String> lines = Files.readAllLines(logFile);
-        assertEquals(2, lines.size(), "Both events should be logged with ACCEPT_ALL filter");
-
-        // Verify both events were deserialized correctly
-        EventLogRecord inputRecord = objectMapper.readValue(lines.get(0), EventLogRecord.class);
-        assertEquals(InputEvent.EVENT_TYPE, inputRecord.getEvent().getType());
-
-        EventLogRecord outputRecord = objectMapper.readValue(lines.get(1), EventLogRecord.class);
-        assertEquals(OutputEvent.EVENT_TYPE, outputRecord.getEvent().getType());
+        // Pretty-printed JSON for a single event record spans multiple lines
+        assertTrue(lines.size() > 1, "Pretty-printed JSON should span multiple lines");
+        // Each line after the first should be indented
+        assertTrue(
+                lines.subList(1, lines.size()).stream().anyMatch(line -> line.startsWith("  ")),
+                "Pretty-printed JSON lines should be indented");
+        // The entire content should still be valid JSON
+        String content = String.join("\n", lines);
+        assertDoesNotThrow(
+                () -> objectMapper.readValue(content, EventLogRecord.class),
+                "Pretty-printed output should be valid JSON deserializable to EventLogRecord");
     }
 
     @Test
-    void testEventFilterRejectAll() throws Exception {
-        // Given - config with REJECT_ALL filter
+    void testStandardLevelTruncation() throws Exception {
+        // Given - config with STANDARD level and a small max-string-length for easy testing
+        Map<String, Object> agentConfig = new HashMap<>();
+        agentConfig.put("event-log.level", "STANDARD");
+        agentConfig.put("event-log.standard.max-string-length", 10);
+        agentConfig.put("event-log.standard.max-array-elements", 20);
+        agentConfig.put("event-log.standard.max-depth", 5);
+
         config =
                 EventLoggerConfig.builder()
                         .loggerType("file")
-                        .property("baseLogDir", tempDir.toString())
-                        .eventFilter(EventFilter.REJECT_ALL)
+                        .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, tempDir.toString())
+                        .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentConfig)
                         .build();
         logger = new FileEventLogger(config);
-
         logger.open(openParams);
-        InputEvent inputEvent = new InputEvent("input data");
-        OutputEvent outputEvent = new OutputEvent("output data");
 
-        // When
-        logger.append(new EventContext(inputEvent), inputEvent);
-        logger.append(new EventContext(outputEvent), outputEvent);
+        // Use a custom event with a very long string field
+        TestCustomEvent event =
+                new TestCustomEvent("this is a very long string that exceeds 10", 1);
+        EventContext context = new EventContext(event);
+
+        logger.append(context, event);
         logger.flush();
 
-        // Then - no events should be logged (file should not exist or be empty)
         Path logFile = getExpectedLogFilePath();
+        List<String> lines = Files.readAllLines(logFile);
+        assertEquals(1, lines.size());
+
+        JsonNode jsonNode = objectMapper.readTree(lines.get(0));
+        assertEquals("STANDARD", jsonNode.get("logLevel").asText());
+
+        // The customData field (inside attributes) should be truncated
+        JsonNode attrsNode = jsonNode.get("event").get("attributes");
+        JsonNode customDataNode = attrsNode.get("customData");
         assertTrue(
-                !Files.exists(logFile) || Files.readAllLines(logFile).isEmpty(),
-                "No events should be logged with REJECT_ALL filter");
+                customDataNode.has("truncatedString"),
+                "Long string should be truncated at STANDARD level");
+        assertTrue(customDataNode.has("omittedChars"));
     }
 
     @Test
-    void testEventFilterByEventType() throws Exception {
-        // Given - config with filter that only accepts InputEvents
+    void testVerboseLevelNoTruncation() throws Exception {
+        // Given - config with VERBOSE level
+        Map<String, Object> agentConfig = new HashMap<>();
+        agentConfig.put("event-log.level", "VERBOSE");
+        agentConfig.put("event-log.standard.max-string-length", 10);
+
         config =
                 EventLoggerConfig.builder()
                         .loggerType("file")
-                        .property("baseLogDir", tempDir.toString())
-                        .eventFilter(EventFilter.byEventType(InputEvent.class))
+                        .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, tempDir.toString())
+                        .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentConfig)
                         .build();
         logger = new FileEventLogger(config);
-
         logger.open(openParams);
-        InputEvent inputEvent = new InputEvent("input data");
-        OutputEvent outputEvent = new OutputEvent("output data");
-        TestCustomEvent customEvent = new TestCustomEvent("custom data", 42);
 
-        // When
-        logger.append(new EventContext(inputEvent), inputEvent);
-        logger.append(new EventContext(outputEvent), outputEvent);
-        logger.append(new EventContext(customEvent), customEvent);
+        TestCustomEvent event =
+                new TestCustomEvent("this is a very long string that exceeds 10", 1);
+        EventContext context = new EventContext(event);
+
+        logger.append(context, event);
         logger.flush();
 
-        // Then - only InputEvent should be logged
         Path logFile = getExpectedLogFilePath();
         List<String> lines = Files.readAllLines(logFile);
-        assertEquals(1, lines.size(), "Only InputEvent should be logged");
+        assertEquals(1, lines.size());
 
-        EventLogRecord record = objectMapper.readValue(lines.get(0), EventLogRecord.class);
-        assertEquals(InputEvent.EVENT_TYPE, record.getEvent().getType());
-        assertEquals("input data", InputEvent.fromEvent(record.getEvent()).getInput());
+        JsonNode jsonNode = objectMapper.readTree(lines.get(0));
+        assertEquals("VERBOSE", jsonNode.get("logLevel").asText());
+
+        // The customData field (inside attributes) should NOT be truncated
+        JsonNode attrsNode = jsonNode.get("event").get("attributes");
+        assertTrue(
+                attrsNode.get("customData").isTextual(),
+                "String should be preserved at VERBOSE level");
+        assertEquals(
+                "this is a very long string that exceeds 10", attrsNode.get("customData").asText());
     }
 
     @Test
-    void testEventFilterByMultipleEventTypes() throws Exception {
-        // Given - config with filter that accepts InputEvents and OutputEvents
+    void testOffLevelSkipsEvent() throws Exception {
+        // Given - config with OFF level
+        Map<String, Object> agentConfig = new HashMap<>();
+        agentConfig.put("event-log.level", "OFF");
+
         config =
                 EventLoggerConfig.builder()
                         .loggerType("file")
-                        .property("baseLogDir", tempDir.toString())
-                        .eventFilter(EventFilter.byEventType(InputEvent.class, OutputEvent.class))
+                        .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, tempDir.toString())
+                        .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentConfig)
                         .build();
         logger = new FileEventLogger(config);
-
         logger.open(openParams);
-        InputEvent inputEvent = new InputEvent("input data");
-        OutputEvent outputEvent = new OutputEvent("output data");
-        TestCustomEvent customEvent = new TestCustomEvent("custom data", 42);
 
-        // When
-        logger.append(new EventContext(inputEvent), inputEvent);
-        logger.append(new EventContext(outputEvent), outputEvent);
-        logger.append(new EventContext(customEvent), customEvent);
+        InputEvent event = new InputEvent("should not be logged");
+        EventContext context = new EventContext(event);
+
+        logger.append(context, event);
         logger.flush();
 
-        // Then - InputEvent and OutputEvent should be logged, but not TestCustomEvent
         Path logFile = getExpectedLogFilePath();
         List<String> lines = Files.readAllLines(logFile);
-        assertEquals(2, lines.size(), "InputEvent and OutputEvent should be logged");
-
-        EventLogRecord inputRecord = objectMapper.readValue(lines.get(0), EventLogRecord.class);
-        assertEquals(InputEvent.EVENT_TYPE, inputRecord.getEvent().getType());
-        assertEquals("input data", InputEvent.fromEvent(inputRecord.getEvent()).getInput());
-
-        EventLogRecord outputRecord = objectMapper.readValue(lines.get(1), EventLogRecord.class);
-        assertEquals(OutputEvent.EVENT_TYPE, outputRecord.getEvent().getType());
-        assertEquals("output data", OutputEvent.fromEvent(outputRecord.getEvent()).getOutput());
+        assertEquals(0, lines.size(), "OFF level should produce no output");
     }
 
     @Test
-    void testCustomEventFilter() throws Exception {
-        // Given - config with custom filter that only accepts events with specific content
-        EventFilter customFilter =
-                (event, context) -> {
-                    if (InputEvent.EVENT_TYPE.equals(event.getType())) {
-                        return InputEvent.fromEvent(event)
-                                .getInput()
-                                .toString()
-                                .contains("important");
-                    }
-                    return false;
-                };
+    void testPerTypeLevelOverride() throws Exception {
+        // Given - root is STANDARD but InputEvent is set to VERBOSE
+        Map<String, Object> agentConfig = new HashMap<>();
+        agentConfig.put("event-log.level", "STANDARD");
+        agentConfig.put("event-log.standard.max-string-length", 10);
+        agentConfig.put("event-log.type." + InputEvent.EVENT_TYPE + ".level", "VERBOSE");
 
         config =
                 EventLoggerConfig.builder()
                         .loggerType("file")
-                        .property("baseLogDir", tempDir.toString())
-                        .eventFilter(customFilter)
+                        .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, tempDir.toString())
+                        .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentConfig)
                         .build();
         logger = new FileEventLogger(config);
-
         logger.open(openParams);
-        InputEvent importantEvent = new InputEvent("important data");
-        InputEvent regularEvent = new InputEvent("regular data");
-        OutputEvent outputEvent = new OutputEvent("output data");
 
-        // When
-        logger.append(new EventContext(importantEvent), importantEvent);
-        logger.append(new EventContext(regularEvent), regularEvent);
-        logger.append(new EventContext(outputEvent), outputEvent);
+        // InputEvent should be VERBOSE (no truncation)
+        InputEvent inputEvent = new InputEvent("this is a very long string that exceeds 10");
+        logger.append(new EventContext(inputEvent), inputEvent);
+
+        // TestCustomEvent should be STANDARD (truncated)
+        TestCustomEvent customEvent =
+                new TestCustomEvent("this is a very long string that exceeds 10", 1);
+        logger.append(new EventContext(customEvent), customEvent);
         logger.flush();
 
-        // Then - only the "important" InputEvent should be logged
         Path logFile = getExpectedLogFilePath();
         List<String> lines = Files.readAllLines(logFile);
-        assertEquals(1, lines.size(), "Only important InputEvent should be logged");
+        assertEquals(2, lines.size());
 
-        EventLogRecord record = objectMapper.readValue(lines.get(0), EventLogRecord.class);
-        assertEquals(InputEvent.EVENT_TYPE, record.getEvent().getType());
-        assertEquals("important data", InputEvent.fromEvent(record.getEvent()).getInput());
+        // InputEvent at VERBOSE - no truncation (data lives in attributes)
+        JsonNode inputJson = objectMapper.readTree(lines.get(0));
+        assertEquals("VERBOSE", inputJson.get("logLevel").asText());
+        assertTrue(inputJson.get("event").get("attributes").get("input").isTextual());
+
+        // TestCustomEvent at STANDARD - truncated (data lives in attributes)
+        JsonNode customJson = objectMapper.readTree(lines.get(1));
+        assertEquals("STANDARD", customJson.get("logLevel").asText());
+        assertTrue(
+                customJson.get("event").get("attributes").get("customData").has("truncatedString"));
     }
 
     @Test
-    void testDefaultEventFilterBehavior() throws Exception {
-        // Given - config without explicit eventFilter (should default to ACCEPT_ALL)
-        config =
-                EventLoggerConfig.builder()
-                        .loggerType("file")
-                        .property("baseLogDir", tempDir.toString())
-                        .build();
-        logger = new FileEventLogger(config);
-
+    void testJsonOutputHasNewFields() throws Exception {
+        // Given - default config
         logger.open(openParams);
-        InputEvent inputEvent = new InputEvent("input data");
-        OutputEvent outputEvent = new OutputEvent("output data");
+        InputEvent event = new InputEvent("test");
+        EventContext context = new EventContext(event);
 
-        // When
-        logger.append(new EventContext(inputEvent), inputEvent);
-        logger.append(new EventContext(outputEvent), outputEvent);
+        logger.append(context, event);
         logger.flush();
 
-        // Then - both events should be logged (default ACCEPT_ALL behavior)
         Path logFile = getExpectedLogFilePath();
         List<String> lines = Files.readAllLines(logFile);
-        assertEquals(2, lines.size(), "Both events should be logged with default filter");
+        JsonNode jsonNode = objectMapper.readTree(lines.get(0));
 
-        EventLogRecord inputRecord = objectMapper.readValue(lines.get(0), EventLogRecord.class);
-        assertEquals(InputEvent.EVENT_TYPE, inputRecord.getEvent().getType());
+        // Verify new top-level fields exist
+        assertTrue(jsonNode.has("logLevel"), "JSON should have logLevel field");
+        assertTrue(jsonNode.has("eventType"), "JSON should have eventType field");
+        assertEquals(InputEvent.EVENT_TYPE, jsonNode.get("eventType").asText());
+        assertNotNull(jsonNode.get("logLevel").asText());
+    }
 
-        EventLogRecord outputRecord = objectMapper.readValue(lines.get(1), EventLogRecord.class);
-        assertEquals(OutputEvent.EVENT_TYPE, outputRecord.getEvent().getType());
+    @Test
+    void testBackwardCompatibleDeserialization() throws Exception {
+        // Simulate old-format JSON without a top-level logLevel field. The Event payload uses
+        // the post-#631 shape: {type, id, attributes}. The deserializer must still parse it.
+        String oldFormatJson =
+                "{\"timestamp\":\"2024-01-15T10:30:00Z\","
+                        + "\"event\":{\"eventType\":\""
+                        + InputEvent.EVENT_TYPE
+                        + "\",\"type\":\""
+                        + InputEvent.EVENT_TYPE
+                        + "\","
+                        + "\"attributes\":{\"input\":\"test\"}}}";
+
+        EventLogRecord record = objectMapper.readValue(oldFormatJson, EventLogRecord.class);
+        assertNotNull(record.getEvent());
+        assertEquals(InputEvent.EVENT_TYPE, record.getEvent().getType());
+        assertEquals(
+                "test",
+                InputEvent.fromEvent(record.getEvent()).getInput(),
+                "Old-format JSON without logLevel should still deserialize the event payload");
     }
 
     @Test
-    void testPrettyPrintOutputsFormattedJson() throws Exception {
-        // Given - config with prettyPrint enabled
+    void testHierarchicalInheritance() throws Exception {
+        // Set namespace-level OFF, but specific type VERBOSE. Uses custom dotted event types
+        // because built-in event types (e.g., "_input_event") have no dot-separated parents.
+        Map<String, Object> agentConfig = new HashMap<>();
+        agentConfig.put("event-log.level", "STANDARD");
+        agentConfig.put("event-log.type.com.example.events.level", "OFF");
+        agentConfig.put("event-log.type." + TestNamespacedEventA.EVENT_TYPE + ".level", "VERBOSE");
+
         config =
                 EventLoggerConfig.builder()
                         .loggerType("file")
                         .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, tempDir.toString())
-                        .property(AgentConfigOptions.PRETTY_PRINT.getKey(), true)
+                        .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY, agentConfig)
                         .build();
         logger = new FileEventLogger(config);
-
         logger.open(openParams);
-        InputEvent inputEvent = new InputEvent("test input");
-        logger.append(new EventContext(inputEvent), inputEvent);
+
+        // EventA has explicit VERBOSE override — should be logged
+        TestNamespacedEventA eventA = new TestNamespacedEventA("should be logged");
+        logger.append(new EventContext(eventA), eventA);
+
+        // EventB inherits OFF from namespace level — should NOT be logged
+        TestNamespacedEventB eventB = new TestNamespacedEventB("should not be logged");
+        logger.append(new EventContext(eventB), eventB);
         logger.flush();
 
-        // Then - output should be valid JSON spanning multiple lines (pretty-printed)
         Path logFile = getExpectedLogFilePath();
         List<String> lines = Files.readAllLines(logFile);
-        // Pretty-printed JSON for a single event record spans multiple lines
-        assertTrue(lines.size() > 1, "Pretty-printed JSON should span multiple lines");
-        // Each line after the first should be indented
-        assertTrue(
-                lines.subList(1, lines.size()).stream().anyMatch(line -> line.startsWith("  ")),
-                "Pretty-printed JSON lines should be indented");
-        // The entire content should still be valid JSON
-        String content = String.join("\n", lines);
-        assertDoesNotThrow(
-                () -> objectMapper.readValue(content, EventLogRecord.class),
-                "Pretty-printed output should be valid JSON deserializable to EventLogRecord");
+        assertEquals(1, lines.size(), "Only EventA (VERBOSE override) should be logged");
+
+        JsonNode json = objectMapper.readTree(lines.get(0));
+        assertEquals("VERBOSE", json.get("logLevel").asText());
+        assertEquals(TestNamespacedEventA.EVENT_TYPE, json.get("eventType").asText());
     }
 
     private Path getExpectedLogFilePath() {
@@ -573,4 +599,24 @@ class FileEventLoggerTest {
             return ((Number) getAttr("customNumber")).intValue();
         }
     }
+
+    /** Custom event with a dot-separated type to exercise hierarchical level inheritance. */
+    public static class TestNamespacedEventA extends Event {
+        public static final String EVENT_TYPE = "com.example.events.A";
+
+        public TestNamespacedEventA(String payload) {
+            super(EVENT_TYPE);
+            setAttr("payload", payload);
+        }
+    }
+
+    /** Custom event sharing EventA's namespace, used to verify namespace-level inheritance. */
+    public static class TestNamespacedEventB extends Event {
+        public static final String EVENT_TYPE = "com.example.events.B";
+
+        public TestNamespacedEventB(String payload) {
+            super(EVENT_TYPE);
+            setAttr("payload", payload);
+        }
+    }
 }
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/JsonTruncatorTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/JsonTruncatorTest.java
new file mode 100644
index 00000000..bd05b9a1
--- /dev/null
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/JsonTruncatorTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.eventlog;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class JsonTruncatorTest {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    @Test
+    void testStringTruncation() {
+        JsonTruncator truncator = new JsonTruncator(10, 0, 0);
+        ObjectNode node = MAPPER.createObjectNode();
+        node.put("content", "This is a very long string that exceeds the limit");
+
+        boolean result = truncator.truncate(node);
+
+        assertThat(result).isTrue();
+        assertThat(node.get("content").isObject()).isTrue();
+        assertThat(node.get("content").get("truncatedString").asText()).isEqualTo("This is a ...");
+        assertThat(node.get("content").get("omittedChars").asInt())
+                .isEqualTo(49 - 10); // total length minus maxStringLength
+    }
+
+    @Test
+    void testArrayTrimming() {
+        JsonTruncator truncator = new JsonTruncator(0, 3, 0);
+        ObjectNode node = MAPPER.createObjectNode();
+        ArrayNode array = MAPPER.createArrayNode();
+        for (int i = 0; i < 7; i++) {
+            array.add("item" + i);
+        }
+        node.set("items", array);
+
+        boolean result = truncator.truncate(node);
+
+        assertThat(result).isTrue();
+        assertThat(node.get("items").isObject()).isTrue();
+        assertThat(node.get("items").get("truncatedList").size()).isEqualTo(3);
+        assertThat(node.get("items").get("truncatedList").get(0).asText()).isEqualTo("item0");
+        assertThat(node.get("items").get("truncatedList").get(1).asText()).isEqualTo("item1");
+        assertThat(node.get("items").get("truncatedList").get(2).asText()).isEqualTo("item2");
+        assertThat(node.get("items").get("omittedElements").asInt()).isEqualTo(4);
+    }
+
+    @Test
+    void testDepthCollapsing() {
+        JsonTruncator truncator = new JsonTruncator(0, 0, 2);
+        ObjectNode node = MAPPER.createObjectNode();
+        ObjectNode nested = MAPPER.createObjectNode();
+        nested.put("scalarField", "value");
+        nested.put("numberField", 42);
+        ObjectNode deepNested = MAPPER.createObjectNode();
+        deepNested.put("deep", "data");
+        nested.set("nestedObj", deepNested);
+        ArrayNode nestedArray = MAPPER.createArrayNode();
+        nestedArray.add("a");
+        nested.set("nestedArr", nestedArray);
+        node.set("data", nested);
+
+        boolean result = truncator.truncate(node);
+
+        assertThat(result).isTrue();
+        // The "data" field's nested object should be collapsed at depth 2
+        assertThat(node.get("data").has("truncatedObject")).isTrue();
+        ObjectNode truncatedObj = (ObjectNode) node.get("data").get("truncatedObject");
+        assertThat(truncatedObj.get("scalarField").asText()).isEqualTo("value");
+        assertThat(truncatedObj.get("numberField").asInt()).isEqualTo(42);
+        assertThat(truncatedObj.has("nestedObj")).isFalse();
+        assertThat(truncatedObj.has("nestedArr")).isFalse();
+        assertThat(node.get("data").get("omittedFields").asInt()).isEqualTo(2);
+    }
+
+    @Test
+    void testNoTruncationUnderLimits() {
+        JsonTruncator truncator = new JsonTruncator(100, 10, 5);
+        ObjectNode node = MAPPER.createObjectNode();
+        node.put("short", "hello");
+        ArrayNode smallArray = MAPPER.createArrayNode();
+        smallArray.add("a");
+        smallArray.add("b");
+        node.set("list", smallArray);
+
+        boolean result = truncator.truncate(node);
+
+        assertThat(result).isFalse();
+        assertThat(node.get("short").asText()).isEqualTo("hello");
+        assertThat(node.get("list").size()).isEqualTo(2);
+    }
+
+    @Test
+    void testDisabledThresholds() {
+        // All thresholds set to 0 — no truncation should occur
+        JsonTruncator truncator = new JsonTruncator(0, 0, 0);
+        ObjectNode node = MAPPER.createObjectNode();
+        node.put("content", "A".repeat(10000));
+        ArrayNode bigArray = MAPPER.createArrayNode();
+        for (int i = 0; i < 100; i++) {
+            bigArray.add(i);
+        }
+        node.set("items", bigArray);
+        ObjectNode deep = MAPPER.createObjectNode();
+        deep.set("level2", MAPPER.createObjectNode().put("level3", "deep"));
+        node.set("nested", deep);
+
+        boolean result = truncator.truncate(node);
+
+        assertThat(result).isFalse();
+        assertThat(node.get("content").asText()).hasSize(10000);
+        assertThat(node.get("items").size()).isEqualTo(100);
+    }
+
+    @Test
+    void testProtectedFields() {
+        JsonTruncator truncator = new JsonTruncator(5, 2, 2);
+        ObjectNode node = MAPPER.createObjectNode();
+        // Protected fields should never be truncated
+        node.put("eventType", "org.apache.flink.agents.api.event.ChatRequestEvent");
+        node.put("id", "a-very-long-identifier-that-exceeds-the-limit");
+        ObjectNode attributes = MAPPER.createObjectNode();
+        attributes.put("key", "value");
+        attributes.set("nested", MAPPER.createObjectNode().put("deep", "data"));
+        node.set("attributes", attributes);
+        // Non-protected field should be truncated
+        node.put("content", "This should be truncated");
+
+        boolean result = truncator.truncate(node);
+
+        assertThat(result).isTrue();
+        // Protected fields remain untouched
+        assertThat(node.get("eventType").asText())
+                .isEqualTo("org.apache.flink.agents.api.event.ChatRequestEvent");
+        assertThat(node.get("id").asText())
+                .isEqualTo("a-very-long-identifier-that-exceeds-the-limit");
+        assertThat(node.get("attributes").isObject()).isTrue();
+        assertThat(node.get("attributes").get("key").asText()).isEqualTo("value");
+        // Non-protected field is truncated
+        assertThat(node.get("content").get("truncatedString")).isNotNull();
+    }
+
+    @Test
+    void testCompositeScenario() {
+        JsonTruncator truncator = new JsonTruncator(10, 2, 3);
+        ObjectNode node = MAPPER.createObjectNode();
+        // Long string — should trigger string truncation
+        node.put("message", "Hello, this is a long message from the LLM");
+        // Large array — should trigger array truncation
+        ArrayNode tools = MAPPER.createArrayNode();
+        for (int i = 0; i < 5; i++) {
+            tools.add("tool" + i);
+        }
+        node.set("tools", tools);
+        // Deep nesting — should trigger depth truncation at level 3
+        ObjectNode level1 = MAPPER.createObjectNode();
+        ObjectNode level2 = MAPPER.createObjectNode();
+        level2.put("scalar", "kept");
+        level2.set("tooDeep", MAPPER.createObjectNode().put("hidden", "gone"));
+        level1.set("inner", level2);
+        node.set("metadata", level1);
+
+        boolean result = truncator.truncate(node);
+
+        assertThat(result).isTrue();
+        // String truncation applied
+        assertThat(node.get("message").get("truncatedString")).isNotNull();
+        assertThat(node.get("message").get("omittedChars").asInt()).isGreaterThan(0);
+        // Array truncation applied
+        assertThat(node.get("tools").get("truncatedList").size()).isEqualTo(2);
+        assertThat(node.get("tools").get("omittedElements").asInt()).isEqualTo(3);
+        // Depth truncation: level2 is at depth 3, so it should be collapsed
+        ObjectNode metadataInner = (ObjectNode) node.get("metadata").get("inner");
+        assertThat(metadataInner.has("truncatedObject")).isTrue();
+        assertThat(metadataInner.get("truncatedObject").get("scalar").asText()).isEqualTo("kept");
+        assertThat(metadataInner.get("omittedFields").asInt()).isEqualTo(1);
+    }
+
+    @Test
+    void testRecursionSkipsDroppedTailElements() {
+        // Regression test: when an array exceeds maxArrayElements, the elements beyond the cap
+        // must NOT be recursed into. Prior to the fix, truncateArrayContents ran on the full
+        // array before truncateArray dropped the tail, causing wasted CPU/allocations on
+        // elements that were about to be discarded.
+        JsonTruncator truncator = new JsonTruncator(5, 2, 0);
+        ObjectNode node = MAPPER.createObjectNode();
+        ArrayNode items = MAPPER.createArrayNode();
+        // Two short strings that fit under the retained cap and the string limit.
+        items.add("a");
+        items.add("b");
+        // Three tail ObjectNodes whose long-string fields would be truncated if visited.
+        String longValue = "this-string-is-much-longer-than-five-chars";
+        ObjectNode tail0 = MAPPER.createObjectNode();
+        tail0.put("payload", longValue);
+        ObjectNode tail1 = MAPPER.createObjectNode();
+        tail1.put("payload", longValue);
+        ObjectNode tail2 = MAPPER.createObjectNode();
+        tail2.put("payload", longValue);
+        items.add(tail0);
+        items.add(tail1);
+        items.add(tail2);
+        node.set("items", items);
+
+        boolean result = truncator.truncate(node);
+
+        assertThat(result).isTrue();
+        // Retained list is exactly the first 2 elements; 3 are dropped.
+        assertThat(node.get("items").get("truncatedList").size()).isEqualTo(2);
+        assertThat(node.get("items").get("truncatedList").get(0).asText()).isEqualTo("a");
+        assertThat(node.get("items").get("truncatedList").get(1).asText()).isEqualTo("b");
+        assertThat(node.get("items").get("omittedElements").asInt()).isEqualTo(3);
+        // The dropped tail ObjectNodes were never visited — their payload fields remain
+        // untouched (still raw strings, not wrapped truncatedString objects). Under the old
+        // pre-reorder code path, these would have been mutated before being discarded.
+        assertThat(tail0.get("payload").isTextual()).isTrue();
+        assertThat(tail0.get("payload").asText()).isEqualTo(longValue);
+        assertThat(tail1.get("payload").isTextual()).isTrue();
+        assertThat(tail1.get("payload").asText()).isEqualTo(longValue);
+        assertThat(tail2.get("payload").isTextual()).isTrue();
+        assertThat(tail2.get("payload").asText()).isEqualTo(longValue);
+    }
+
+    @Test
+    void testNullNode() {
+        JsonTruncator truncator = new JsonTruncator(10, 10, 10);
+
+        boolean result = truncator.truncate(null);
+
+        assertThat(result).isFalse();
+    }
+}

