wenjin272 commented on code in PR #638:
URL: https://github.com/apache/flink-agents/pull/638#discussion_r3225095665


##########
runtime/src/main/java/org/apache/flink/agents/runtime/eventlog/Slf4jEventLogger.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime.eventlog;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.EventContext;
+import org.apache.flink.agents.api.configuration.AgentConfigOptions;
+import org.apache.flink.agents.api.logger.EventLogLevel;
+import org.apache.flink.agents.api.logger.EventLogger;
+import org.apache.flink.agents.api.logger.EventLoggerConfig;
+import org.apache.flink.agents.api.logger.EventLoggerOpenParams;
+import org.apache.flink.metrics.Counter;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.FileAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * An SLF4J-based event logger that outputs events through a dedicated SLF4J 
logger.
+ *
+ * <p>This logger writes event log records as JSON to a dedicated SLF4J logger 
named {@value
+ * #EVENT_LOGGER_NAME}. Events are automatically routed to a separate file in 
Flink's log directory,
+ * making them visible in Flink's Web UI "Logs" tab.
+ *
+ * <p>On {@link #open}, the logger automatically configures log4j2 to write 
event logs to a separate
+ * file (derived from Flink's {@code log.file} system property). No manual 
log4j2 configuration is
+ * required.
+ *
+ * <p>Unlike {@link FileEventLogger}, which creates a separate log file per 
subtask, this logger
+ * writes all events from a TaskManager to the same log destination. To 
distinguish events from
+ * different subtasks, each JSON record includes {@code jobId}, {@code 
taskName}, and {@code
+ * subtaskId} fields.
+ *
+ * <p>This logger honors the per-event-type log level configuration resolved 
by {@link
+ * EventLogLevelResolver}; events resolved to {@link EventLogLevel#OFF} are 
skipped, and events at
+ * {@link EventLogLevel#STANDARD} have their payloads truncated by a {@link 
JsonTruncator}.
+ *
+ * <h3>Thread Safety</h3>
+ *
+ * <p>This class is <strong>thread-safe at the Flink subtask level</strong>, 
following the same
+ * guarantees as {@link FileEventLogger}. Each subtask instance gets its own 
logger instance with
+ * its own subtask context fields.
+ */
+public class Slf4jEventLogger implements EventLogger {
+    /** Dedicated logger name for event log output. */
+    public static final String EVENT_LOGGER_NAME = 
"org.apache.flink.agents.EventLog";
+
+    private static final String EVENT_LOG_APPENDER_NAME = 
"FlinkAgentsEventLogAppender";
+
+    private static final Logger EVENT_LOG = 
LoggerFactory.getLogger(EVENT_LOGGER_NAME);
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    private final EventLoggerConfig config;
+    private boolean prettyPrint;
+    private String jobId;
+    private String taskName;
+    private int subtaskId;
+    private EventLogLevelResolver levelResolver;
+    private JsonTruncator truncator;
+    private Counter truncatedEventsCounter;
+
+    public Slf4jEventLogger(EventLoggerConfig config) {
+        this.config = config;
+    }
+
+    @Override
+    public void open(EventLoggerOpenParams params) throws Exception {
+        jobId = params.getRuntimeContext().getJobInfo().getJobId().toString();
+        taskName = params.getRuntimeContext().getTaskInfo().getTaskName();
+        subtaskId = 
params.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+
+        // The full agent config is the single source of truth for all logger 
settings
+        // (mirrors FileEventLogger).
+        @SuppressWarnings("unchecked")
+        Map<String, Object> agentConfig =
+                (Map<String, Object>)
+                        config.getProperties()
+                                .getOrDefault(
+                                        
EventLoggerConfig.AGENT_CONFIG_PROPERTY_KEY,
+                                        Collections.emptyMap());
+        prettyPrint =
+                (Boolean) 
agentConfig.getOrDefault(AgentConfigOptions.PRETTY_PRINT.getKey(), true);

Review Comment:
   The default value of prettyPrint for `Slf4jEventLogger` is true, but for 
`FileEventLogger` is false. And the default value for 
`AgentConfigOptions.PRETTY_PRINT` is false. 
   
   Can we use the option default value for both Slf4j and File event logger? 
like
   ```
   prettyPrint =
                   (Boolean) 
agentConfig.getOrDefault(AgentConfigOptions.PRETTY_PRINT.getKey(), 
                                                      
AgentConfigOptions.PRETTY_PRINT.getDefaultValue());
   ```



##########
api/src/main/java/org/apache/flink/agents/api/configuration/AgentConfigOptions.java:
##########
@@ -18,10 +18,19 @@
 package org.apache.flink.agents.api.configuration;
 
 import org.apache.flink.agents.api.logger.EventLogLevel;
+import org.apache.flink.agents.api.logger.LoggerType;
 
 /** The set of configuration options for agents parameters. */
 public class AgentConfigOptions {
 
+    /**
+     * The config parameter specifies which event logger implementation to 
use. The value should
+     * match one of the identifiers returned by {@link LoggerType#getType()} 
(e.g., {@code "slf4j"}
+     * or {@code "file"}). Defaults to {@link LoggerType#SLF4J}.
+     */
+    public static final ConfigOption<String> EVENT_LOGGER_TYPE =
+            new ConfigOption<>("eventLoggerType", String.class, 
LoggerType.SLF4J.getType());

Review Comment:
   Shall we just use enum for the option type here?
   ```
   public static final ConfigOption<LoggerType> EVENT_LOGGER_TYPE =
               new ConfigOption<>("eventLoggerType", LoggerType.class, 
LoggerType.SLF4J);
   ```
   
   Additionally, should we add corresponding configuration options on the 
Python side? My understanding is that Python can currently be configured in the 
following ways.
   
   `agents_env.get_config().set_str("eventLoggerType", "SLF4j")`
   
   But I think the following way is better:
   `agents_env.get_config().set(AgentConfigOptions.EVENT_LOGGER_TYPE, 
LoggerType.SLF4J)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to