This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git

commit 117bc8f0854a498d4f9ee999ba958510f8c04b84
Author: sxnan <[email protected]>
AuthorDate: Fri Jan 16 17:32:07 2026 +0800

    [log] Add config to set the event log path
    
    # Conflicts:
    #       runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
---
 docs/content/docs/operations/configuration.md      |   1 +
 docs/content/docs/operations/monitoring.md         |   2 +-
 .../e2e_tests_integration/event_log_test.py        | 100 +++++++++++++++++++++
 .../runtime/operator/ActionExecutionOperator.java  |  13 ++-
 .../operator/ActionExecutionOperatorTest.java      |  47 +++++++++-
 5 files changed, 160 insertions(+), 3 deletions(-)

diff --git a/docs/content/docs/operations/configuration.md b/docs/content/docs/operations/configuration.md
index 2acc1b9a..2ed6f768 100644
--- a/docs/content/docs/operations/configuration.md
+++ b/docs/content/docs/operations/configuration.md
@@ -191,6 +191,7 @@ Here are the configuration options for Agent execution.
 
 | Key                                 | Default                    | Type      
            | Description                                                       
                                                                                
                                                                                
                              |
 
|-------------------------------------|----------------------------|-----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `baseLogDir`                        | (none)                     | String    
            | Base directory for file-based event logs. If not set, uses 
`java.io.tmpdir/flink-agents`.                                                  
                                                                                
                                      |
 | `error-handling-strategy`           | ErrorHandlingStrategy.FAIL | 
ErrorHandlingStrategy | Strategy for handling errors during model requests, 
include timeout and unexpected output schema. <br/>The option value could 
be:<br/> <ul><li>`ErrorHandlingStrategy.FAIL`</li> 
<li>`ErrorHandlingStrategy.RETRY`</li> <li>`ErrorHandlingStrategy.IGNORE`</li> |
 | `max-retries`             | 3                          | int                 
  | Number of retries when using `ErrorHandlingStrategy.RETRY`.                 
                                                                                
                                                                                
                    |
 
