This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0261f86005 [Feature][Zeta][Core] Support output log file of job (#7712)
0261f86005 is described below

commit 0261f86005690d34f63e8765d4d2e069ede603a1
Author: hailin0 <[email protected]>
AuthorDate: Tue Sep 24 11:48:22 2024 +0800

    [Feature][Zeta][Core] Support output log file of job (#7712)
---
 config/log4j2.properties                           |  16 ++
 docs/en/seatunnel-engine/logging.md                | 118 +++++++++
 docs/zh/seatunnel-engine/logging.md                | 118 +++++++++
 .../engine/e2e/ClusterSeaTunnelContainer.java      |  19 --
 .../seatunnel/engine/e2e/joblog/JobLogIT.java      | 290 +++++++++++++++++++++
 .../test/resources/job-log-file}/log4j2.properties |  30 ++-
 6 files changed, 562 insertions(+), 29 deletions(-)

diff --git a/config/log4j2.properties b/config/log4j2.properties
index 57dfa5fdf0..256a2a8c20 100644
--- a/config/log4j2.properties
+++ b/config/log4j2.properties
@@ -61,6 +61,22 @@ appender.consoleStderr.filter.acceptGteWarn.level = WARN
 appender.consoleStderr.filter.acceptGteWarn.onMatch = ACCEPT
 appender.consoleStderr.filter.acceptGteWarn.onMismatch = DENY
 
+appender.routing.name = routingAppender
+appender.routing.type = Routing
+appender.routing.purge.type = IdlePurgePolicy
+appender.routing.purge.timeToLive = 60
+appender.routing.route.type = Routes
+appender.routing.route.pattern = $${ctx:ST-JID}
+appender.routing.route.system.type = Route
+appender.routing.route.system.key = $${ctx:ST-JID}
+appender.routing.route.system.ref = fileAppender
+appender.routing.route.job.type = Route
+appender.routing.route.job.appender.type = File
+appender.routing.route.job.appender.name = job-${ctx:ST-JID}
+appender.routing.route.job.appender.fileName = 
${file_path}/job-${ctx:ST-JID}.log
+appender.routing.route.job.appender.layout.type = PatternLayout
+appender.routing.route.job.appender.layout.pattern = %d{yyyy-MM-dd 
HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n
+
 appender.file.name = fileAppender
 appender.file.type = RollingFile
 appender.file.fileName = ${file_path}/${file_name}.log
diff --git a/docs/en/seatunnel-engine/logging.md 
b/docs/en/seatunnel-engine/logging.md
new file mode 100644
index 0000000000..7c827887b8
--- /dev/null
+++ b/docs/en/seatunnel-engine/logging.md
@@ -0,0 +1,118 @@
+---
+sidebar_position: 14
+---
+
+# Logging
+
+All SeaTunnel Engine processes create a log text file that contains messages 
for various events happening in that process. These logs provide deep insights 
into the inner workings of SeaTunnel Engine, and can be used to detect problems 
(in the form of WARN/ERROR messages) and can help in debugging them.
+
+The logging in SeaTunnel Engine uses the SLF4J logging interface. This allows 
you to use any logging framework that supports SLF4J, without having to modify 
the SeaTunnel Engine source code.
+
+By default, Log4j 2 is used as the underlying logging framework.
+
+## Structured logging
+
+SeaTunnel Engine adds the following fields to MDC of most of the relevant log 
messages (experimental feature):
+
+- Job ID
+  - key: ST-JID
+  - format: string
+
+This is most useful in environments with structured logging and allows you to 
quickly filter the relevant logs.
+
+The MDC is propagated by slf4j to the logging backend which usually adds it to 
the log records automatically (e.g. in log4j json layout). Alternatively, it 
can be configured explicitly - log4j pattern layout might look like this:
+
+```properties
+[%X{ST-JID}] %c{0} %m%n.
+```
+
+## Configuring Log4j2
+
+Log4j 2 is controlled using property files.
+
+The SeaTunnel Engine distribution ships with the following log4j properties 
files in the `config` directory, which are used automatically if Log4j 2 is 
enabled:
+
+- `log4j2_client.properties`: used by the command line client (e.g., 
`seatunnel.sh`)
+- `log4j2.properties`: used for SeaTunnel Engine server processes (e.g., 
`seatunnel-cluster.sh`)
+
+By default, log files are output to the `logs` directory.
+
+Log4j periodically scans this file for changes and adjusts the logging 
behavior if necessary. By default this check happens every 60 seconds and is 
controlled by the monitorInterval setting in the Log4j properties files.
+
+### Configure to output separate log files for jobs
+
+To output separate log files for each job, you can update the following 
configuration in the `log4j2.properties` file:
+
+```properties
+...
+rootLogger.appenderRef.file.ref = routingAppender
+...
+
+appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] 
[%t] - %m%n
+...
+```
+
+This configuration generates separate log files for each job, for example:
+
+```
+job-xxx1.log
+job-xxx2.log
+job-xxx3.log
+...
+```
+
+### Configuring output mixed logs
+
+*This is the default configuration mode.*
+
+To output all job logs into the SeaTunnel Engine system log file, you can 
update the following configuration in the `log4j2.properties` file:
+
+```properties
+...
+rootLogger.appenderRef.file.ref = fileAppender
+...
+
+appender.file.layout.pattern = [%X{ST-JID}] %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p 
[%-30.30c{1.}] [%t] - %m%n
+...
+```
+
+### Compatibility with Log4j1/Logback
+
+SeaTunnel Engine automatically integrates the logging framework bridges, 
allowing existing applications that work against Log4j1/Logback classes to 
continue working.
+
+## Best practices for developers
+
+You can create an SLF4J logger by calling 
`org.slf4j.LoggerFactory#getLogger` with the Class of your class 
as an argument.
+
+Of course, you can also use the `lombok` annotation `@Slf4j` to achieve the 
same effect.
+
+```java
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestConnector {
+       private static final Logger LOG = 
LoggerFactory.getLogger(TestConnector.class);
+
+       public static void main(String[] args) {
+               LOG.info("Hello world!");
+       }
+}
+```
+
+In order to benefit most from SLF4J, it is recommended to use its placeholder 
mechanism. Using placeholders avoids unnecessary string construction when the 
logging level is set so high that the message would not be logged.
+
+The syntax of placeholders is the following:
+
+```java
+LOG.info("This message contains {} placeholders. {}", 1, "key1");
+```
+
+Placeholders can also be used in conjunction with exceptions which shall be 
logged.
+
+```java
+try {
+    // some code
+} catch (Exception e) {
+    LOG.error("An {} occurred", "error", e);
+}
+```
\ No newline at end of file
diff --git a/docs/zh/seatunnel-engine/logging.md 
b/docs/zh/seatunnel-engine/logging.md
new file mode 100644
index 0000000000..8f04eaa911
--- /dev/null
+++ b/docs/zh/seatunnel-engine/logging.md
@@ -0,0 +1,118 @@
+---
+sidebar_position: 14
+---
+
+# 日志
+
+每个 SeaTunnel Engine 进程都会创建一个日志文件,其中包含该进程中发生的各种事件的消息。这些日志提供了对 SeaTunnel Engine 
内部工作原理的深入了解,可用于检测问题(以 WARN/ERROR 消息的形式)并有助于调试问题。
+
+SeaTunnel Engine 中的日志记录使用 SLF4J 日志记录接口。这允许您使用任何支持 SLF4J 的日志记录框架,而无需修改 
SeaTunnel Engine 源代码。
+
+默认情况下,Log4j2 用作底层日志记录框架。
+
+## 结构化信息
+
+SeaTunnel Engine 向大多数相关日志消息的 MDC 添加了以下字段(实验性功能):
+
+- Job ID
+  - key: ST-JID
+  - format: string
+
+这在具有结构化日志记录的环境中最为有用,允许您快速过滤相关日志。
+
+MDC 由 slf4j 传播到日志后端,后者通常会自动将其添加到日志记录中(例如,在 log4j json 布局中)。或者,也可以明确配置 - log4j 
模式布局可能如下所示:
+
+```properties
+[%X{ST-JID}] %c{0} %m%n.
+```
+
+## 配置 Log4j2
+
+Log4j2 使用属性文件进行控制。
+
+SeaTunnel Engine 发行版在 `config` 目录中附带以下 log4j 属性文件,如果启用了 Log4j2,则会自动使用这些文件:
+
+- `log4j2_client.properties`: 由命令行客户端使用 (e.g., `seatunnel.sh`)
+- `log4j2.properties`: 由 SeaTunnel 引擎服务使用 (e.g., `seatunnel-cluster.sh`)
+
+默认情况下,日志文件输出到 `logs` 目录。
+
+Log4j 会定期扫描上述文件以查找更改,并根据需要调整日志记录行为。默认情况下,此检查每 60 秒进行一次,由 Log4j 属性文件中的 
monitorInterval 设置控制。
+
+### 配置作业生成单独的日志文件
+
+要为每个作业输出单独的日志文件,您可以更新 `log4j2.properties` 文件中的以下配置:
+
+```properties
+...
+rootLogger.appenderRef.file.ref = routingAppender
+...
+
+appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] 
[%t] - %m%n
+...
+```
+
+此配置为每个作业生成单独的日志文件,例如:
+
+```
+job-xxx1.log
+job-xxx2.log
+job-xxx3.log
+...
+```
+
+### 配置混合日志文件
+
+*默认已采用此配置模式。*
+
+要将所有作业日志输出到 SeaTunnel Engine 系统日志文件中,您可以在 `log4j2.properties` 文件中更新以下配置:
+
+```properties
+...
+rootLogger.appenderRef.file.ref = fileAppender
+...
+
+appender.file.layout.pattern = [%X{ST-JID}] %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p 
[%-30.30c{1.}] [%t] - %m%n
+...
+```
+
+### 兼容 Log4j1/Logback
+
+SeaTunnel Engine 自动集成了大多数 Log 桥接器,允许针对 Log4j1/Logback 类工作的现有应用程序继续工作。
+
+## 开发人员最佳实践
+
+您可以通过调用 `org.slf4j.LoggerFactory#getLogger` 并以您的类的类作为参数来创建 SLF4J 
记录器。
+
+当然您也可以使用 lombok 注解 `@Slf4j` 来实现同样的效果
+
+```java
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestConnector {
+       private static final Logger LOG = 
LoggerFactory.getLogger(TestConnector.class);
+
+       public static void main(String[] args) {
+               LOG.info("Hello world!");
+       }
+}
+```
+
+为了最大限度地利用 SLF4J,建议使用其占位符机制。使用占位符可以避免不必要的字符串构造,以防日志级别设置得太高而导致消息无法记录。
+
+占位符的语法如下:
+
+```java
+LOG.info("This message contains {} placeholders. {}", 1, "key1");
+```
+
+占位符还可以与需要记录的异常结合使用
+
+```java
+try {
+    // some code
+} catch (Exception e) {
+    LOG.error("An {} occurred", "error", e);
+}
+```
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
index 894e901596..422735a3ec 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterSeaTunnelContainer.java
@@ -47,7 +47,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Stream;
 
 import static io.restassured.RestAssured.given;
 import static 
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
@@ -120,24 +119,6 @@ public class ClusterSeaTunnelContainer extends 
SeaTunnelContainer {
                                         i,
                                         paramJobName + "&jobId=" + 
CUSTOM_JOB_ID,
                                         true));
-
-        String serverLogs = server.getLogs();
-        String secondServerLogs = secondServer.getLogs();
-        Stream.of(
-                        // [862969647010611201] 2024-08-24 16:01:21,155 INFO
-                        // 
org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSourceSplitEnumerator 
- Starting to calculate splits.
-                        "\\[862969647010611201\\].* INFO 
org\\.apache\\.seatunnel\\.connectors\\.seatunnel\\.fake\\.source\\.FakeSourceSplitEnumerator",
-                        // [862969647010611201] 2024-08-24 16:01:21,278 INFO
-                        // 
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter
-                        // - subtaskIndex=0  rowIndex=63:  
SeaTunnelRow#tableId=fake
-                        // SeaTunnelRow#kind=INSERT : qOWCd, 1033892054, 
671516661
-                        "\\[862969647010611201\\].* INFO 
org\\.apache\\.seatunnel\\.connectors\\.seatunnel\\.console\\.sink\\.ConsoleSinkWriter")
-                .map(
-                        regex -> {
-                            Assertions.assertTrue(
-                                    serverLogs.matches(regex) || 
secondServerLogs.matches(regex));
-                            return regex;
-                        });
     }
 
     @Test
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java
new file mode 100644
index 0000000000..9c4477acb9
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e.joblog;
+
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+import org.apache.seatunnel.engine.e2e.SeaTunnelContainer;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import io.restassured.response.Response;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
+import static io.restassured.RestAssured.given;
+import static 
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
+import static org.hamcrest.Matchers.equalTo;
+
+public class JobLogIT extends SeaTunnelContainer {
+
+    private static final String CUSTOM_JOB_NAME = "test-job-log-file";
+    private static final long CUSTOM_JOB_ID = 862969647010611201L;
+
+    private static final String confFile = "/fakesource_to_console.conf";
+    private static final Path BIN_PATH = Paths.get(SEATUNNEL_HOME, "bin", 
SERVER_SHELL);
+    private static final Path CONFIG_PATH = Paths.get(SEATUNNEL_HOME, 
"config");
+    private static final Path HADOOP_JAR_PATH =
+            Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar");
+
+    private GenericContainer<?> secondServer;
+    private final Network NETWORK = Network.newNetwork();
+
+    @Override
+    @BeforeEach
+    public void startUp() throws Exception {
+        server = createServer("server");
+        secondServer = createServer("secondServer");
+
+        // check cluster
+        Awaitility.await()
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () -> {
+                            Response response =
+                                    given().get(
+                                                    "http://";
+                                                            + server.getHost()
+                                                            + ":"
+                                                            + 
server.getFirstMappedPort()
+                                                            + 
"/hazelcast/rest/cluster");
+                            response.then().statusCode(200);
+                            Assertions.assertEquals(
+                                    2, 
response.jsonPath().getList("members").size());
+                        });
+    }
+
+    @Override
+    @AfterEach
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if (secondServer != null) {
+            secondServer.close();
+        }
+    }
+
+    @Test
+    public void testJobLogFile() throws Exception {
+        submitJobAndAssertResponse(
+                server, JobMode.STREAMING.name(), false, CUSTOM_JOB_NAME, 
CUSTOM_JOB_ID);
+
+        assertConsoleLog();
+        assertFileLog();
+    }
+
+    private void assertConsoleLog() {
+        Awaitility.await()
+                .atMost(2, TimeUnit.MINUTES)
+                .untilAsserted(
+                        () -> {
+                            String serverLogs = server.getLogs();
+                            String secondServerLogs = secondServer.getLogs();
+                            Stream.of(
+                                            // [862969647010611201] 2024-09-21 
17:11:41,919 INFO
+                                            // [.f.s.FakeSourceSplitEnumerator]
+                                            // 
[BlockingWorker-TaskGroupLocation{jobId=862969647010611201, pipelineId=1, 
taskGroupId=1}] - Starting to calculate splits.
+                                            
"\\[862969647010611201\\].*INFO\\s+\\[.f.s.FakeSourceSplitEnumerator\\].*Starting
 to calculate splits",
+                                            // [862969647010611201] 2024-09-21 
17:11:41,757 INFO
+                                            // [.a.s.c.s.c.s.ConsoleSinkWriter]
+                                            // 
[hz.main.seaTunnel.task.thread-4] - output rowType:
+                                            // name<STRING>, age<INT>, 
card<INT>
+                                            
"\\[862969647010611201\\].*INFO\\s+\\[.a.s.c.s.c.s.ConsoleSinkWriter\\].*output 
rowType: name<STRING>, age<INT>, card<INT>")
+                                    .forEach(
+                                            regex -> {
+                                                Pattern pattern = 
Pattern.compile(regex);
+                                                Assertions.assertTrue(
+                                                        
pattern.matcher(serverLogs).find()
+                                                                || 
pattern.matcher(secondServerLogs)
+                                                                        
.find());
+                                            });
+                        });
+    }
+
+    private void assertFileLog() throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                server.execInContainer(
+                        "sh", "-c", "cat 
/tmp/seatunnel/logs/job-862969647010611201.log");
+        String serverLogs = execResult.getStdout();
+
+        execResult =
+                secondServer.execInContainer(
+                        "sh", "-c", "cat 
/tmp/seatunnel/logs/job-862969647010611201.log");
+        String secondServerLogs = execResult.getStdout();
+
+        Stream.of(
+                        // 2024-09-21 16:37:44,503 INFO  
[.f.s.FakeSourceSplitEnumerator]
+                        // 
[BlockingWorker-TaskGroupLocation{jobId=862969647010611201, pipelineId=1,
+                        // taskGroupId=1}] - Starting to calculate splits.
+                        
"INFO\\s+\\[.f.s.FakeSourceSplitEnumerator\\].*Starting to calculate splits",
+                        // 2024-09-21 16:37:44,295 INFO  
[.a.s.c.s.c.s.ConsoleSinkWriter]
+                        // [hz.main.seaTunnel.task.thread-4] - output rowType: 
name<STRING>,
+                        // age<INT>, card<INT>
+                        "INFO\\s+\\[.a.s.c.s.c.s.ConsoleSinkWriter\\].*output 
rowType: name<STRING>, age<INT>, card<INT>")
+                .forEach(
+                        regex -> {
+                            Pattern pattern = Pattern.compile(regex);
+                            Assertions.assertTrue(
+                                    pattern.matcher(serverLogs).find()
+                                            || 
pattern.matcher(secondServerLogs).find());
+                        });
+    }
+
+    private Response submitJob(
+            GenericContainer<?> container,
+            String jobMode,
+            boolean isStartWithSavePoint,
+            String jobName,
+            long jobId) {
+        String requestBody =
+                "{\n"
+                        + "    \"env\": {\n"
+                        + "        \"job.name\": \""
+                        + jobName
+                        + "\",\n"
+                        + "        \"job.mode\": \""
+                        + jobMode
+                        + "\"\n"
+                        + "    },\n"
+                        + "    \"source\": [\n"
+                        + "        {\n"
+                        + "            \"plugin_name\": \"FakeSource\",\n"
+                        + "            \"result_table_name\": \"fake\",\n"
+                        + "            \"row.num\": 100,\n"
+                        + "            \"schema\": {\n"
+                        + "                \"fields\": {\n"
+                        + "                    \"name\": \"string\",\n"
+                        + "                    \"age\": \"int\",\n"
+                        + "                    \"card\": \"int\"\n"
+                        + "                }\n"
+                        + "            }\n"
+                        + "        }\n"
+                        + "    ],\n"
+                        + "    \"transform\": [\n"
+                        + "    ],\n"
+                        + "    \"sink\": [\n"
+                        + "        {\n"
+                        + "            \"plugin_name\": \"Console\",\n"
+                        + "            \"source_table_name\": [\"fake\"]\n"
+                        + "        }\n"
+                        + "    ]\n"
+                        + "}";
+        String parameters = "jobId=" + jobId;
+        if (isStartWithSavePoint) {
+            parameters = parameters + "&isStartWithSavePoint=true";
+        }
+        Response response =
+                given().body(requestBody)
+                        .header("Content-Type", "application/json; 
charset=utf-8")
+                        .post(
+                                parameters == null
+                                        ? "http://";
+                                                + container.getHost()
+                                                + ":"
+                                                + 
container.getFirstMappedPort()
+                                                + RestConstant.SUBMIT_JOB_URL
+                                        : "http://";
+                                                + container.getHost()
+                                                + ":"
+                                                + 
container.getFirstMappedPort()
+                                                + RestConstant.SUBMIT_JOB_URL
+                                                + "?"
+                                                + parameters);
+        return response;
+    }
+
+    private GenericContainer<?> createServer(String networkAlias)
+            throws IOException, InterruptedException {
+        GenericContainer<?> server =
+                new GenericContainer<>(getDockerImage())
+                        .withNetwork(NETWORK)
+                        .withEnv("TZ", "UTC")
+                        
.withCommand(ContainerUtil.adaptPathForWin(BIN_PATH.toString()))
+                        .withNetworkAliases(networkAlias)
+                        .withExposedPorts()
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(
+                                                "seatunnel-engine:" + 
JDK_DOCKER_IMAGE)))
+                        .waitingFor(Wait.forListeningPort());
+        copySeaTunnelStarterToContainer(server);
+        server.setExposedPorts(Collections.singletonList(5801));
+        server.withCopyFileToContainer(
+                MountableFile.forHostPath(
+                        PROJECT_ROOT_PATH
+                                + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
+                CONFIG_PATH.toString());
+        server.withCopyFileToContainer(
+                MountableFile.forHostPath(
+                        PROJECT_ROOT_PATH
+                                + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/"),
+                CONFIG_PATH.toString());
+        server.withCopyFileToContainer(
+                MountableFile.forHostPath(
+                        PROJECT_ROOT_PATH
+                                + 
"/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
+                HADOOP_JAR_PATH.toString());
+        server.withCopyFileToContainer(
+                MountableFile.forHostPath(
+                        PROJECT_ROOT_PATH
+                                + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/job-log-file/"),
+                CONFIG_PATH.toString());
+        server.start();
+        // execute extra commands
+        executeExtraCommands(server);
+        ContainerUtil.copyConnectorJarToContainer(
+                server,
+                confFile,
+                getConnectorModulePath(),
+                getConnectorNamePrefix(),
+                getConnectorType(),
+                SEATUNNEL_HOME);
+
+        return server;
+    }
+
+    private void submitJobAndAssertResponse(
+            GenericContainer<?> container,
+            String jobMode,
+            boolean isStartWithSavePoint,
+            String jobName,
+            long jobId) {
+        Response response = submitJob(container, jobMode, 
isStartWithSavePoint, jobName, jobId);
+        response.then()
+                .statusCode(200)
+                .body("jobName", equalTo(jobName))
+                .body("jobId", equalTo(String.valueOf(jobId)));
+    }
+}
diff --git a/config/log4j2.properties 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/job-log-file/log4j2.properties
similarity index 78%
copy from config/log4j2.properties
copy to 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/job-log-file/log4j2.properties
index 57dfa5fdf0..7270ef0ae5 100644
--- a/config/log4j2.properties
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/job-log-file/log4j2.properties
@@ -27,18 +27,12 @@ property.file_ttl = 7d
 
 rootLogger.level = INFO
 
-logger.zeta.name=org.apache.seatunnel.engine
-logger.zeta.level=INFO
-
-logger.debezium.name=io.debezium.connector
-logger.debezium.level=WARN
-
 ############################ log output to console 
#############################
-#rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender
-#rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender
+rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender
+rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender
 ############################ log output to console 
#############################
 ############################ log output to file    
#############################
-rootLogger.appenderRef.file.ref = fileAppender
+rootLogger.appenderRef.file.ref = routingAppender
 ############################ log output to file    
#############################
 
 appender.consoleStdout.name = consoleStdoutAppender
@@ -61,13 +55,29 @@ appender.consoleStderr.filter.acceptGteWarn.level = WARN
 appender.consoleStderr.filter.acceptGteWarn.onMatch = ACCEPT
 appender.consoleStderr.filter.acceptGteWarn.onMismatch = DENY
 
+appender.routing.name = routingAppender
+appender.routing.type = Routing
+appender.routing.purge.type = IdlePurgePolicy
+appender.routing.purge.timeToLive = 60
+appender.routing.route.type = Routes
+appender.routing.route.pattern = $${ctx:ST-JID}
+appender.routing.route.system.type = Route
+appender.routing.route.system.key = $${ctx:ST-JID}
+appender.routing.route.system.ref = fileAppender
+appender.routing.route.job.type = Route
+appender.routing.route.job.appender.type = File
+appender.routing.route.job.appender.name = job-${ctx:ST-JID}
+appender.routing.route.job.appender.fileName = 
${file_path}/job-${ctx:ST-JID}.log
+appender.routing.route.job.appender.layout.type = PatternLayout
+appender.routing.route.job.appender.layout.pattern = %d{yyyy-MM-dd 
HH:mm:ss,SSS} %-5p [%-30.30c{1.}] [%t] - %m%n
+
 appender.file.name = fileAppender
 appender.file.type = RollingFile
 appender.file.fileName = ${file_path}/${file_name}.log
 appender.file.filePattern = ${file_path}/${file_name}.log.%d{yyyy-MM-dd}-%i
 appender.file.append = true
 appender.file.layout.type = PatternLayout
-appender.file.layout.pattern = [%X{ST-JID}] %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p 
[%-30.30c{1.}] [%t] - %m%n
+appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%-30.30c{1.}] 
[%t] - %m%n
 appender.file.policies.type = Policies
 appender.file.policies.time.type = TimeBasedTriggeringPolicy
 appender.file.policies.time.modulate = true

Reply via email to