This is an automated email from the ASF dual-hosted git repository.
wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new c72f8f6f [runtime] Support per-event-type configurable log levels for
event log (#609)
c72f8f6f is described below
commit c72f8f6f03d1e7891bb6af2d692a4f8f163bb595
Author: Weiqing Yang <[email protected]>
AuthorDate: Mon May 11 19:13:50 2026 -0700
[runtime] Support per-event-type configurable log levels for event log
(#609)
---
.../org/apache/flink/agents/api/EventFilter.java | 61 ----
.../api/configuration/AgentConfigOptions.java | 30 ++
.../flink/agents/api/logger/EventLogLevel.java | 61 ++++
.../flink/agents/api/logger/EventLoggerConfig.java | 36 +--
python/flink_agents/api/core_options.py | 25 ++
.../python_event_logging_test.py | 124 ++++++++
.../runtime/eventlog/EventLogLevelResolver.java | 140 +++++++++
.../agents/runtime/eventlog/EventLogRecord.java | 2 +
.../eventlog/EventLogRecordJsonDeserializer.java | 3 +-
.../eventlog/EventLogRecordJsonSerializer.java | 3 +
.../agents/runtime/eventlog/FileEventLogger.java | 110 ++++++-
.../agents/runtime/eventlog/JsonTruncator.java | 285 ++++++++++++++++++
.../agents/runtime/metrics/BuiltInMetrics.java | 9 +
.../flink/agents/runtime/operator/EventRouter.java | 6 +
.../eventlog/EventLogLevelResolverTest.java | 153 ++++++++++
.../runtime/eventlog/FileEventLoggerTest.java | 318 ++++++++++++---------
.../agents/runtime/eventlog/JsonTruncatorTest.java | 251 ++++++++++++++++
17 files changed, 1376 insertions(+), 241 deletions(-)
diff --git a/api/src/main/java/org/apache/flink/agents/api/EventFilter.java
b/api/src/main/java/org/apache/flink/agents/api/EventFilter.java
deleted file mode 100644
index ce97cc59..00000000
--- a/api/src/main/java/org/apache/flink/agents/api/EventFilter.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.agents.api;
-
-/**
- * Interface for filtering events in event logging and listening.
- *
- * <p>EventFilter allows fine-grained control over which events are processed
by event logs and
- * event listeners. Implementations can filter based on event type,
attributes, or custom logic.
- */
-@FunctionalInterface
-public interface EventFilter {
- /**
- * Determines whether an event should be processed.
- *
- * @param event The event to evaluate
- * @param context The context associated with the event
- * @return true if the event should be processed, false otherwise
- */
- boolean accept(Event event, EventContext context);
-
- /**
- * Creates a filter that accepts events of the specified types.
- *
- * @param eventTypes The event types to accept
- * @return An EventFilter that accepts only the specified event types
- */
- @SafeVarargs
- static EventFilter byEventType(Class<? extends Event>... eventTypes) {
- return (event, context) -> {
- for (Class<? extends Event> eventType : eventTypes) {
- if (eventType.isInstance(event)) {
- return true;
- }
- }
- return false;
- };
- }
-
- /** A filter that accepts all events. */
- EventFilter ACCEPT_ALL = (event, context) -> true;
-
- /** A filter that rejects all events. */
- EventFilter REJECT_ALL = (event, context) -> false;
-}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
index 72424745..9cdda8bd 100644
---
a/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
+++
b/api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java
@@ -17,6 +17,8 @@
*/
package org.apache.flink.agents.api.configuration;
+import org.apache.flink.agents.api.logger.EventLogLevel;
+
/** The set of configuration options for agents parameters. */
public class AgentConfigOptions {
@@ -54,4 +56,32 @@ public class AgentConfigOptions {
/** The config parameter specifies the unique identifier of job. */
public static final ConfigOption<String> JOB_IDENTIFIER =
new ConfigOption<>("job-identifier", String.class, null);
+
+ /**
+ * The global event log level controlling the default verbosity for all
event types. Valid
+ * values are "OFF", "STANDARD", and "VERBOSE". Defaults to "STANDARD".
+ */
+ public static final ConfigOption<EventLogLevel> EVENT_LOG_LEVEL =
+ new ConfigOption<>("event-log.level", EventLogLevel.class,
EventLogLevel.STANDARD);
+
+ /**
+ * The maximum string length for event payloads when logging at STANDARD
level. Strings
+ * exceeding this length will be truncated. Defaults to 2000.
+ */
+ public static final ConfigOption<Integer> EVENT_LOG_MAX_STRING_LENGTH =
+ new ConfigOption<>("event-log.standard.max-string-length",
Integer.class, 2000);
+
+ /**
+ * The maximum number of array elements to include in event payloads when
logging at STANDARD
+ * level. Arrays exceeding this size will be truncated. Defaults to 20.
+ */
+ public static final ConfigOption<Integer> EVENT_LOG_MAX_ARRAY_ELEMENTS =
+ new ConfigOption<>("event-log.standard.max-array-elements",
Integer.class, 20);
+
+ /**
+ * The maximum nesting depth for event payloads when logging at STANDARD
level. Objects deeper
+ * than this level will be summarized. Defaults to 5.
+ */
+ public static final ConfigOption<Integer> EVENT_LOG_MAX_DEPTH =
+ new ConfigOption<>("event-log.standard.max-depth", Integer.class,
5);
}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/logger/EventLogLevel.java
b/api/src/main/java/org/apache/flink/agents/api/logger/EventLogLevel.java
new file mode 100644
index 00000000..118e265c
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/logger/EventLogLevel.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.api.logger;
+
+/**
+ * Log level for event logging, controlling the verbosity of event log output.
+ *
+ * <ul>
+ * <li>{@link #OFF} - No events of this type are logged.
+ * <li>{@link #STANDARD} - Events are logged with truncated/summarized
payloads (default).
+ * <li>{@link #VERBOSE} - Events are logged with full, untruncated payloads.
+ * </ul>
+ */
+public enum EventLogLevel {
+
+ /** No events of this type are logged. */
+ OFF,
+
+ /** Events are logged with truncated/summarized payloads. This is the
default level. */
+ STANDARD,
+
+ /** Events are logged with full, untruncated payloads. */
+ VERBOSE;
+
+ /**
+ * Parses a string value into an {@link EventLogLevel}, case-insensitively.
+ *
+ * @param value the string representation of the log level (e.g., "off",
"STANDARD", "Verbose")
+ * @return the corresponding {@link EventLogLevel}
+ * @throws IllegalArgumentException if the value does not match any log
level
+ */
+ public static EventLogLevel fromString(String value) {
+ if (value == null) {
+ throw new IllegalArgumentException("EventLogLevel value cannot be
null");
+ }
+ try {
+ return valueOf(value.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ "Invalid EventLogLevel: '"
+ + value
+ + "'. Valid values are: OFF, STANDARD, VERBOSE");
+ }
+ }
+}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java
b/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java
index f57adef6..1dedeef0 100644
---
a/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java
+++
b/api/src/main/java/org/apache/flink/agents/api/logger/EventLoggerConfig.java
@@ -18,8 +18,6 @@
package org.apache.flink.agents.api.logger;
-import org.apache.flink.agents.api.EventFilter;
-
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -45,14 +43,11 @@ import java.util.Objects;
public final class EventLoggerConfig {
private final String loggerType;
- private final EventFilter eventFilter;
private final Map<String, Object> properties;
/** Private constructor - use {@link #builder()} to create instances. */
- private EventLoggerConfig(
- String loggerType, EventFilter eventFilter, Map<String, Object>
properties) {
+ private EventLoggerConfig(String loggerType, Map<String, Object>
properties) {
this.loggerType = Objects.requireNonNull(loggerType, "Logger type
cannot be null");
- this.eventFilter = eventFilter == null ? EventFilter.ACCEPT_ALL :
eventFilter;
this.properties = Collections.unmodifiableMap(new
HashMap<>(properties));
}
@@ -81,15 +76,6 @@ public final class EventLoggerConfig {
return loggerType;
}
- /**
- * Gets the event filter for this logger configuration.
- *
- * @return the EventFilter to apply, never null
- */
- public EventFilter getEventFilter() {
- return eventFilter;
- }
-
/**
* Gets the implementation-specific properties for this logger
configuration.
*
@@ -113,13 +99,12 @@ public final class EventLoggerConfig {
if (o == null || getClass() != o.getClass()) return false;
EventLoggerConfig that = (EventLoggerConfig) o;
return Objects.equals(loggerType, that.loggerType)
- && Objects.equals(eventFilter, that.eventFilter)
&& Objects.equals(properties, that.properties);
}
@Override
public int hashCode() {
- return Objects.hash(loggerType, eventFilter, properties);
+ return Objects.hash(loggerType, properties);
}
@Override
@@ -128,8 +113,6 @@ public final class EventLoggerConfig {
+ "loggerType='"
+ loggerType
+ '\''
- + ", eventFilter="
- + eventFilter
+ ", properties="
+ properties
+ '}';
@@ -143,7 +126,6 @@ public final class EventLoggerConfig {
*/
public static final class Builder {
private String loggerType = "file"; // Default to file logger
- private EventFilter eventFilter = EventFilter.ACCEPT_ALL; // Default
to accept all
private final Map<String, Object> properties = new HashMap<>();
private Builder() {}
@@ -163,18 +145,6 @@ public final class EventLoggerConfig {
return this;
}
- /**
- * Sets the event filter for this configuration.
- *
- * @param eventFilter the EventFilter to apply
- * @return this Builder instance for method chaining
- * @throws IllegalArgumentException if eventFilter is null
- */
- public Builder eventFilter(EventFilter eventFilter) {
- this.eventFilter = Objects.requireNonNull(eventFilter, "Event
filter cannot be null");
- return this;
- }
-
/**
* Adds a property to the configuration.
*
@@ -213,7 +183,7 @@ public final class EventLoggerConfig {
* @return a new EventLoggerConfig instance
*/
public EventLoggerConfig build() {
- return new EventLoggerConfig(loggerType, eventFilter, properties);
+ return new EventLoggerConfig(loggerType, properties);
}
}
}
diff --git a/python/flink_agents/api/core_options.py
b/python/flink_agents/api/core_options.py
index 9245e78c..98b8b317 100644
--- a/python/flink_agents/api/core_options.py
+++ b/python/flink_agents/api/core_options.py
@@ -91,6 +91,31 @@ class AgentConfigOptions(metaclass=AgentConfigOptionsMeta):
default=None,
)
+ # Event log level config options
+ EVENT_LOG_LEVEL = ConfigOption(
+ key="event-log.level",
+ config_type=str,
+ default="STANDARD",
+ )
+
+ EVENT_LOG_MAX_STRING_LENGTH = ConfigOption(
+ key="event-log.standard.max-string-length",
+ config_type=int,
+ default=2000,
+ )
+
+ EVENT_LOG_MAX_ARRAY_ELEMENTS = ConfigOption(
+ key="event-log.standard.max-array-elements",
+ config_type=int,
+ default=20,
+ )
+
+ EVENT_LOG_MAX_DEPTH = ConfigOption(
+ key="event-log.standard.max-depth",
+ config_type=int,
+ default=5,
+ )
+
class AgentExecutionOptions:
"""Execution options for Flink Agents."""
diff --git
a/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py
b/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py
index 7ea4f3d4..89807927 100644
---
a/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py
+++
b/python/flink_agents/e2e_tests/e2e_tests_integration/python_event_logging_test.py
@@ -126,6 +126,8 @@ def test_python_event_logging(tmp_path: Path) -> None:
assert record is not None, "Event log file is empty."
assert record_line is not None, "Event log file is empty."
assert "timestamp" in record
+ assert "logLevel" in record
+ assert "eventType" in record
assert "event" in record
assert "eventType" in record["event"]
assert has_processed_review, "Log should contain processed review content"
@@ -137,3 +139,125 @@ def test_python_event_logging(tmp_path: Path) -> None:
assert id_idx != -1
assert attributes_idx != -1
assert event_type_idx < id_idx < attributes_idx
+
+
+def _run_event_logging_pipeline(
+ tmp_path: Path, config_overrides: dict[str, str] | None = None
+) -> Path:
+ """Run the event logging pipeline and return the event log directory.
+
+ Args:
+ tmp_path: Temporary directory for log output.
+ config_overrides: Optional dict of config key-value pairs to set.
+
+ Returns:
+ The event log directory path.
+ """
+ event_log_dir = tmp_path / "event_log"
+ default_log_dir = Path(tempfile.gettempdir()) / "flink-agents"
+ shutil.rmtree(default_log_dir, ignore_errors=True)
+
+ config = Configuration()
+ env = StreamExecutionEnvironment.get_execution_environment(config)
+ env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+ env.set_parallelism(1)
+
+ agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+ agents_env.get_config().set_str("baseLogDir", str(event_log_dir))
+
+ if config_overrides:
+ for key, value in config_overrides.items():
+ agents_env.get_config().set_str(key, value)
+
+ current_dir = Path(__file__).parent
+ input_datastream = env.from_source(
+ source=FileSource.for_record_stream_format(
+ StreamFormat.text_line_format(),
+ f"file:///{current_dir}/../resources/input/input_data.txt",
+ ).build(),
+ watermark_strategy=WatermarkStrategy.no_watermarks(),
+ source_name="python_event_logging_test",
+ )
+
+ deserialize_datastream = input_datastream.map(lambda x: json.loads(x))
+
+ agents_env.from_datastream(
+ input=deserialize_datastream, key_selector=InputKeySelector()
+ ).apply(PythonEventLoggingAgent()).to_datastream()
+
+ agents_env.execute()
+ return event_log_dir
+
+
+def _read_log_records(event_log_dir: Path) -> list[dict]:
+ """Read all JSON records from event log files.
+
+ Args:
+ event_log_dir: Directory containing event log files.
+
+ Returns:
+ List of parsed JSON records.
+ """
+ records: list[dict] = []
+ for log_file in event_log_dir.glob("events-*.log"):
+ with log_file.open(encoding="utf-8") as handle:
+ records.extend(json.loads(line) for line in handle if line.strip())
+ return records
+
+
+def test_event_log_verbose_level(tmp_path: Path) -> None:
+ """Test that VERBOSE log level writes events without truncation."""
+ event_log_dir = _run_event_logging_pipeline(
+ tmp_path, config_overrides={"event-log.level": "VERBOSE"}
+ )
+
+ records = _read_log_records(event_log_dir)
+ assert len(records) > 0, "VERBOSE level should produce event log records"
+
+ for record in records:
+ assert record.get("logLevel") == "VERBOSE", (
+ f"Expected logLevel VERBOSE, got {record.get('logLevel')}"
+ )
+
+ # VERBOSE should not truncate content (no truncatedString wrappers)
+ raw_content = json.dumps(records)
+ assert "truncatedString" not in raw_content, (
+ "VERBOSE level should not truncate any content"
+ )
+
+
+def test_event_log_off_level(tmp_path: Path) -> None:
+ """Test that OFF log level suppresses all event logging."""
+ event_log_dir = _run_event_logging_pipeline(
+ tmp_path, config_overrides={"event-log.level": "OFF"}
+ )
+
+ records = _read_log_records(event_log_dir)
+ assert len(records) == 0, (
+ f"OFF level should not produce any event log records, but found
{len(records)}"
+ )
+
+
+def test_event_log_standard_truncation(tmp_path: Path) -> None:
+ """Test that STANDARD level truncates strings exceeding
max-string-length."""
+ event_log_dir = _run_event_logging_pipeline(
+ tmp_path,
+ config_overrides={
+ "event-log.level": "STANDARD",
+ "event-log.standard.max-string-length": "10",
+ },
+ )
+
+ records = _read_log_records(event_log_dir)
+ assert len(records) > 0, "STANDARD level should produce event log records"
+
+ for record in records:
+ assert record.get("logLevel") == "STANDARD", (
+ f"Expected logLevel STANDARD, got {record.get('logLevel')}"
+ )
+
+ # With max-string-length=10, any string longer than 10 chars should be
truncated
+ raw_content = json.dumps(records)
+ assert "truncatedString" in raw_content, (
+ "STANDARD level with max-string-length=10 should truncate long strings"
+ )
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolver.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolver.java
new file mode 100644
index 00000000..f2f4b502
--- /dev/null
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolver.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.eventlog;
+
+import org.apache.flink.agents.api.configuration.AgentConfigOptions;
+import org.apache.flink.agents.api.logger.EventLogLevel;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Resolves the effective {@link EventLogLevel} for a given event type string
using hierarchical
+ * config key inheritance.
+ *
+ * <p>Resolution order for a given event type (e.g., {@code
+ * org.apache.flink.agents.api.event.ChatRequestEvent}):
+ *
+ * <ol>
+ * <li>Exact match in explicit per-type configuration
+ * <li>Walk up dot-separated segments (e.g., {@code
org.apache.flink.agents.api.event}, then
+ * {@code org.apache.flink.agents.api}, etc.)
+ * <li>Root default from {@code event-log.level} config key
+ * <li>Built-in default: {@link EventLogLevel#STANDARD}
+ * </ol>
+ *
+ * <p>This mirrors Log4j's hierarchical logger configuration pattern. Resolved
levels are cached in
+ * a {@link ConcurrentHashMap} for efficient repeated lookups.
+ *
+ * <p>Config keys are expected in the form:
+ *
+ * <pre>{@code
+ * event-log.level = STANDARD (root default)
+ * event-log.type.<EVENT_TYPE>.level = OFF (per-type override)
+ * }</pre>
+ */
+public class EventLogLevelResolver {
+
+ /** Prefix for per-event-type log level config keys. */
+ static final String TYPE_PREFIX = "event-log.type.";
+
+ /** Suffix for per-event-type log level config keys. */
+ static final String TYPE_SUFFIX = ".level";
+
+ private final EventLogLevel rootDefault;
+ private final Map<String, EventLogLevel> explicitLevels;
+ private final ConcurrentHashMap<String, EventLogLevel> cache;
+
+ /**
+ * Creates a resolver from a configuration data map.
+ *
+ * <p>The map is scanned for keys matching {@code
event-log.type.<EVENT_TYPE>.level} to build
+ * the explicit per-type level mappings, and {@code event-log.level} for
the root default.
+ *
+ * @param confData the flat configuration key-value map (e.g., from {@code
+ * AgentConfiguration.getConfData()})
+ */
+ public EventLogLevelResolver(Map<String, Object> confData) {
+ Map<String, Object> data = confData != null ? confData :
Collections.emptyMap();
+
+ // Parse root default
+ Object rootValue =
data.get(AgentConfigOptions.EVENT_LOG_LEVEL.getKey());
+ this.rootDefault =
+ rootValue != null
+ ? EventLogLevel.fromString(rootValue.toString())
+ : AgentConfigOptions.EVENT_LOG_LEVEL.getDefaultValue();
+
+ // Scan for per-type overrides
+ Map<String, EventLogLevel> levels = new HashMap<>();
+ for (Map.Entry<String, Object> entry : data.entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith(TYPE_PREFIX) && key.endsWith(TYPE_SUFFIX)) {
+ String eventType =
+ key.substring(TYPE_PREFIX.length(), key.length() -
TYPE_SUFFIX.length());
+ if (!eventType.isEmpty()) {
+ levels.put(eventType,
EventLogLevel.fromString(entry.getValue().toString()));
+ }
+ }
+ }
+ this.explicitLevels = Collections.unmodifiableMap(levels);
+ this.cache = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Resolves the effective log level for the given event type string.
+ *
+ * <p>Uses hierarchical inheritance: exact match first, then walks up
dot-separated segments,
+ * then falls back to root default, then built-in default ({@link
EventLogLevel#STANDARD}).
+ *
+ * @param eventType the fully qualified event type string (e.g., {@code
+ * org.apache.flink.agents.api.event.ChatRequestEvent})
+ * @return the resolved {@link EventLogLevel}, never null
+ */
+ public EventLogLevel resolve(String eventType) {
+ if (eventType == null || eventType.isEmpty()) {
+ return rootDefault;
+ }
+ return cache.computeIfAbsent(eventType, this::doResolve);
+ }
+
+ private EventLogLevel doResolve(String eventType) {
+ // 1. Exact match
+ EventLogLevel level = explicitLevels.get(eventType);
+ if (level != null) {
+ return level;
+ }
+
+ // 2. Walk up dot-separated segments
+ String current = eventType;
+ int lastDot = current.lastIndexOf('.');
+ while (lastDot > 0) {
+ current = current.substring(0, lastDot);
+ level = explicitLevels.get(current);
+ if (level != null) {
+ return level;
+ }
+ lastDot = current.lastIndexOf('.');
+ }
+
+ // 3. Root default (already parsed in constructor, falls through to
built-in if not set)
+ return rootDefault;
+ }
+}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java
index 5b83452d..402c3f52 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecord.java
@@ -43,10 +43,12 @@ public class EventLogRecord {
this.event = event;
}
+ /** Returns the event context. */
public EventContext getContext() {
return context;
}
+ /** Returns the event. */
public Event getEvent() {
return event;
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
index bfb2f499..ac1e6943 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonDeserializer.java
@@ -57,7 +57,8 @@ public class EventLogRecordJsonDeserializer extends
JsonDeserializer<EventLogRec
throw new IOException("Missing 'timestamp' field in EventLogRecord
JSON");
}
- // Deserialize event as base Event
+ // Deserialize event as base Event. The top-level "logLevel" field that FileEventLogger
+ // adds to each log line is not part of EventLogRecord and is silently ignored.
JsonNode eventNode = rootNode.get("event");
if (eventNode == null) {
throw new IOException("Missing 'event' field in EventLogRecord
JSON");
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java
index e453aa6e..0a035603 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/EventLogRecordJsonSerializer.java
@@ -38,6 +38,7 @@ import java.util.Map;
*
* <ul>
* <li>Top-level timestamp
+ * <li>Top-level eventType (routing key, mirrors {@code event.eventType})
* <li>Event data serialized as a standard JSON object
* </ul>
*
@@ -46,6 +47,7 @@ import java.util.Map;
* <pre>{@code
* {
* "timestamp": "2024-01-15T10:30:00Z",
+ * "eventType": "_input_event",
* "event": {
* "eventType": "_input_event",
* // Event fields serialized normally
@@ -66,6 +68,7 @@ public class EventLogRecordJsonSerializer extends
JsonSerializer<EventLogRecord>
gen.writeStartObject();
gen.writeStringField("timestamp", record.getContext().getTimestamp());
+ gen.writeStringField("eventType", record.getEvent().getType());
gen.writeFieldName("event");
JsonNode eventNode = buildEventNode(record.getEvent(), mapper);
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
index 8e781727..7b42eb7a 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/FileEventLogger.java
@@ -18,13 +18,17 @@
package org.apache.flink.agents.runtime.eventlog;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.EventContext;
-import org.apache.flink.agents.api.EventFilter;
+import org.apache.flink.agents.api.configuration.AgentConfigOptions;
+import org.apache.flink.agents.api.logger.EventLogLevel;
import org.apache.flink.agents.api.logger.EventLogger;
import org.apache.flink.agents.api.logger.EventLoggerConfig;
import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
+import org.apache.flink.metrics.Counter;
import java.io.BufferedWriter;
import java.io.FileWriter;
@@ -32,6 +36,8 @@ import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.Map;
/**
* A file-based event logger that logs events to files with structured names
in a flat directory.
@@ -81,16 +87,20 @@ public class FileEventLogger implements EventLogger {
private static final String DEFAULT_BASE_LOG_DIR =
Paths.get(System.getProperty("java.io.tmpdir"),
"flink-agents").toString();
+ /** Property key for passing the full agent config data map into the
logger. */
+ public static final String AGENT_CONFIG_PROPERTY_KEY = "agentConfig";
+
private static final ObjectMapper MAPPER = new ObjectMapper();
private final EventLoggerConfig config;
- private final EventFilter eventFilter;
private boolean prettyPrint;
private PrintWriter writer;
+ private EventLogLevelResolver levelResolver;
+ private JsonTruncator truncator;
+ private Counter truncatedEventsCounter;
public FileEventLogger(EventLoggerConfig config) {
this.config = config;
- this.eventFilter = config.getEventFilter();
}
@Override
@@ -105,6 +115,45 @@ public class FileEventLogger implements EventLogger {
writer = new PrintWriter(new BufferedWriter(new
FileWriter(logFilePath, true)));
prettyPrint =
(Boolean)
config.getProperties().getOrDefault(PRETTY_PRINT_PROPERTY_KEY, false);
+
+ // Initialize level resolver and truncator from agent config
+ @SuppressWarnings("unchecked")
+ Map<String, Object> agentConfig =
+ (Map<String, Object>)
+ config.getProperties()
+ .getOrDefault(AGENT_CONFIG_PROPERTY_KEY,
Collections.emptyMap());
+ this.levelResolver = new EventLogLevelResolver(agentConfig);
+ int maxStringLength =
+ getIntFromConfig(
+ agentConfig,
+
AgentConfigOptions.EVENT_LOG_MAX_STRING_LENGTH.getKey(),
+
AgentConfigOptions.EVENT_LOG_MAX_STRING_LENGTH.getDefaultValue());
+ int maxArrayElements =
+ getIntFromConfig(
+ agentConfig,
+
AgentConfigOptions.EVENT_LOG_MAX_ARRAY_ELEMENTS.getKey(),
+
AgentConfigOptions.EVENT_LOG_MAX_ARRAY_ELEMENTS.getDefaultValue());
+ int maxDepth =
+ getIntFromConfig(
+ agentConfig,
+ AgentConfigOptions.EVENT_LOG_MAX_DEPTH.getKey(),
+
AgentConfigOptions.EVENT_LOG_MAX_DEPTH.getDefaultValue());
+ this.truncator = new JsonTruncator(maxStringLength, maxArrayElements,
maxDepth);
+ }
+
+ private static int getIntFromConfig(Map<String, Object> config, String
key, int defaultValue) {
+ Object value = config.get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).intValue();
+ }
+ try {
+ return Integer.parseInt(value.toString());
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
}
private String generateSubTaskLogFilePath(EventLoggerOpenParams params) {
@@ -130,18 +179,49 @@ public class FileEventLogger implements EventLogger {
throw new IllegalStateException("FileEventLogger not initialized.
Call open() first.");
}
- // Apply event filter
- if (!eventFilter.accept(event, context)) {
- return; // Skip this event
+ // Resolve log level and skip OFF events.
+ EventLogLevel level =
+ levelResolver != null
+ ? levelResolver.resolve(event.getType())
+ : EventLogLevel.VERBOSE;
+ if (level == EventLogLevel.OFF) {
+ return;
}
+ // All events should be JSON serializable; we already check this when
sending events
+ // to context (RunnerContextImpl.sendEvent).
EventLogRecord record = new EventLogRecord(context, event);
- // All events should be JSON serializable, since we check it when
sending events to context:
- // RunnerContextImpl.sendEvent
+ JsonNode tree = MAPPER.valueToTree(record);
+ if (!(tree instanceof ObjectNode)) {
+ throw new IllegalStateException(
+ "EventLogRecord must serialize to a JSON object, but was: "
+ + tree.getNodeType());
+ }
+ ObjectNode rootNode = (ObjectNode) tree;
+
+ // Truncate the event subtree at STANDARD level.
+ if (level == EventLogLevel.STANDARD && truncator != null) {
+ JsonNode eventNode = rootNode.get("event");
+ if (eventNode instanceof ObjectNode) {
+ boolean truncated = truncator.truncate((ObjectNode) eventNode);
+ if (truncated && truncatedEventsCounter != null) {
+ truncatedEventsCounter.inc();
+ }
+ }
+ }
+
+ // Rebuild the top-level object so logLevel sits between timestamp and
eventType,
+ // matching the documented JSON layout.
+ ObjectNode ordered = MAPPER.createObjectNode();
+ ordered.set("timestamp", rootNode.get("timestamp"));
+ ordered.put("logLevel", level.name());
+ ordered.set("eventType", rootNode.get("eventType"));
+ ordered.set("event", rootNode.get("event"));
+
String json =
prettyPrint
- ?
MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(record)
- : MAPPER.writeValueAsString(record);
+ ?
MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(ordered)
+ : MAPPER.writeValueAsString(ordered);
writer.println(json);
}
@@ -154,6 +234,16 @@ public class FileEventLogger implements EventLogger {
writer.flush();
}
+    /**
+     * Installs the metric counter that {@code append} bumps once per event whose payload was
+     * shortened by the truncator; while unset ({@code null}), truncations are not counted.
+     *
+     * @param counter the counter to increment when events are truncated
+     */
+    public void setTruncatedEventsCounter(Counter counter) {
+        this.truncatedEventsCounter = counter;
+    }
+
@Override
public void close() throws Exception {
if (writer != null) {
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/JsonTruncator.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/JsonTruncator.java
new file mode 100644
index 00000000..89e49cee
--- /dev/null
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/JsonTruncator.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.eventlog;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Truncates a Jackson {@link JsonNode} tree per configurable thresholds.
+ *
+ * <p>Three truncation strategies are applied in a single recursive pass:
+ *
+ * <ul>
+ *   <li><b>String truncation</b>: strings longer than {@code maxStringLength} are replaced with a
+ *       wrapper: {@code {"truncatedString": "first N chars...", "omittedChars": M}}. The cut
+ *       never splits a UTF-16 surrogate pair, so N may be one less than {@code maxStringLength}.
+ *   <li><b>Array truncation</b>: arrays larger than {@code maxArrayElements} are replaced with a
+ *       wrapper: {@code {"truncatedList": [first N elements], "omittedElements": M}}
+ *   <li><b>Depth truncation</b>: at max depth, object nodes retain only scalar fields; nested
+ *       objects/arrays are dropped: {@code {"truncatedObject": {scalars...}, "omittedFields": N}}
+ * </ul>
+ *
+ * <p>Setting any threshold to {@code 0} (or a negative value) disables that specific truncation
+ * strategy. If all thresholds are disabled, no truncation occurs.
+ *
+ * <p>Protected fields at the top level of the event node ({@code eventType}, {@code id}, {@code
+ * attributes}) are never truncated as structural fields. The {@code attributes} envelope is
+ * additionally traversed so user payload stored inside it is truncated like any other field.
+ */
+public class JsonTruncator {
+
+    /** Top-level field names that are skipped by the structural pass over the event node. */
+    private static final Set<String> PROTECTED_FIELDS =
+            new HashSet<>(Arrays.asList("eventType", "id", "attributes"));
+
+    private final int maxStringLength;
+    private final int maxArrayElements;
+    private final int maxDepth;
+
+    /**
+     * Creates a new truncator with the given thresholds.
+     *
+     * @param maxStringLength maximum character length for string values; 0 to disable
+     * @param maxArrayElements maximum number of array elements retained; 0 to disable
+     * @param maxDepth maximum object nesting depth; 0 to disable
+     */
+    public JsonTruncator(int maxStringLength, int maxArrayElements, int maxDepth) {
+        this.maxStringLength = maxStringLength;
+        this.maxArrayElements = maxArrayElements;
+        this.maxDepth = maxDepth;
+    }
+
+    /**
+     * Truncates the given event node in place according to configured thresholds.
+     *
+     * <p>Protected fields ({@code eventType}, {@code id}, {@code attributes}) at the top level of
+     * the event node are never truncated as structural fields. The {@code attributes} envelope is
+     * additionally traversed so user payload stored inside it is truncated like any other field.
+     *
+     * @param eventNode the top-level event JSON object to truncate; {@code null} is a no-op
+     * @return {@code true} if any field was truncated, {@code false} if the node was unchanged
+     */
+    public boolean truncate(ObjectNode eventNode) {
+        if (eventNode == null) {
+            return false;
+        }
+        // Grab the envelope BEFORE the structural pass: if the top level is collapsed at max
+        // depth, "attributes" is moved (same instance) under "truncatedObject" and a lookup on
+        // the event node afterwards would miss it, leaving the user payload untruncated.
+        JsonNode attributes = eventNode.get("attributes");
+        boolean truncated = truncateObject(eventNode, 1, true);
+        if (attributes instanceof ObjectNode) {
+            truncated |= truncateObject((ObjectNode) attributes, 1, false);
+        }
+        return truncated;
+    }
+
+    /**
+     * Recursively truncates an object node.
+     *
+     * @param node the object node to process
+     * @param depth current depth (1 = top-level event node)
+     * @param isTopLevel whether this is the top-level event node (for protected field checks)
+     * @return true if any truncation occurred
+     */
+    private boolean truncateObject(ObjectNode node, int depth, boolean isTopLevel) {
+        // At max depth, collapse the entire object to retain only scalars.
+        if (maxDepth > 0 && depth >= maxDepth) {
+            return collapseAtMaxDepth(node, isTopLevel);
+        }
+
+        // Snapshot the names first: fields are replaced while we walk them.
+        List<String> fieldNames = new ArrayList<>();
+        node.fieldNames().forEachRemaining(fieldNames::add);
+
+        boolean truncated = false;
+        for (String fieldName : fieldNames) {
+            if (isTopLevel && PROTECTED_FIELDS.contains(fieldName)) {
+                continue;
+            }
+
+            JsonNode child = node.get(fieldName);
+            if (child == null) {
+                continue;
+            }
+
+            if (child.isTextual()) {
+                JsonNode replacement = truncateString(child.textValue());
+                if (replacement != null) {
+                    node.set(fieldName, replacement);
+                    truncated = true;
+                }
+            } else if (child.isArray()) {
+                JsonNode result = truncateArrayNode((ArrayNode) child, depth + 1);
+                if (result != null) {
+                    truncated = true;
+                    if (result != child) {
+                        node.set(fieldName, result);
+                    }
+                }
+            } else if (child.isObject()) {
+                truncated |= truncateObject((ObjectNode) child, depth + 1, false);
+            }
+        }
+        return truncated;
+    }
+
+    /**
+     * Truncates a string if it exceeds maxStringLength.
+     *
+     * <p>The cut point is moved back by one char when it would fall between the two halves of a
+     * surrogate pair, so the retained prefix never ends with a lone high surrogate.
+     *
+     * @return a wrapper ObjectNode if truncated, or null if no truncation needed
+     */
+    private JsonNode truncateString(String value) {
+        if (maxStringLength <= 0 || value == null || value.length() <= maxStringLength) {
+            return null;
+        }
+        int cut = maxStringLength;
+        // Here 1 <= cut < value.length(), so both charAt calls are in range.
+        if (Character.isHighSurrogate(value.charAt(cut - 1))
+                && Character.isLowSurrogate(value.charAt(cut))) {
+            cut--;
+        }
+        ObjectNode wrapper = JsonNodeFactory.instance.objectNode();
+        wrapper.put("truncatedString", value.substring(0, cut) + "...");
+        wrapper.put("omittedChars", value.length() - cut);
+        return wrapper;
+    }
+
+    /**
+     * Truncates an array if it exceeds maxArrayElements.
+     *
+     * @return a wrapper ObjectNode if truncated, or null if no truncation needed
+     */
+    private JsonNode truncateArray(ArrayNode array) {
+        if (maxArrayElements <= 0 || array.size() <= maxArrayElements) {
+            return null;
+        }
+        ArrayNode retained = JsonNodeFactory.instance.arrayNode();
+        for (int i = 0; i < maxArrayElements; i++) {
+            retained.add(array.get(i));
+        }
+        ObjectNode wrapper = JsonNodeFactory.instance.objectNode();
+        wrapper.set("truncatedList", retained);
+        wrapper.put("omittedElements", array.size() - maxArrayElements);
+        return wrapper;
+    }
+
+    /**
+     * Applies array truncation followed by element truncation to one array.
+     *
+     * <p>The array is cut first so we don't recurse into elements that will be dropped.
+     *
+     * @param array the array to process
+     * @param elementDepth depth assigned to the array's elements
+     * @return the wrapper that must replace {@code array} when it was cut; {@code array} itself
+     *     when only its elements were truncated in place; {@code null} when nothing changed
+     */
+    private JsonNode truncateArrayNode(ArrayNode array, int elementDepth) {
+        JsonNode wrapper = truncateArray(array);
+        if (wrapper != null) {
+            truncateArrayContents((ArrayNode) wrapper.get("truncatedList"), elementDepth);
+            return wrapper;
+        }
+        return truncateArrayContents(array, elementDepth) ? array : null;
+    }
+
+    /**
+     * Recursively truncates contents within an array (strings and nested structures).
+     *
+     * @param array the array whose elements are processed in place
+     * @param depth depth of the array's elements
+     * @return true if any truncation occurred within array elements
+     */
+    private boolean truncateArrayContents(ArrayNode array, int depth) {
+        boolean truncated = false;
+        for (int i = 0; i < array.size(); i++) {
+            JsonNode element = array.get(i);
+            if (element.isTextual()) {
+                JsonNode replacement = truncateString(element.textValue());
+                if (replacement != null) {
+                    array.set(i, replacement);
+                    truncated = true;
+                }
+            } else if (element.isObject()) {
+                truncated |= truncateObject((ObjectNode) element, depth, false);
+            } else if (element.isArray()) {
+                JsonNode result = truncateArrayNode((ArrayNode) element, depth + 1);
+                if (result != null) {
+                    truncated = true;
+                    if (result != element) {
+                        array.set(i, result);
+                    }
+                }
+            }
+        }
+        return truncated;
+    }
+
+    /**
+     * At max depth, retain only scalar fields and drop nested objects/arrays.
+     *
+     * <p>When nothing has to be dropped the node keeps its shape and only over-long strings are
+     * replaced in place. Otherwise the node's contents become {@code {"truncatedObject":
+     * {retained fields}, "omittedFields": N}}; at the top level, protected fields are carried
+     * into the retained set as-is, even if non-scalar.
+     *
+     * @return true if any field was dropped or any string was truncated
+     */
+    private boolean collapseAtMaxDepth(ObjectNode node, boolean isTopLevel) {
+        List<String> fieldNames = new ArrayList<>();
+        node.fieldNames().forEachRemaining(fieldNames::add);
+
+        ObjectNode retained = JsonNodeFactory.instance.objectNode();
+        int omittedCount = 0;
+        boolean stringTruncated = false;
+
+        for (String fieldName : fieldNames) {
+            JsonNode child = node.get(fieldName);
+            if (isTopLevel && PROTECTED_FIELDS.contains(fieldName)) {
+                retained.set(fieldName, child);
+                continue;
+            }
+            if (child.isObject() || child.isArray()) {
+                omittedCount++;
+                continue;
+            }
+            // String truncation still applies to scalar string fields at max depth.
+            JsonNode replacement = child.isTextual() ? truncateString(child.textValue()) : null;
+            if (replacement != null) {
+                stringTruncated = true;
+                retained.set(fieldName, replacement);
+            } else {
+                retained.set(fieldName, child);
+            }
+        }
+
+        if (omittedCount == 0) {
+            // Nothing to drop, so no wrapper: write the (possibly shortened) values back in
+            // place. Re-setting an existing key keeps the original field order.
+            if (stringTruncated) {
+                for (String fieldName : fieldNames) {
+                    node.set(fieldName, retained.get(fieldName));
+                }
+            }
+            return stringTruncated;
+        }
+
+        // Replace the node contents with the wrapped version.
+        node.removeAll();
+        node.set("truncatedObject", retained);
+        node.put("omittedFields", omittedCount);
+        return true;
+    }
+}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
index 0e965732..59e0daaa 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
@@ -37,6 +37,8 @@ public class BuiltInMetrics {
private final Meter numOfActionsExecutedPerSec;
+ private final Counter eventLogTruncatedEvents;
+
private final HashMap<String, BuiltInActionMetrics> actionMetricGroups;
public BuiltInMetrics(FlinkAgentsMetricGroupImpl parentMetricGroup,
AgentPlan agentPlan) {
@@ -48,6 +50,8 @@ public class BuiltInMetrics {
this.numOfActionsExecutedPerSec =
parentMetricGroup.getMeter("numOfActionsExecutedPerSec",
numOfActionsExecuted);
+ this.eventLogTruncatedEvents =
parentMetricGroup.getCounter("eventLogTruncatedEvents");
+
this.actionMetricGroups = new HashMap<>();
for (String actionName : agentPlan.getActions().keySet()) {
actionMetricGroups.put(
@@ -69,4 +73,9 @@ public class BuiltInMetrics {
numOfActionsExecutedPerSec.markEvent();
actionMetricGroups.get(actionName).markActionExecuted();
}
+
+    /** Gives access to the {@code eventLogTruncatedEvents} counter of the parent metric group. */
+    public Counter getEventLogTruncatedEventsCounter() {
+        return this.eventLogTruncatedEvents;
+    }
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
index cce19585..a79f227d 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/EventRouter.java
@@ -111,6 +111,10 @@ class EventRouter<IN, OUT> implements AutoCloseable {
return;
}
eventLogger.open(new EventLoggerOpenParams(runtimeContext));
+ if (eventLogger instanceof FileEventLogger) {
+ ((FileEventLogger) eventLogger)
+
.setTruncatedEventsCounter(builtInMetrics.getEventLogTruncatedEventsCounter());
+ }
}
/**
@@ -241,6 +245,8 @@ class EventRouter<IN, OUT> implements AutoCloseable {
}
loggerConfigBuilder.property(
FileEventLogger.PRETTY_PRINT_PROPERTY_KEY,
agentPlan.getConfig().get(PRETTY_PRINT));
+ loggerConfigBuilder.property(
+ FileEventLogger.AGENT_CONFIG_PROPERTY_KEY,
agentPlan.getConfig().getConfData());
return EventLoggerFactory.createLogger(loggerConfigBuilder.build());
}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolverTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolverTest.java
new file mode 100644
index 00000000..a1d2d26e
--- /dev/null
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/EventLogLevelResolverTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.eventlog;
+
+import org.apache.flink.agents.api.logger.EventLogLevel;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit tests for the level lookup performed by {@link EventLogLevelResolver}. */
+class EventLogLevelResolverTest {
+
+    private static final String ROOT_LEVEL_KEY = "event-log.level";
+    private static final String EVENT_PACKAGE = "org.apache.flink.agents.api.event";
+    private static final String CHAT_REQUEST_EVENT = EVENT_PACKAGE + ".ChatRequestEvent";
+    private static final String CHAT_RESPONSE_EVENT = EVENT_PACKAGE + ".ChatResponseEvent";
+
+    /** Builds the per-type (or per-package) level key for the given dotted name. */
+    private static String levelKey(String typeOrPackage) {
+        return "event-log.type." + typeOrPackage + ".level";
+    }
+
+    /** Builds a mutable config map from alternating key / level arguments. */
+    private static Map<String, Object> configOf(String... keysAndLevels) {
+        Map<String, Object> settings = new HashMap<>();
+        for (int i = 0; i < keysAndLevels.length; i += 2) {
+            settings.put(keysAndLevels[i], keysAndLevels[i + 1]);
+        }
+        return settings;
+    }
+
+    /** Creates a resolver configured with alternating key / level arguments. */
+    private static EventLogLevelResolver resolverFor(String... keysAndLevels) {
+        return new EventLogLevelResolver(configOf(keysAndLevels));
+    }
+
+    @Test
+    void testExactMatch() {
+        EventLogLevelResolver underTest = resolverFor(levelKey(CHAT_REQUEST_EVENT), "OFF");
+
+        assertThat(underTest.resolve(CHAT_REQUEST_EVENT)).isEqualTo(EventLogLevel.OFF);
+    }
+
+    @Test
+    void testParentPackageInheritance() {
+        EventLogLevelResolver underTest = resolverFor(levelKey(EVENT_PACKAGE), "VERBOSE");
+
+        // The event type lives directly under the configured package.
+        assertThat(underTest.resolve(CHAT_REQUEST_EVENT)).isEqualTo(EventLogLevel.VERBOSE);
+    }
+
+    @Test
+    void testGrandparentInheritance() {
+        EventLogLevelResolver underTest = resolverFor(levelKey("org.apache.flink"), "OFF");
+
+        // The lookup has to climb several package levels before it finds the entry.
+        assertThat(underTest.resolve(CHAT_REQUEST_EVENT)).isEqualTo(EventLogLevel.OFF);
+    }
+
+    @Test
+    void testRootDefault() {
+        EventLogLevelResolver underTest = resolverFor(ROOT_LEVEL_KEY, "VERBOSE");
+
+        assertThat(underTest.resolve("some.unknown.EventType")).isEqualTo(EventLogLevel.VERBOSE);
+    }
+
+    @Test
+    void testBuiltInDefault() {
+        EventLogLevelResolver underTest = new EventLogLevelResolver(Collections.emptyMap());
+
+        assertThat(underTest.resolve("some.unknown.EventType")).isEqualTo(EventLogLevel.STANDARD);
+    }
+
+    @Test
+    void testCaseInsensitive() {
+        // Lower-case value on a per-type key.
+        EventLogLevelResolver perType = resolverFor(levelKey("my.Event"), "verbose");
+        assertThat(perType.resolve("my.Event")).isEqualTo(EventLogLevel.VERBOSE);
+
+        // Mixed-case value on the root key.
+        EventLogLevelResolver root = resolverFor(ROOT_LEVEL_KEY, "Off");
+        assertThat(root.resolve("any.Event")).isEqualTo(EventLogLevel.OFF);
+    }
+
+    @Test
+    void testInvalidLevel() {
+        Map<String, Object> badConfig = configOf(levelKey("my.Event"), "INVALID_LEVEL");
+
+        assertThatThrownBy(() -> new EventLogLevelResolver(badConfig))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("INVALID_LEVEL");
+    }
+
+    @Test
+    void testNullConfigData() {
+        EventLogLevelResolver underTest = new EventLogLevelResolver(null);
+
+        assertThat(underTest.resolve("some.Event")).isEqualTo(EventLogLevel.STANDARD);
+    }
+
+    @Test
+    void testNullOrEmptyEventType() {
+        EventLogLevelResolver underTest = resolverFor(ROOT_LEVEL_KEY, "VERBOSE");
+
+        assertThat(underTest.resolve(null)).isEqualTo(EventLogLevel.VERBOSE);
+        assertThat(underTest.resolve("")).isEqualTo(EventLogLevel.VERBOSE);
+    }
+
+    @Test
+    void testExactMatchTakesPrecedenceOverParent() {
+        EventLogLevelResolver underTest =
+                resolverFor(
+                        levelKey(EVENT_PACKAGE), "OFF",
+                        levelKey(CHAT_REQUEST_EVENT), "VERBOSE");
+
+        // The type-specific entry beats the package entry ...
+        assertThat(underTest.resolve(CHAT_REQUEST_EVENT)).isEqualTo(EventLogLevel.VERBOSE);
+        // ... while a sibling type without its own entry falls back to the package.
+        assertThat(underTest.resolve(CHAT_RESPONSE_EVENT)).isEqualTo(EventLogLevel.OFF);
+    }
+
+    @Test
+    void testCachingReturnsSameResult() {
+        EventLogLevelResolver underTest = resolverFor(levelKey("my.Event"), "OFF");
+
+        // Resolve the same type twice; the second call should return the cached result.
+        EventLogLevel initial = underTest.resolve("my.Event");
+        EventLogLevel repeated = underTest.resolve("my.Event");
+        assertThat(initial).isSameAs(repeated);
+    }
+}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
index af2dc794..831fba4f 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/FileEventLoggerTest.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.EventContext;
-import org.apache.flink.agents.api.EventFilter;
import org.apache.flink.agents.api.InputEvent;
import org.apache.flink.agents.api.OutputEvent;
import org.apache.flink.agents.api.configuration.AgentConfigOptions;
@@ -301,237 +300,264 @@ class FileEventLoggerTest {
}
@Test
- void testEventFilterAcceptAll() throws Exception {
- // Given - config with ACCEPT_ALL filter (default behavior)
+ void testPrettyPrintOutputsFormattedJson() throws Exception {
+ // Given - config with prettyPrint enabled
config =
EventLoggerConfig.builder()
.loggerType("file")
- .property("baseLogDir", tempDir.toString())
- .eventFilter(EventFilter.ACCEPT_ALL)
+ .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY,
tempDir.toString())
+ .property(AgentConfigOptions.PRETTY_PRINT.getKey(),
true)
.build();
logger = new FileEventLogger(config);
logger.open(openParams);
- InputEvent inputEvent = new InputEvent("input data");
- OutputEvent outputEvent = new OutputEvent("output data");
-
- // When
+ InputEvent inputEvent = new InputEvent("test input");
logger.append(new EventContext(inputEvent), inputEvent);
- logger.append(new EventContext(outputEvent), outputEvent);
logger.flush();
- // Then - both events should be logged
+ // Then - output should be valid JSON spanning multiple lines
(pretty-printed)
Path logFile = getExpectedLogFilePath();
List<String> lines = Files.readAllLines(logFile);
- assertEquals(2, lines.size(), "Both events should be logged with
ACCEPT_ALL filter");
-
- // Verify both events were deserialized correctly
- EventLogRecord inputRecord = objectMapper.readValue(lines.get(0),
EventLogRecord.class);
- assertEquals(InputEvent.EVENT_TYPE, inputRecord.getEvent().getType());
-
- EventLogRecord outputRecord = objectMapper.readValue(lines.get(1),
EventLogRecord.class);
- assertEquals(OutputEvent.EVENT_TYPE,
outputRecord.getEvent().getType());
+ // Pretty-printed JSON for a single event record spans multiple lines
+ assertTrue(lines.size() > 1, "Pretty-printed JSON should span multiple
lines");
+ // Each line after the first should be indented
+ assertTrue(
+ lines.subList(1, lines.size()).stream().anyMatch(line ->
line.startsWith(" ")),
+ "Pretty-printed JSON lines should be indented");
+ // The entire content should still be valid JSON
+ String content = String.join("\n", lines);
+ assertDoesNotThrow(
+ () -> objectMapper.readValue(content, EventLogRecord.class),
+ "Pretty-printed output should be valid JSON deserializable to
EventLogRecord");
}
@Test
- void testEventFilterRejectAll() throws Exception {
- // Given - config with REJECT_ALL filter
+ void testStandardLevelTruncation() throws Exception {
+ // Given - config with STANDARD level and a small max-string-length
for easy testing
+ Map<String, Object> agentConfig = new HashMap<>();
+ agentConfig.put("event-log.level", "STANDARD");
+ agentConfig.put("event-log.standard.max-string-length", 10);
+ agentConfig.put("event-log.standard.max-array-elements", 20);
+ agentConfig.put("event-log.standard.max-depth", 5);
+
config =
EventLoggerConfig.builder()
.loggerType("file")
- .property("baseLogDir", tempDir.toString())
- .eventFilter(EventFilter.REJECT_ALL)
+ .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY,
tempDir.toString())
+ .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY,
agentConfig)
.build();
logger = new FileEventLogger(config);
-
logger.open(openParams);
- InputEvent inputEvent = new InputEvent("input data");
- OutputEvent outputEvent = new OutputEvent("output data");
- // When
- logger.append(new EventContext(inputEvent), inputEvent);
- logger.append(new EventContext(outputEvent), outputEvent);
+ // Use a custom event with a very long string field
+ TestCustomEvent event =
+ new TestCustomEvent("this is a very long string that exceeds
10", 1);
+ EventContext context = new EventContext(event);
+
+ logger.append(context, event);
logger.flush();
- // Then - no events should be logged (file should not exist or be
empty)
Path logFile = getExpectedLogFilePath();
+ List<String> lines = Files.readAllLines(logFile);
+ assertEquals(1, lines.size());
+
+ JsonNode jsonNode = objectMapper.readTree(lines.get(0));
+ assertEquals("STANDARD", jsonNode.get("logLevel").asText());
+
+ // The customData field (inside attributes) should be truncated
+ JsonNode attrsNode = jsonNode.get("event").get("attributes");
+ JsonNode customDataNode = attrsNode.get("customData");
assertTrue(
- !Files.exists(logFile) ||
Files.readAllLines(logFile).isEmpty(),
- "No events should be logged with REJECT_ALL filter");
+ customDataNode.has("truncatedString"),
+ "Long string should be truncated at STANDARD level");
+ assertTrue(customDataNode.has("omittedChars"));
}
@Test
- void testEventFilterByEventType() throws Exception {
- // Given - config with filter that only accepts InputEvents
+ void testVerboseLevelNoTruncation() throws Exception {
+ // Given - config with VERBOSE level
+ Map<String, Object> agentConfig = new HashMap<>();
+ agentConfig.put("event-log.level", "VERBOSE");
+ agentConfig.put("event-log.standard.max-string-length", 10);
+
config =
EventLoggerConfig.builder()
.loggerType("file")
- .property("baseLogDir", tempDir.toString())
- .eventFilter(EventFilter.byEventType(InputEvent.class))
+ .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY,
tempDir.toString())
+ .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY,
agentConfig)
.build();
logger = new FileEventLogger(config);
-
logger.open(openParams);
- InputEvent inputEvent = new InputEvent("input data");
- OutputEvent outputEvent = new OutputEvent("output data");
- TestCustomEvent customEvent = new TestCustomEvent("custom data", 42);
- // When
- logger.append(new EventContext(inputEvent), inputEvent);
- logger.append(new EventContext(outputEvent), outputEvent);
- logger.append(new EventContext(customEvent), customEvent);
+ TestCustomEvent event =
+ new TestCustomEvent("this is a very long string that exceeds
10", 1);
+ EventContext context = new EventContext(event);
+
+ logger.append(context, event);
logger.flush();
- // Then - only InputEvent should be logged
Path logFile = getExpectedLogFilePath();
List<String> lines = Files.readAllLines(logFile);
- assertEquals(1, lines.size(), "Only InputEvent should be logged");
+ assertEquals(1, lines.size());
- EventLogRecord record = objectMapper.readValue(lines.get(0),
EventLogRecord.class);
- assertEquals(InputEvent.EVENT_TYPE, record.getEvent().getType());
- assertEquals("input data",
InputEvent.fromEvent(record.getEvent()).getInput());
+ JsonNode jsonNode = objectMapper.readTree(lines.get(0));
+ assertEquals("VERBOSE", jsonNode.get("logLevel").asText());
+
+ // The customData field (inside attributes) should NOT be truncated
+ JsonNode attrsNode = jsonNode.get("event").get("attributes");
+ assertTrue(
+ attrsNode.get("customData").isTextual(),
+ "String should be preserved at VERBOSE level");
+ assertEquals(
+ "this is a very long string that exceeds 10",
attrsNode.get("customData").asText());
}
@Test
- void testEventFilterByMultipleEventTypes() throws Exception {
- // Given - config with filter that accepts InputEvents and OutputEvents
+ void testOffLevelSkipsEvent() throws Exception {
+ // Given - config with OFF level
+ Map<String, Object> agentConfig = new HashMap<>();
+ agentConfig.put("event-log.level", "OFF");
+
config =
EventLoggerConfig.builder()
.loggerType("file")
- .property("baseLogDir", tempDir.toString())
- .eventFilter(EventFilter.byEventType(InputEvent.class,
OutputEvent.class))
+ .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY,
tempDir.toString())
+ .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY,
agentConfig)
.build();
logger = new FileEventLogger(config);
-
logger.open(openParams);
- InputEvent inputEvent = new InputEvent("input data");
- OutputEvent outputEvent = new OutputEvent("output data");
- TestCustomEvent customEvent = new TestCustomEvent("custom data", 42);
- // When
- logger.append(new EventContext(inputEvent), inputEvent);
- logger.append(new EventContext(outputEvent), outputEvent);
- logger.append(new EventContext(customEvent), customEvent);
+ InputEvent event = new InputEvent("should not be logged");
+ EventContext context = new EventContext(event);
+
+ logger.append(context, event);
logger.flush();
- // Then - InputEvent and OutputEvent should be logged, but not
TestCustomEvent
Path logFile = getExpectedLogFilePath();
List<String> lines = Files.readAllLines(logFile);
- assertEquals(2, lines.size(), "InputEvent and OutputEvent should be
logged");
-
- EventLogRecord inputRecord = objectMapper.readValue(lines.get(0),
EventLogRecord.class);
- assertEquals(InputEvent.EVENT_TYPE, inputRecord.getEvent().getType());
- assertEquals("input data",
InputEvent.fromEvent(inputRecord.getEvent()).getInput());
-
- EventLogRecord outputRecord = objectMapper.readValue(lines.get(1),
EventLogRecord.class);
- assertEquals(OutputEvent.EVENT_TYPE,
outputRecord.getEvent().getType());
- assertEquals("output data",
OutputEvent.fromEvent(outputRecord.getEvent()).getOutput());
+ assertEquals(0, lines.size(), "OFF level should produce no output");
}
@Test
- void testCustomEventFilter() throws Exception {
- // Given - config with custom filter that only accepts events with
specific content
- EventFilter customFilter =
- (event, context) -> {
- if (InputEvent.EVENT_TYPE.equals(event.getType())) {
- return InputEvent.fromEvent(event)
- .getInput()
- .toString()
- .contains("important");
- }
- return false;
- };
+ void testPerTypeLevelOverride() throws Exception {
+ // Given - root is STANDARD but InputEvent is set to VERBOSE
+ Map<String, Object> agentConfig = new HashMap<>();
+ agentConfig.put("event-log.level", "STANDARD");
+ agentConfig.put("event-log.standard.max-string-length", 10);
+ agentConfig.put("event-log.type." + InputEvent.EVENT_TYPE + ".level",
"VERBOSE");
config =
EventLoggerConfig.builder()
.loggerType("file")
- .property("baseLogDir", tempDir.toString())
- .eventFilter(customFilter)
+ .property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY,
tempDir.toString())
+ .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY,
agentConfig)
.build();
logger = new FileEventLogger(config);
-
logger.open(openParams);
- InputEvent importantEvent = new InputEvent("important data");
- InputEvent regularEvent = new InputEvent("regular data");
- OutputEvent outputEvent = new OutputEvent("output data");
- // When
- logger.append(new EventContext(importantEvent), importantEvent);
- logger.append(new EventContext(regularEvent), regularEvent);
- logger.append(new EventContext(outputEvent), outputEvent);
+ // InputEvent should be VERBOSE (no truncation)
+ InputEvent inputEvent = new InputEvent("this is a very long string
that exceeds 10");
+ logger.append(new EventContext(inputEvent), inputEvent);
+
+ // TestCustomEvent should be STANDARD (truncated)
+ TestCustomEvent customEvent =
+ new TestCustomEvent("this is a very long string that exceeds
10", 1);
+ logger.append(new EventContext(customEvent), customEvent);
logger.flush();
- // Then - only the "important" InputEvent should be logged
Path logFile = getExpectedLogFilePath();
List<String> lines = Files.readAllLines(logFile);
- assertEquals(1, lines.size(), "Only important InputEvent should be
logged");
+ assertEquals(2, lines.size());
- EventLogRecord record = objectMapper.readValue(lines.get(0),
EventLogRecord.class);
- assertEquals(InputEvent.EVENT_TYPE, record.getEvent().getType());
- assertEquals("important data",
InputEvent.fromEvent(record.getEvent()).getInput());
+ // InputEvent at VERBOSE - no truncation (data lives in attributes)
+ JsonNode inputJson = objectMapper.readTree(lines.get(0));
+ assertEquals("VERBOSE", inputJson.get("logLevel").asText());
+
assertTrue(inputJson.get("event").get("attributes").get("input").isTextual());
+
+ // TestCustomEvent at STANDARD - truncated (data lives in attributes)
+ JsonNode customJson = objectMapper.readTree(lines.get(1));
+ assertEquals("STANDARD", customJson.get("logLevel").asText());
+ assertTrue(
+
customJson.get("event").get("attributes").get("customData").has("truncatedString"));
}
@Test
- void testDefaultEventFilterBehavior() throws Exception {
- // Given - config without explicit eventFilter (should default to
ACCEPT_ALL)
- config =
- EventLoggerConfig.builder()
- .loggerType("file")
- .property("baseLogDir", tempDir.toString())
- .build();
- logger = new FileEventLogger(config);
-
+ void testJsonOutputHasNewFields() throws Exception {
+ // Given - default config
logger.open(openParams);
- InputEvent inputEvent = new InputEvent("input data");
- OutputEvent outputEvent = new OutputEvent("output data");
+ InputEvent event = new InputEvent("test");
+ EventContext context = new EventContext(event);
- // When
- logger.append(new EventContext(inputEvent), inputEvent);
- logger.append(new EventContext(outputEvent), outputEvent);
+ logger.append(context, event);
logger.flush();
- // Then - both events should be logged (default ACCEPT_ALL behavior)
Path logFile = getExpectedLogFilePath();
List<String> lines = Files.readAllLines(logFile);
- assertEquals(2, lines.size(), "Both events should be logged with
default filter");
+ JsonNode jsonNode = objectMapper.readTree(lines.get(0));
- EventLogRecord inputRecord = objectMapper.readValue(lines.get(0),
EventLogRecord.class);
- assertEquals(InputEvent.EVENT_TYPE, inputRecord.getEvent().getType());
+ // Verify new top-level fields exist
+ assertTrue(jsonNode.has("logLevel"), "JSON should have logLevel
field");
+ assertTrue(jsonNode.has("eventType"), "JSON should have eventType
field");
+ assertEquals(InputEvent.EVENT_TYPE,
jsonNode.get("eventType").asText());
+ assertNotNull(jsonNode.get("logLevel").asText());
+ }
- EventLogRecord outputRecord = objectMapper.readValue(lines.get(1),
EventLogRecord.class);
- assertEquals(OutputEvent.EVENT_TYPE,
outputRecord.getEvent().getType());
+ @Test
+ void testBackwardCompatibleDeserialization() throws Exception {
+ // Simulate old-format JSON without a top-level logLevel field. The
Event payload uses
+ // the post-#631 shape: {type, id, attributes}. The deserializer must
still parse it.
+ String oldFormatJson =
+ "{\"timestamp\":\"2024-01-15T10:30:00Z\","
+ + "\"event\":{\"eventType\":\""
+ + InputEvent.EVENT_TYPE
+ + "\",\"type\":\""
+ + InputEvent.EVENT_TYPE
+ + "\","
+ + "\"attributes\":{\"input\":\"test\"}}}";
+
+ EventLogRecord record = objectMapper.readValue(oldFormatJson,
EventLogRecord.class);
+ assertNotNull(record.getEvent());
+ assertEquals(InputEvent.EVENT_TYPE, record.getEvent().getType());
+ assertEquals(
+ "test",
+ InputEvent.fromEvent(record.getEvent()).getInput(),
+ "Old-format JSON without logLevel should still deserialize the
event payload");
}
@Test
- void testPrettyPrintOutputsFormattedJson() throws Exception {
- // Given - config with prettyPrint enabled
+ void testHierarchicalInheritance() throws Exception {
+ // Set namespace-level OFF, but specific type VERBOSE. Uses custom
dotted event types
+ // because built-in event types (e.g., "_input_event") have no
dot-separated parents.
+ Map<String, Object> agentConfig = new HashMap<>();
+ agentConfig.put("event-log.level", "STANDARD");
+ agentConfig.put("event-log.type.com.example.events.level", "OFF");
+ agentConfig.put("event-log.type." + TestNamespacedEventA.EVENT_TYPE +
".level", "VERBOSE");
+
config =
EventLoggerConfig.builder()
.loggerType("file")
.property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY,
tempDir.toString())
- .property(AgentConfigOptions.PRETTY_PRINT.getKey(),
true)
+ .property(FileEventLogger.AGENT_CONFIG_PROPERTY_KEY,
agentConfig)
.build();
logger = new FileEventLogger(config);
-
logger.open(openParams);
- InputEvent inputEvent = new InputEvent("test input");
- logger.append(new EventContext(inputEvent), inputEvent);
+
+ // EventA has explicit VERBOSE override — should be logged
+ TestNamespacedEventA eventA = new TestNamespacedEventA("should be
logged");
+ logger.append(new EventContext(eventA), eventA);
+
+ // EventB inherits OFF from namespace level — should NOT be logged
+ TestNamespacedEventB eventB = new TestNamespacedEventB("should not be
logged");
+ logger.append(new EventContext(eventB), eventB);
logger.flush();
- // Then - output should be valid JSON spanning multiple lines
(pretty-printed)
Path logFile = getExpectedLogFilePath();
List<String> lines = Files.readAllLines(logFile);
- // Pretty-printed JSON for a single event record spans multiple lines
- assertTrue(lines.size() > 1, "Pretty-printed JSON should span multiple
lines");
- // Each line after the first should be indented
- assertTrue(
- lines.subList(1, lines.size()).stream().anyMatch(line ->
line.startsWith(" ")),
- "Pretty-printed JSON lines should be indented");
- // The entire content should still be valid JSON
- String content = String.join("\n", lines);
- assertDoesNotThrow(
- () -> objectMapper.readValue(content, EventLogRecord.class),
- "Pretty-printed output should be valid JSON deserializable to
EventLogRecord");
+ assertEquals(1, lines.size(), "Only EventA (VERBOSE override) should
be logged");
+
+ JsonNode json = objectMapper.readTree(lines.get(0));
+ assertEquals("VERBOSE", json.get("logLevel").asText());
+ assertEquals(TestNamespacedEventA.EVENT_TYPE,
json.get("eventType").asText());
}
private Path getExpectedLogFilePath() {
@@ -573,4 +599,24 @@ class FileEventLoggerTest {
return ((Number) getAttr("customNumber")).intValue();
}
}
+
+ /** Custom event with a dot-separated type to exercise hierarchical level
inheritance. */
+ public static class TestNamespacedEventA extends Event {
+ public static final String EVENT_TYPE = "com.example.events.A";
+
+ public TestNamespacedEventA(String payload) {
+ super(EVENT_TYPE);
+ setAttr("payload", payload);
+ }
+ }
+
+ /** Custom event sharing EventA's namespace, used to verify
namespace-level inheritance. */
+ public static class TestNamespacedEventB extends Event {
+ public static final String EVENT_TYPE = "com.example.events.B";
+
+ public TestNamespacedEventB(String payload) {
+ super(EVENT_TYPE);
+ setAttr("payload", payload);
+ }
+ }
}
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/JsonTruncatorTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/JsonTruncatorTest.java
new file mode 100644
index 00000000..bd05b9a1
--- /dev/null
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/eventlog/JsonTruncatorTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.eventlog;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class JsonTruncatorTest {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ @Test
+ void testStringTruncation() {
+ JsonTruncator truncator = new JsonTruncator(10, 0, 0);
+ ObjectNode node = MAPPER.createObjectNode();
+ node.put("content", "This is a very long string that exceeds the
limit");
+
+ boolean result = truncator.truncate(node);
+
+ assertThat(result).isTrue();
+ assertThat(node.get("content").isObject()).isTrue();
+
assertThat(node.get("content").get("truncatedString").asText()).isEqualTo("This
is a ...");
+ assertThat(node.get("content").get("omittedChars").asInt())
+ .isEqualTo(49 - 10); // total length minus maxStringLength
+ }
+
+ @Test
+ void testArrayTrimming() {
+ JsonTruncator truncator = new JsonTruncator(0, 3, 0);
+ ObjectNode node = MAPPER.createObjectNode();
+ ArrayNode array = MAPPER.createArrayNode();
+ for (int i = 0; i < 7; i++) {
+ array.add("item" + i);
+ }
+ node.set("items", array);
+
+ boolean result = truncator.truncate(node);
+
+ assertThat(result).isTrue();
+ assertThat(node.get("items").isObject()).isTrue();
+ assertThat(node.get("items").get("truncatedList").size()).isEqualTo(3);
+
assertThat(node.get("items").get("truncatedList").get(0).asText()).isEqualTo("item0");
+
assertThat(node.get("items").get("truncatedList").get(1).asText()).isEqualTo("item1");
+
assertThat(node.get("items").get("truncatedList").get(2).asText()).isEqualTo("item2");
+
assertThat(node.get("items").get("omittedElements").asInt()).isEqualTo(4);
+ }
+
+ @Test
+ void testDepthCollapsing() {
+ JsonTruncator truncator = new JsonTruncator(0, 0, 2);
+ ObjectNode node = MAPPER.createObjectNode();
+ ObjectNode nested = MAPPER.createObjectNode();
+ nested.put("scalarField", "value");
+ nested.put("numberField", 42);
+ ObjectNode deepNested = MAPPER.createObjectNode();
+ deepNested.put("deep", "data");
+ nested.set("nestedObj", deepNested);
+ ArrayNode nestedArray = MAPPER.createArrayNode();
+ nestedArray.add("a");
+ nested.set("nestedArr", nestedArray);
+ node.set("data", nested);
+
+ boolean result = truncator.truncate(node);
+
+ assertThat(result).isTrue();
+ // The "data" field's nested object should be collapsed at depth 2
+ assertThat(node.get("data").has("truncatedObject")).isTrue();
+ ObjectNode truncatedObj = (ObjectNode)
node.get("data").get("truncatedObject");
+
assertThat(truncatedObj.get("scalarField").asText()).isEqualTo("value");
+ assertThat(truncatedObj.get("numberField").asInt()).isEqualTo(42);
+ assertThat(truncatedObj.has("nestedObj")).isFalse();
+ assertThat(truncatedObj.has("nestedArr")).isFalse();
+ assertThat(node.get("data").get("omittedFields").asInt()).isEqualTo(2);
+ }
+
+ @Test
+ void testNoTruncationUnderLimits() {
+ JsonTruncator truncator = new JsonTruncator(100, 10, 5);
+ ObjectNode node = MAPPER.createObjectNode();
+ node.put("short", "hello");
+ ArrayNode smallArray = MAPPER.createArrayNode();
+ smallArray.add("a");
+ smallArray.add("b");
+ node.set("list", smallArray);
+
+ boolean result = truncator.truncate(node);
+
+ assertThat(result).isFalse();
+ assertThat(node.get("short").asText()).isEqualTo("hello");
+ assertThat(node.get("list").size()).isEqualTo(2);
+ }
+
+ @Test
+ void testDisabledThresholds() {
+ // All thresholds set to 0 — no truncation should occur
+ JsonTruncator truncator = new JsonTruncator(0, 0, 0);
+ ObjectNode node = MAPPER.createObjectNode();
+ node.put("content", "A".repeat(10000));
+ ArrayNode bigArray = MAPPER.createArrayNode();
+ for (int i = 0; i < 100; i++) {
+ bigArray.add(i);
+ }
+ node.set("items", bigArray);
+ ObjectNode deep = MAPPER.createObjectNode();
+ deep.set("level2", MAPPER.createObjectNode().put("level3", "deep"));
+ node.set("nested", deep);
+
+ boolean result = truncator.truncate(node);
+
+ assertThat(result).isFalse();
+ assertThat(node.get("content").asText()).hasSize(10000);
+ assertThat(node.get("items").size()).isEqualTo(100);
+ }
+
+ @Test
+ void testProtectedFields() {
+ JsonTruncator truncator = new JsonTruncator(5, 2, 2);
+ ObjectNode node = MAPPER.createObjectNode();
+ // Protected fields should never be truncated
+ node.put("eventType",
"org.apache.flink.agents.api.event.ChatRequestEvent");
+ node.put("id", "a-very-long-identifier-that-exceeds-the-limit");
+ ObjectNode attributes = MAPPER.createObjectNode();
+ attributes.put("key", "value");
+ attributes.set("nested", MAPPER.createObjectNode().put("deep",
"data"));
+ node.set("attributes", attributes);
+ // Non-protected field should be truncated
+ node.put("content", "This should be truncated");
+
+ boolean result = truncator.truncate(node);
+
+ assertThat(result).isTrue();
+ // Protected fields remain untouched
+ assertThat(node.get("eventType").asText())
+
.isEqualTo("org.apache.flink.agents.api.event.ChatRequestEvent");
+ assertThat(node.get("id").asText())
+ .isEqualTo("a-very-long-identifier-that-exceeds-the-limit");
+ assertThat(node.get("attributes").isObject()).isTrue();
+
assertThat(node.get("attributes").get("key").asText()).isEqualTo("value");
+ // Non-protected field is truncated
+ assertThat(node.get("content").get("truncatedString")).isNotNull();
+ }
+
+ @Test
+ void testCompositeScenario() {
+ JsonTruncator truncator = new JsonTruncator(10, 2, 3);
+ ObjectNode node = MAPPER.createObjectNode();
+ // Long string — should trigger string truncation
+ node.put("message", "Hello, this is a long message from the LLM");
+ // Large array — should trigger array truncation
+ ArrayNode tools = MAPPER.createArrayNode();
+ for (int i = 0; i < 5; i++) {
+ tools.add("tool" + i);
+ }
+ node.set("tools", tools);
+ // Deep nesting — should trigger depth truncation at level 3
+ ObjectNode level1 = MAPPER.createObjectNode();
+ ObjectNode level2 = MAPPER.createObjectNode();
+ level2.put("scalar", "kept");
+ level2.set("tooDeep", MAPPER.createObjectNode().put("hidden", "gone"));
+ level1.set("inner", level2);
+ node.set("metadata", level1);
+
+ boolean result = truncator.truncate(node);
+
+ assertThat(result).isTrue();
+ // String truncation applied
+ assertThat(node.get("message").get("truncatedString")).isNotNull();
+
assertThat(node.get("message").get("omittedChars").asInt()).isGreaterThan(0);
+ // Array truncation applied
+ assertThat(node.get("tools").get("truncatedList").size()).isEqualTo(2);
+
assertThat(node.get("tools").get("omittedElements").asInt()).isEqualTo(3);
+ // Depth truncation: level2 is at depth 3, so it should be collapsed
+ ObjectNode metadataInner = (ObjectNode)
node.get("metadata").get("inner");
+ assertThat(metadataInner.has("truncatedObject")).isTrue();
+
assertThat(metadataInner.get("truncatedObject").get("scalar").asText()).isEqualTo("kept");
+ assertThat(metadataInner.get("omittedFields").asInt()).isEqualTo(1);
+ }
+
+ @Test
+ void testRecursionSkipsDroppedTailElements() {
+ // Regression test: when an array exceeds maxArrayElements, the
elements beyond the cap
+ // must NOT be recursed into. Prior to the fix, truncateArrayContents
ran on the full
+ // array before truncateArray dropped the tail, causing wasted
CPU/allocations on
+ // elements that were about to be discarded.
+ JsonTruncator truncator = new JsonTruncator(5, 2, 0);
+ ObjectNode node = MAPPER.createObjectNode();
+ ArrayNode items = MAPPER.createArrayNode();
+ // Two short strings that fit under the retained cap and the string
limit.
+ items.add("a");
+ items.add("b");
+ // Three tail ObjectNodes whose long-string fields would be truncated
if visited.
+ String longValue = "this-string-is-much-longer-than-five-chars";
+ ObjectNode tail0 = MAPPER.createObjectNode();
+ tail0.put("payload", longValue);
+ ObjectNode tail1 = MAPPER.createObjectNode();
+ tail1.put("payload", longValue);
+ ObjectNode tail2 = MAPPER.createObjectNode();
+ tail2.put("payload", longValue);
+ items.add(tail0);
+ items.add(tail1);
+ items.add(tail2);
+ node.set("items", items);
+
+ boolean result = truncator.truncate(node);
+
+ assertThat(result).isTrue();
+ // Retained list is exactly the first 2 elements; 3 are dropped.
+ assertThat(node.get("items").get("truncatedList").size()).isEqualTo(2);
+
assertThat(node.get("items").get("truncatedList").get(0).asText()).isEqualTo("a");
+
assertThat(node.get("items").get("truncatedList").get(1).asText()).isEqualTo("b");
+
assertThat(node.get("items").get("omittedElements").asInt()).isEqualTo(3);
+ // The dropped tail ObjectNodes were never visited — their payload
fields remain
+ // untouched (still raw strings, not wrapped truncatedString objects).
Under the old
+ // pre-reorder code path, these would have been mutated before being
discarded.
+ assertThat(tail0.get("payload").isTextual()).isTrue();
+ assertThat(tail0.get("payload").asText()).isEqualTo(longValue);
+ assertThat(tail1.get("payload").isTextual()).isTrue();
+ assertThat(tail1.get("payload").asText()).isEqualTo(longValue);
+ assertThat(tail2.get("payload").isTextual()).isTrue();
+ assertThat(tail2.get("payload").asText()).isEqualTo(longValue);
+ }
+
+ @Test
+ void testNullNode() {
+ JsonTruncator truncator = new JsonTruncator(10, 10, 10);
+
+ boolean result = truncator.truncate(null);
+
+ assertThat(result).isFalse();
+ }
+}