This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b1a4be9149 [Fix] logs http basic issue # 9755 (#9968)
b1a4be9149 is described below

commit b1a4be9149c6d35252fa4e986e3c2d3cf69e26a2
Author: dotfive-star <[email protected]>
AuthorDate: Sat Nov 8 21:32:28 2025 +0800

    [Fix] logs http basic issue # 9755 (#9968)
---
 .../engine/server/rest/service/BaseLogService.java |  82 ++++++--
 .../engine/server/rest/service/LogService.java     |  20 +-
 .../engine/server/rest/RestApiHttpBasicTest.java   | 217 +++++++++++++++++++++
 3 files changed, 302 insertions(+), 17 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java
index a1e84f9ec7..ce95965250 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java
@@ -29,13 +29,19 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
 
 @Slf4j
 public class BaseLogService extends BaseService {
+
     public BaseLogService(NodeEngineImpl nodeEngine) {
         super(nodeEngine);
     }
 
+    private static final String AUTHORIZATION_HEADER = "Authorization";
+    private static final String BASIC_PREFIX = "Basic ";
+
     /** Get configuration log path */
     public String getLogPath() {
         try {
@@ -46,31 +52,83 @@ public class BaseLogService extends BaseService {
         }
     }
 
+    /**
+     * Send a simple HTTP GET request.
+     *
+     * @param urlString url
+     * @return the response body as a string, or {@code null} if the request 
failed
+     */
     protected String sendGet(String urlString) {
+        return sendGet(urlString, null, null);
+    }
+
+    /**
+     * Send GET request (optionally with Basic Auth)
+     *
+     * @param urlString url
+     * @param user username, nullable
+     * @param pass password, nullable
+     * @return the response body as a string, or {@code null} if the request 
failed
+     */
+    protected String sendGet(String urlString, String user, String pass) {
+        HttpURLConnection connection = null;
         try {
-            HttpURLConnection connection = (HttpURLConnection) new 
URL(urlString).openConnection();
+            connection = (HttpURLConnection) new 
URL(urlString).openConnection();
             connection.setRequestMethod("GET");
             connection.setConnectTimeout(5000);
             connection.setReadTimeout(5000);
+
+            // Basic Auth
+            if (user != null && pass != null) {
+                String auth = user + ":" + pass;
+                String token =
+                        
Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
+                connection.setRequestProperty(AUTHORIZATION_HEADER, 
BASIC_PREFIX + token);
+            }
+
             connection.connect();
 
-            if (connection.getResponseCode() == 200) {
-                try (InputStream is = connection.getInputStream();
-                        ByteArrayOutputStream baos = new 
ByteArrayOutputStream()) {
-                    byte[] buffer = new byte[1024];
-                    int len;
-                    while ((len = is.read(buffer)) != -1) {
-                        baos.write(buffer, 0, len);
-                    }
-                    return baos.toString();
-                }
+            int code = connection.getResponseCode();
+            if (code == HttpURLConnection.HTTP_OK) {
+                return readResponseBody(connection.getInputStream());
+            } else {
+                log.warn("GET {} -> HTTP {}", urlString, code);
+                drainErrorStream(connection);
             }
         } catch (IOException e) {
-            log.error("Send get Fail.{}", ExceptionUtils.getMessage(e));
+            log.error("Send GET failed: url={}, err={}", urlString, 
ExceptionUtils.getMessage(e));
+        } finally {
+            if (connection != null) {
+                connection.disconnect();
+            }
         }
         return null;
     }
 
+    private String readResponseBody(InputStream is) throws IOException {
+        try (InputStream input = is;
+                ByteArrayOutputStream output = new ByteArrayOutputStream()) {
+
+            byte[] buf = new byte[4096];
+            int len;
+            while ((len = input.read(buf)) != -1) {
+                output.write(buf, 0, len);
+            }
+            return output.toString(StandardCharsets.UTF_8.name());
+        }
+    }
+
+    private void drainErrorStream(HttpURLConnection connection) throws 
IOException {
+        try (InputStream err = connection.getErrorStream()) {
+            if (err != null) {
+                byte[] buffer = new byte[1024];
+                while (err.read(buffer) != -1) {
+                    // discard
+                }
+            }
+        }
+    }
+
     public String getLogParam(String uri, String contextPath) {
         uri = uri.substring(uri.indexOf(contextPath) + contextPath.length());
         uri = StringUtil.stripTrailingSlash(uri).substring(1);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java
index d112ae069b..013b08fdec 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/LogService.java
@@ -70,15 +70,25 @@ public class LogService extends BaseLogService {
                 systemMonitoringInformation -> {
                     String host = 
systemMonitoringInformation.asObject().get("host").asString();
                     String url = "http://"; + host + ":" + port + contextPath;
-                    String allName = sendGet(url + REST_URL_GET_ALL_LOG_NAME);
+                    String logUrl = url + REST_URL_GET_ALL_LOG_NAME;
+
+                    String allName =
+                            httpConfig.isEnableBasicAuth()
+                                    ? sendGet(
+                                            logUrl,
+                                            httpConfig.getBasicAuthUsername(),
+                                            httpConfig.getBasicAuthPassword())
+                                    : sendGet(logUrl);
+
                     if (StringUtils.isBlank(allName)) {
                         log.warn(
-                                "Get log file name failed: response logName is 
blank. url: {}, response: {}",
-                                url + REST_URL_GET_ALL_LOG_NAME,
-                                allName);
+                                "GET {} returned empty body (null/empty). Skip 
this node.", logUrl);
                         return;
                     }
-                    log.debug("Request: {} , Result: {}", url, allName);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("Request: {} , Result: {}", url, allName);
+                    }
                     ArrayNode jsonNodes = JsonUtils.parseArray(allName);
 
                     jsonNodes.forEach(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java
new file mode 100644
index 0000000000..0117325913
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/RestApiHttpBasicTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest;
+
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.config.server.HttpConfig;
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.internal.serialization.Data;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Base64.Encoder;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_LOGS;
+import static 
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_OVERVIEW;
+
+/** Test for Rest API with Basic. */
+class RestApiHttpBasicTest extends AbstractSeaTunnelServerTest {
+
+    private static final int HTTP_PORT = 18081;
+    private static final Long JOB_1 = System.currentTimeMillis() + 1L;
+    private static final String USER = "admin";
+    private static final String PASS = "admin";
+    private static final String DOMAIN = "http://localhost:"; + HTTP_PORT;
+
+    private static final String AUTHORIZATION_HEADER = "Authorization";
+    private static final String BASIC_PREFIX = "Basic ";
+
+    @BeforeAll
+    void setUp() {
+        String name = this.getClass().getName();
+        Config hazelcastConfig = Config.loadFromString(getHazelcastConfig());
+        hazelcastConfig.setClusterName(
+                TestUtils.getClusterName("RestApiServletHttpBasicTest_" + 
name));
+        SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig();
+        seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
+        seaTunnelConfig.getEngineConfig().setMode(ExecutionMode.LOCAL);
+
+        HttpConfig httpConfig = 
seaTunnelConfig.getEngineConfig().getHttpConfig();
+        httpConfig.setEnabled(Boolean.TRUE);
+        httpConfig.setPort(HTTP_PORT);
+
+        httpConfig.setEnableBasicAuth(Boolean.TRUE);
+        httpConfig.setBasicAuthUsername(USER);
+        httpConfig.setBasicAuthPassword(PASS);
+
+        instance = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+        nodeEngine = instance.node.nodeEngine;
+        server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME);
+        LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class);
+    }
+
+    @AfterAll
+    public void after() {
+        // Disable basic auth
+        // Because of the ConfigProvider.locateAndGetSeaTunnelConfig() 
single-case,
+        // if you change, other use cases will also change
+        // managed via 
org.apache.seatunnel.engine.common.config.YamlSeaTunnelDomConfigProcessor
+        SeaTunnelConfig seaTunnelConfig = loadSeaTunnelConfig();
+        HttpConfig httpConfig = 
seaTunnelConfig.getEngineConfig().getHttpConfig();
+        httpConfig.setEnableBasicAuth(Boolean.FALSE);
+        httpConfig.setBasicAuthUsername("");
+        httpConfig.setBasicAuthPassword("");
+    }
+
+    @Test
+    public void testRestApiOverview() throws Exception {
+        HttpURLConnection conn = null;
+        try {
+            URL url = new URL(DOMAIN + REST_URL_OVERVIEW);
+            conn = (HttpURLConnection) url.openConnection();
+            setBasicAuth(conn);
+
+            Assertions.assertEquals(200, conn.getResponseCode());
+            Assertions.assertTrue(
+                    conn.getHeaderFields()
+                            .get("Content-Type")
+                            .toString()
+                            .contains("charset=utf-8"));
+        } finally {
+            if (conn != null) {
+                conn.disconnect();
+            }
+        }
+    }
+
+    @Test
+    void testLogRestApiResponseFailure() throws IOException {
+        startJob();
+        HttpURLConnection conn = null;
+        try {
+            URL url = new URL(DOMAIN + REST_URL_LOGS + "?format=JSON");
+            conn = (HttpURLConnection) url.openConnection();
+
+            Assertions.assertEquals(401, conn.getResponseCode());
+        } finally {
+            if (conn != null) {
+                conn.disconnect();
+            }
+        }
+    }
+
+    @Test
+    void testLogRestApiResponseSuccess() throws IOException {
+        startJob();
+        testLogRestApiResponse("JSON");
+    }
+
+    public void setBasicAuth(HttpURLConnection connection) {
+        // Basic Auth
+        Encoder encoder = Base64.getEncoder();
+        String auth = USER + ":" + PASS;
+        String token = 
encoder.encodeToString(auth.getBytes(StandardCharsets.UTF_8));
+        connection.setRequestProperty(AUTHORIZATION_HEADER, BASIC_PREFIX + 
token);
+    }
+
+    public void testLogRestApiResponse(String format) throws IOException {
+        HttpURLConnection conn = null;
+        try {
+            URL url = new URL(DOMAIN + REST_URL_LOGS + "?format=" + format);
+            conn = (HttpURLConnection) url.openConnection();
+            setBasicAuth(conn);
+
+            Assertions.assertEquals(200, conn.getResponseCode());
+            Assertions.assertTrue(
+                    conn.getHeaderFields()
+                            .get("Content-Type")
+                            .toString()
+                            .contains("charset=utf-8"));
+
+            try (BufferedReader in =
+                    new BufferedReader(new 
InputStreamReader(conn.getInputStream()))) {
+                // [ {
+                //  "node" : "localhost:18080",
+                //  "logLink" : 
"http://localhost:18080/logs/job-1760939539658.log";,
+                //  "logName" : "job-1760939539658.log"
+                // }, {
+                //  "node" : "localhost:18080",
+                //  "logLink" : 
"http://localhost:18080/logs/job-${ctx:ST-JID}.log";,
+                //  "logName" : "job-${ctx:ST-JID}.log"
+                // } ]
+                String response = in.lines().collect(Collectors.joining());
+                Assertions.assertFalse(StringUtils.isBlank(response));
+            }
+
+        } finally {
+            if (conn != null) {
+                conn.disconnect();
+            }
+        }
+    }
+
+    private void startJob() {
+        LogicalDag testLogicalDag =
+                TestUtils.createTestLogicalPlan(
+                        "fake_to_console.conf",
+                        RestApiHttpBasicTest.JOB_1.toString(),
+                        RestApiHttpBasicTest.JOB_1);
+
+        JobImmutableInformation jobImmutableInformation =
+                new JobImmutableInformation(
+                        RestApiHttpBasicTest.JOB_1,
+                        "Test",
+                        nodeEngine.getSerializationService(),
+                        testLogicalDag,
+                        Collections.emptyList(),
+                        Collections.emptyList());
+
+        Data data = 
nodeEngine.getSerializationService().toData(jobImmutableInformation);
+
+        PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+                server.getCoordinatorService()
+                        .submitJob(
+                                RestApiHttpBasicTest.JOB_1,
+                                data,
+                                
jobImmutableInformation.isStartWithSavePoint());
+        voidPassiveCompletableFuture.join();
+    }
+}

Reply via email to