diff --git a/docs/content/docs/operations/monitoring.md b/docs/content/docs/operations/monitoring.md
index 462a7ba1..88e8fbfe 100644
--- a/docs/content/docs/operations/monitoring.md
+++ b/docs/content/docs/operations/monitoring.md
@@ -158,4 +158,4 @@ The log files follow a naming convention consistent with Flink's logging standar
 └── events-{jobId}-{taskName}-{subtaskId}.log
 ```
 
-By default, all File-based Event Logs are stored in the `flink-agents` subdirectory under the system temporary directory (`java.io.tmpdir`). In future versions, we plan to add a configurable parameter to allow users to customize the base log directory, providing greater control over log storage paths and lifecycle management.
+By default, all File-based Event Logs are stored in the `flink-agents` subdirectory under the system temporary directory (`java.io.tmpdir`). You can override the base log directory with the `agent.baseLogDir` setting in Flink `config.yaml`.
diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/event_log_test.py b/python/flink_agents/e2e_tests/e2e_tests_integration/event_log_test.py
new file mode 100644
index 00000000..275f5a73
--- /dev/null
+++ b/python/flink_agents/e2e_tests/e2e_tests_integration/event_log_test.py
@@ -0,0 +1,100 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import json
+import os
+import sysconfig
+from pathlib import Path
+
+from pyflink.common import Configuration, Encoder, WatermarkStrategy
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import RuntimeExecutionMode, StreamExecutionEnvironment
+from pyflink.datastream.connectors.file_system import (
+    FileSource,
+    StreamFormat,
+    StreamingFileSink,
+)
+
+from flink_agents.api.execution_environment import AgentsExecutionEnvironment
+from flink_agents.e2e_tests.e2e_tests_integration.flink_integration_agent import (
+    DataStreamAgent,
+    ItemData,
+    MyKeySelector,
+)
+
+current_dir = Path(__file__).parent
+os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]
+
+
+def test_event_log_base_dir_flink(tmp_path: Path) -> None:  # noqa: D103
+    config = Configuration()
+    config.set_string("state.backend.type", "rocksdb")
+    config.set_string("checkpointing.interval", "1s")
+    config.set_string("restart-strategy.type", "disable")
+    env = StreamExecutionEnvironment.get_execution_environment(config)
+    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
+    env.set_parallelism(1)
+
+    input_datastream = env.from_source(
+        source=FileSource.for_record_stream_format(
+            StreamFormat.text_line_format(), f"file:///{current_dir}/../resources/input"
+        ).build(),
+        watermark_strategy=WatermarkStrategy.no_watermarks(),
+        source_name="event_log_test_source",
+    )
+
+    deserialize_datastream = input_datastream.map(
+        lambda x: ItemData.model_validate_json(x)
+    )
+
+    agents_env = AgentsExecutionEnvironment.get_execution_environment(env=env)
+    event_log_dir = tmp_path / "event_log"
+    agents_env.get_config().set_str("baseLogDir", str(event_log_dir))
+
+    output_datastream = (
+        agents_env.from_datastream(
+            input=deserialize_datastream, key_selector=MyKeySelector()
+        )
+        .apply(DataStreamAgent())
+        .to_datastream()
+    )
+
+    result_dir = tmp_path / "results"
+    result_dir.mkdir(parents=True, exist_ok=True)
+    output_datastream.map(lambda x: x.model_dump_json(), Types.STRING()).add_sink(
+        StreamingFileSink.for_row_format(
+            base_path=str(result_dir.absolute()),
+            encoder=Encoder.simple_string_encoder(),
+        ).build()
+    )
+
+    agents_env.execute()
+
+    event_logs = list(event_log_dir.glob("events-*.log"))
+    assert event_logs, "No event log files found in configured baseLogDir."
+
+    first_log = event_logs[0]
+    record = None
+    with first_log.open("r", encoding="utf-8") as handle:
+        for line in handle:
+            if line.strip():
+                record = json.loads(line)
+                break
+
+    assert record is not None, "Event log file is empty."
+    assert "context" in record
+    assert "event" in record
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 6a6ae880..41b31e69 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -45,6 +45,7 @@ import org.apache.flink.agents.runtime.context.JavaRunnerContextImpl;
 import org.apache.flink.agents.runtime.context.RunnerContextImpl;
 import org.apache.flink.agents.runtime.env.EmbeddedPythonEnvironment;
 import org.apache.flink.agents.runtime.env.PythonEnvironmentManager;
+import org.apache.flink.agents.runtime.eventlog.FileEventLogger;
 import org.apache.flink.agents.runtime.memory.CachedMemoryStore;
 import org.apache.flink.agents.runtime.memory.MemoryObjectImpl;
 import org.apache.flink.agents.runtime.metrics.BuiltInMetrics;
@@ -98,6 +99,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.flink.agents.api.configuration.AgentConfigOptions.ACTION_STATE_STORE_BACKEND;
+import static org.apache.flink.agents.api.configuration.AgentConfigOptions.BASE_LOG_DIR;
 import static org.apache.flink.agents.api.configuration.AgentConfigOptions.JOB_IDENTIFIER;
 import static org.apache.flink.agents.runtime.actionstate.ActionStateStore.BackendType.KAFKA;
 import static org.apache.flink.agents.runtime.utils.StateUtil.*;
@@ -220,7 +222,7 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
         this.inputIsJava = inputIsJava;
         this.processingTimeService = processingTimeService;
         this.mailboxExecutor = mailboxExecutor;
-        this.eventLogger = EventLoggerFactory.createLogger(EventLoggerConfig.builder().build());
+        this.eventLogger = createEventLogger(agentPlan);
         this.eventListeners = new ArrayList<>();
         this.actionStateStore = actionStateStore;
         this.checkpointIdToSeqNums = new HashMap<>();
@@ -1077,6 +1079,15 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
         }
     }
 
+    private EventLogger createEventLogger(AgentPlan agentPlan) {
+        EventLoggerConfig.Builder loggerConfigBuilder = EventLoggerConfig.builder();
+        String baseLogDir = agentPlan.getConfig().get(BASE_LOG_DIR);
+        if (baseLogDir != null && !baseLogDir.trim().isEmpty()) {
+            loggerConfigBuilder.property(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY, baseLogDir);
+        }
+        return EventLoggerFactory.createLogger(loggerConfigBuilder.build());
+    }
+
     /** Failed to execute Action task. */
     public static class ActionTaskExecutionException extends Exception {
         public ActionTaskExecutionException(String message, Throwable cause) {
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
index f098acd8..f2709b32 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorTest.java
@@ -20,14 +20,17 @@ package org.apache.flink.agents.runtime.operator;
 import org.apache.flink.agents.api.Event;
 import org.apache.flink.agents.api.InputEvent;
 import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.configuration.AgentConfigOptions;
 import org.apache.flink.agents.api.context.DurableCallable;
 import org.apache.flink.agents.api.context.MemoryObject;
 import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.plan.AgentConfiguration;
 import org.apache.flink.agents.plan.AgentPlan;
 import org.apache.flink.agents.plan.JavaFunction;
 import org.apache.flink.agents.plan.actions.Action;
 import org.apache.flink.agents.runtime.actionstate.ActionState;
 import org.apache.flink.agents.runtime.actionstate.InMemoryActionStateStore;
+import org.apache.flink.agents.runtime.eventlog.FileEventLogger;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -226,6 +229,39 @@ public class ActionExecutionOperatorTest {
         }
     }
 
+    @Test
+    void testEventLogBaseDirFromAgentConfig() throws Exception {
+        String baseLogDir = "/tmp/flink-agents-test";
+        AgentConfiguration config = new AgentConfiguration();
+        config.set(AgentConfigOptions.BASE_LOG_DIR, baseLogDir);
+        AgentPlan agentPlan = TestAgent.getAgentPlanWithConfig(config);
+
+        try (KeyedOneInputStreamOperatorTestHarness<Long, Long, Object> testHarness =
+                new KeyedOneInputStreamOperatorTestHarness<>(
+                        new ActionExecutionOperatorFactory(agentPlan, true),
+                        (KeySelector<Long, Long>) value -> value,
+                        TypeInformation.of(Long.class))) {
+            testHarness.open();
+            ActionExecutionOperator<Long, Object> operator =
+                    (ActionExecutionOperator<Long, Object>) testHarness.getOperator();
+            Field eventLoggerField = ActionExecutionOperator.class.getDeclaredField("eventLogger");
+            eventLoggerField.setAccessible(true);
+            Object eventLogger = eventLoggerField.get(operator);
+            assertThat(eventLogger).isInstanceOf(FileEventLogger.class);
+
+            Field configField = FileEventLogger.class.getDeclaredField("config");
+            configField.setAccessible(true);
+            Object loggerConfig = configField.get(eventLogger);
+            Field propertiesField = loggerConfig.getClass().getDeclaredField("properties");
+            propertiesField.setAccessible(true);
+            @SuppressWarnings("unchecked")
+            Map<String, Object> properties =
+                    (Map<String, Object>) propertiesField.get(loggerConfig);
+            assertThat(properties.get(FileEventLogger.BASE_LOG_DIR_PROPERTY_KEY))
+                    .isEqualTo(baseLogDir);
+        }
+    }
+
     @Test
     void testActionStateStoreContentVerification() throws Exception {
         AgentPlan agentPlanWithStateStore = TestAgent.getAgentPlan(false);
@@ -955,6 +991,15 @@ public class ActionExecutionOperatorTest {
         }
 
         public static AgentPlan getAgentPlan(boolean testMemoryAccessOutOfMailbox) {
+            return getAgentPlanWithConfig(new AgentConfiguration(), testMemoryAccessOutOfMailbox);
+        }
+
+        public static AgentPlan getAgentPlanWithConfig(AgentConfiguration config) {
+            return getAgentPlanWithConfig(config, false);
+        }
+
+        private static AgentPlan getAgentPlanWithConfig(
+                AgentConfiguration config, boolean testMemoryAccessOutOfMailbox) {
             try {
                 Map<String, List<Action>> actionsByEvent = new HashMap<>();
                 Action action1 =
@@ -995,7 +1040,7 @@ public class ActionExecutionOperatorTest {
                     actions.put(action3.getName(), action3);
                 }
 
-                return new AgentPlan(actions, actionsByEvent, new HashMap<>());
+                return new AgentPlan(actions, actionsByEvent, new HashMap<>(), config);
             } catch (Exception e) {
                 ExceptionUtils.rethrow(e);
             }

Reply via email to