Copilot commented on code in PR #60988:
URL: https://github.com/apache/doris/pull/60988#discussion_r2881532774


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.Env;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages small files (e.g. SSL certificates) referenced by 
FILE:{file_id}:{md5}.
+ *
+ * <p>Files are fetched from FE via HTTP on first access, then cached on disk 
as {file_id}.{md5} and
+ * in memory to avoid repeated I/O on subsequent calls.
+ */
+public class SmallFileMgr {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SmallFileMgr.class);
+
+    private static final String FILE_PREFIX = "FILE:";
+
+    /** In-memory cache: "file_id:md5" -> absolute local file path */
+    private static final Map<String, String> MEM_CACHE = new 
ConcurrentHashMap<>();
+
+    /**
+     * Per-key locks to serialize concurrent downloads of the same file, 
preventing tmp file
+     * corruption when multiple threads race on the same file_id:md5 key.
+     */
+    private static final Map<String, Object> DOWNLOAD_LOCKS = new 
ConcurrentHashMap<>();
+
+    private SmallFileMgr() {}
+
+    /**
+     * Resolve a FILE: reference to an absolute local file path, downloading 
from FE if needed. FE
+     * address and cluster token are read from {@link Env}.
+     *
+     * @param filePath FILE reference, format: FILE:{file_id}:{md5}
+     * @return absolute local file path
+     */
+    public static String getFilePath(String filePath) {
+        return getFilePath(
+                Env.getCurrentEnv().getFeMasterAddress(),
+                filePath,
+                Env.getCurrentEnv().getClusterToken(),
+                getLocalDir());
+    }
+
+    /**
+     * Get the directory of the currently running JAR file
+     *
+     * @return
+     */
+    static String getLocalDir() {
+        try {
+            URL url = 
SmallFileMgr.class.getProtectionDomain().getCodeSource().getLocation();
+            LOG.info("Get code source URL: {}", url);
+            // Spring Boot fat jar: 
jar:file:/path/to/app.jar!/BOOT-INF/classes!/
+            if ("jar".equals(url.getProtocol())) {
+                String path = url.getPath(); // 
file:/path/to/app.jar!/BOOT-INF/classes!/
+                int separator = path.indexOf("!");
+                if (separator > 0) {
+                    path = path.substring(0, separator); // 
file:/path/to/app.jar
+                }
+                url = new URL(path);
+            }
+            File file = new File(url.toURI());
+            // When running a JAR file, `file` refers to the JAR file itself, 
taking its parent
+            // directory.
+            // When running an IDE file, `file` refers to the classes 
directory, returning directly.
+            return file.isFile() ? file.getParentFile().getAbsolutePath() : 
file.getAbsolutePath();
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /** Package-private overload that accepts a custom local directory, used 
for testing. */
+    static String getFilePath(
+            String feMasterAddress, String filePath, String clusterToken, 
String localDir) {
+        if (!filePath.startsWith(FILE_PREFIX)) {
+            throw new IllegalArgumentException("filePath must start with 
FILE:, got: " + filePath);
+        }
+        if (feMasterAddress == null || feMasterAddress.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "feMasterAddress is required when filePath is a FILE: 
reference");
+        }
+        String[] parts = filePath.substring(FILE_PREFIX.length()).split(":");
+        if (parts.length != 2) {
+            throw new IllegalArgumentException(
+                    "Invalid filePath format, expected FILE:file_id:md5, got: 
" + filePath);
+        }
+        String fileId = parts[0];
+        String md5 = parts[1];
+        String cacheKey = fileId + ":" + md5;
+
+        // 1. Fast path: in-memory cache hit — zero I/O, no lock needed
+        String memCached = MEM_CACHE.get(cacheKey);
+        if (memCached != null) {
+            LOG.debug("SmallFile memory cache hit: {}", memCached);
+            return memCached;
+        }
+
+        // 2. Serialize concurrent downloads of the same file to prevent tmp 
file corruption
+        Object lock = DOWNLOAD_LOCKS.computeIfAbsent(cacheKey, k -> new 
Object());
+        synchronized (lock) {
+            // Double-check memory cache inside the lock
+            String doubleChecked = MEM_CACHE.get(cacheKey);
+            if (doubleChecked != null) {
+                LOG.debug("SmallFile memory cache hit (after lock): {}", 
doubleChecked);
+                return doubleChecked;
+            }
+
+            String finalFilePath = localDir + File.separator + fileId + "." + 
md5;
+            File finalFile = new File(finalFilePath);
+
+            // 3. Disk cache hit — avoid downloading again after process 
restart
+            if (finalFile.exists()) {
+                try (FileInputStream fis = new FileInputStream(finalFile)) {
+                    String diskMd5 = DigestUtils.md5Hex(fis);
+                    if (diskMd5.equalsIgnoreCase(md5)) {
+                        LOG.info("SmallFile disk cache hit: {}", 
finalFilePath);
+                        MEM_CACHE.put(cacheKey, finalFilePath);
+                        return finalFilePath;
+                    }
+                    LOG.warn(
+                            "SmallFile disk cache MD5 mismatch, 
re-downloading: {}", finalFilePath);
+                } catch (IOException e) {
+                    LOG.warn(
+                            "Failed to read disk cached file, re-downloading: 
{}",
+                            finalFilePath,
+                            e);
+                }
+                finalFile.delete();
+            }
+
+            // 4. Download from FE: GET 
/api/get_small_file?file_id=xxx&token=yyy
+            String url =
+                    "http://"
+                            + feMasterAddress
+                            + "/api/get_small_file?file_id="
+                            + fileId
+                            + "&token="
+                            + clusterToken;
+            LOG.info("Downloading small file from FE: {}", url);
+
+            File tmpFile = new File(localDir + File.separator + fileId + 
".tmp");

Review Comment:
   `tmpFile` is derived only from `fileId` (`{fileId}.tmp`), but the download 
lock key is `fileId:md5`. If two different MD5 versions of the same `fileId` 
are fetched concurrently, they will use different locks and write to the same 
tmp file, causing corruption. Use a tmp file name that includes the full cache 
key (e.g., `fileId.md5.tmp`) or broaden the lock to be per-`fileId`.
   ```suggestion
               File tmpFile =
                       new File(localDir + File.separator + fileId + "." + md5 
+ ".tmp");
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.Env;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages small files (e.g. SSL certificates) referenced by 
FILE:{file_id}:{md5}.
+ *
+ * <p>Files are fetched from FE via HTTP on first access, then cached on disk 
as {file_id}.{md5} and
+ * in memory to avoid repeated I/O on subsequent calls.
+ */
+public class SmallFileMgr {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SmallFileMgr.class);
+
+    private static final String FILE_PREFIX = "FILE:";
+
+    /** In-memory cache: "file_id:md5" -> absolute local file path */
+    private static final Map<String, String> MEM_CACHE = new 
ConcurrentHashMap<>();
+
+    /**
+     * Per-key locks to serialize concurrent downloads of the same file, 
preventing tmp file
+     * corruption when multiple threads race on the same file_id:md5 key.
+     */
+    private static final Map<String, Object> DOWNLOAD_LOCKS = new 
ConcurrentHashMap<>();
+
+    private SmallFileMgr() {}
+
+    /**
+     * Resolve a FILE: reference to an absolute local file path, downloading 
from FE if needed. FE
+     * address and cluster token are read from {@link Env}.
+     *
+     * @param filePath FILE reference, format: FILE:{file_id}:{md5}
+     * @return absolute local file path
+     */
+    public static String getFilePath(String filePath) {
+        return getFilePath(
+                Env.getCurrentEnv().getFeMasterAddress(),
+                filePath,
+                Env.getCurrentEnv().getClusterToken(),
+                getLocalDir());
+    }
+
+    /**
+     * Get the directory of the currently running JAR file
+     *
+     * @return
+     */
+    static String getLocalDir() {
+        try {
+            URL url = 
SmallFileMgr.class.getProtectionDomain().getCodeSource().getLocation();
+            LOG.info("Get code source URL: {}", url);
+            // Spring Boot fat jar: 
jar:file:/path/to/app.jar!/BOOT-INF/classes!/
+            if ("jar".equals(url.getProtocol())) {
+                String path = url.getPath(); // 
file:/path/to/app.jar!/BOOT-INF/classes!/
+                int separator = path.indexOf("!");
+                if (separator > 0) {
+                    path = path.substring(0, separator); // 
file:/path/to/app.jar
+                }
+                url = new URL(path);
+            }
+            File file = new File(url.toURI());
+            // When running a JAR file, `file` refers to the JAR file itself, 
taking its parent
+            // directory.
+            // When running an IDE file, `file` refers to the classes 
directory, returning directly.
+            return file.isFile() ? file.getParentFile().getAbsolutePath() : 
file.getAbsolutePath();
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /** Package-private overload that accepts a custom local directory, used 
for testing. */
+    static String getFilePath(
+            String feMasterAddress, String filePath, String clusterToken, 
String localDir) {
+        if (!filePath.startsWith(FILE_PREFIX)) {
+            throw new IllegalArgumentException("filePath must start with 
FILE:, got: " + filePath);
+        }
+        if (feMasterAddress == null || feMasterAddress.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "feMasterAddress is required when filePath is a FILE: 
reference");
+        }
+        String[] parts = filePath.substring(FILE_PREFIX.length()).split(":");
+        if (parts.length != 2) {
+            throw new IllegalArgumentException(
+                    "Invalid filePath format, expected FILE:file_id:md5, got: 
" + filePath);
+        }
+        String fileId = parts[0];
+        String md5 = parts[1];
+        String cacheKey = fileId + ":" + md5;
+
+        // 1. Fast path: in-memory cache hit — zero I/O, no lock needed
+        String memCached = MEM_CACHE.get(cacheKey);
+        if (memCached != null) {
+            LOG.debug("SmallFile memory cache hit: {}", memCached);
+            return memCached;
+        }
+
+        // 2. Serialize concurrent downloads of the same file to prevent tmp 
file corruption
+        Object lock = DOWNLOAD_LOCKS.computeIfAbsent(cacheKey, k -> new 
Object());
+        synchronized (lock) {

Review Comment:
   `DOWNLOAD_LOCKS` stores one lock object per unique `fileId:md5` and is never 
cleaned up unless `clearCache()` is called. Over time, this can grow without 
bound for long-running processes. Consider using a bounded/striped lock 
strategy, or remove the lock entry once the download completes (taking care not 
to remove a lock another thread is about to use).



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.Env;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages small files (e.g. SSL certificates) referenced by 
FILE:{file_id}:{md5}.
+ *
+ * <p>Files are fetched from FE via HTTP on first access, then cached on disk 
as {file_id}.{md5} and
+ * in memory to avoid repeated I/O on subsequent calls.
+ */
+public class SmallFileMgr {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SmallFileMgr.class);
+
+    private static final String FILE_PREFIX = "FILE:";
+
+    /** In-memory cache: "file_id:md5" -> absolute local file path */
+    private static final Map<String, String> MEM_CACHE = new 
ConcurrentHashMap<>();
+
+    /**
+     * Per-key locks to serialize concurrent downloads of the same file, 
preventing tmp file
+     * corruption when multiple threads race on the same file_id:md5 key.
+     */
+    private static final Map<String, Object> DOWNLOAD_LOCKS = new 
ConcurrentHashMap<>();
+
+    private SmallFileMgr() {}
+
+    /**
+     * Resolve a FILE: reference to an absolute local file path, downloading 
from FE if needed. FE
+     * address and cluster token are read from {@link Env}.
+     *
+     * @param filePath FILE reference, format: FILE:{file_id}:{md5}
+     * @return absolute local file path
+     */
+    public static String getFilePath(String filePath) {
+        return getFilePath(
+                Env.getCurrentEnv().getFeMasterAddress(),
+                filePath,
+                Env.getCurrentEnv().getClusterToken(),
+                getLocalDir());
+    }
+
+    /**
+     * Get the directory of the currently running JAR file
+     *
+     * @return
+     */
+    static String getLocalDir() {
+        try {
+            URL url = 
SmallFileMgr.class.getProtectionDomain().getCodeSource().getLocation();
+            LOG.info("Get code source URL: {}", url);
+            // Spring Boot fat jar: 
jar:file:/path/to/app.jar!/BOOT-INF/classes!/
+            if ("jar".equals(url.getProtocol())) {
+                String path = url.getPath(); // 
file:/path/to/app.jar!/BOOT-INF/classes!/
+                int separator = path.indexOf("!");
+                if (separator > 0) {
+                    path = path.substring(0, separator); // 
file:/path/to/app.jar
+                }
+                url = new URL(path);
+            }
+            File file = new File(url.toURI());
+            // When running a JAR file, `file` refers to the JAR file itself, 
taking its parent
+            // directory.
+            // When running an IDE file, `file` refers to the classes 
directory, returning directly.
+            return file.isFile() ? file.getParentFile().getAbsolutePath() : 
file.getAbsolutePath();
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /** Package-private overload that accepts a custom local directory, used 
for testing. */
+    static String getFilePath(
+            String feMasterAddress, String filePath, String clusterToken, 
String localDir) {
+        if (!filePath.startsWith(FILE_PREFIX)) {
+            throw new IllegalArgumentException("filePath must start with 
FILE:, got: " + filePath);
+        }
+        if (feMasterAddress == null || feMasterAddress.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "feMasterAddress is required when filePath is a FILE: 
reference");
+        }
+        String[] parts = filePath.substring(FILE_PREFIX.length()).split(":");
+        if (parts.length != 2) {
+            throw new IllegalArgumentException(
+                    "Invalid filePath format, expected FILE:file_id:md5, got: 
" + filePath);
+        }
+        String fileId = parts[0];
+        String md5 = parts[1];
+        String cacheKey = fileId + ":" + md5;
+
+        // 1. Fast path: in-memory cache hit — zero I/O, no lock needed
+        String memCached = MEM_CACHE.get(cacheKey);
+        if (memCached != null) {
+            LOG.debug("SmallFile memory cache hit: {}", memCached);
+            return memCached;
+        }
+
+        // 2. Serialize concurrent downloads of the same file to prevent tmp 
file corruption
+        Object lock = DOWNLOAD_LOCKS.computeIfAbsent(cacheKey, k -> new 
Object());
+        synchronized (lock) {
+            // Double-check memory cache inside the lock
+            String doubleChecked = MEM_CACHE.get(cacheKey);
+            if (doubleChecked != null) {
+                LOG.debug("SmallFile memory cache hit (after lock): {}", 
doubleChecked);
+                return doubleChecked;
+            }
+
+            String finalFilePath = localDir + File.separator + fileId + "." + 
md5;
+            File finalFile = new File(finalFilePath);
+
+            // 3. Disk cache hit — avoid downloading again after process 
restart
+            if (finalFile.exists()) {
+                try (FileInputStream fis = new FileInputStream(finalFile)) {
+                    String diskMd5 = DigestUtils.md5Hex(fis);
+                    if (diskMd5.equalsIgnoreCase(md5)) {
+                        LOG.info("SmallFile disk cache hit: {}", 
finalFilePath);
+                        MEM_CACHE.put(cacheKey, finalFilePath);
+                        return finalFilePath;
+                    }
+                    LOG.warn(
+                            "SmallFile disk cache MD5 mismatch, 
re-downloading: {}", finalFilePath);
+                } catch (IOException e) {
+                    LOG.warn(
+                            "Failed to read disk cached file, re-downloading: 
{}",
+                            finalFilePath,
+                            e);
+                }
+                finalFile.delete();
+            }
+
+            // 4. Download from FE: GET 
/api/get_small_file?file_id=xxx&token=yyy
+            String url =
+                    "http://"
+                            + feMasterAddress
+                            + "/api/get_small_file?file_id="
+                            + fileId
+                            + "&token="
+                            + clusterToken;
+            LOG.info("Downloading small file from FE: {}", url);
+

Review Comment:
   The log line prints the full download URL including the `clusterToken` query 
parameter. This can leak the token into logs and log aggregation systems. 
Consider redacting the token in logs and/or sending the token via an HTTP 
header instead of a query parameter (if the FE endpoint supports it).



##########
fs_brokers/cdc_client/src/main/resources/application.properties:
##########
@@ -17,4 +17,5 @@
 
################################################################################
 spring.web.resources.add-mappings=false
 server.port=9096
-backend.http.port=8040
\ No newline at end of file
+backend.http.port=8040
+cluster.token=cluster-token

Review Comment:
   A hard-coded default `cluster.token` in the packaged 
`application.properties` can lead to confusing misconfiguration (cdc_client may 
start successfully but fail to authenticate to FE unless overridden). Consider 
omitting the default and requiring it be provided via command-line/env, or at 
least setting it to empty and failing fast if missing.
   ```suggestion
   # cluster.token must be provided via environment/command-line or external 
config
   cluster.token=
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java:
##########
@@ -245,6 +249,31 @@ private static synchronized int 
getLastSelectedBackendIndexAndUpdate() {
         return index;
     }
 
+    /**
+     * When enabling SSL, you need to convert FILE:ca.perm to FILE:ca.pem:md5.

Review Comment:
   Typo in comment: `FILE:ca.perm` should be `FILE:ca.pem`.
   ```suggestion
        * When enabling SSL, you need to convert FILE:ca.pem to FILE:ca.pem:md5.
   ```



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy:
##########
@@ -179,8 +179,9 @@ suite("test_streaming_mysql_job_priv", 
"p0,external,mysql,external_docker,extern
                    {
                        def jobStatus = sql """ select status, ErrorMsg from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
                        log.info("jobStatus: " + jobStatus)
+                       println("jobStatus: " + jobStatus)
                        // check job status
-                       jobStatus.size() == 1 && 'PAUSED' == 
jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch 
meta")
+                       jobStatus.size() == 1 && 'PAUSED' == 
jobStatus.get(0).get(0)
                    }

Review Comment:
   This assertion no longer validates the expected failure reason (`ErrorMsg` 
content) when the job is PAUSED. That weakens the test and could allow 
unrelated failures to pass. Consider keeping an assertion on `ErrorMsg` (even 
if relaxed, e.g. non-empty or matching the new expected message) to ensure the 
PAUSE is for the intended privilege error path.



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy:
##########
@@ -179,8 +179,9 @@ suite("test_streaming_mysql_job_priv", 
"p0,external,mysql,external_docker,extern
                    {
                        def jobStatus = sql """ select status, ErrorMsg from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
                        log.info("jobStatus: " + jobStatus)
+                       println("jobStatus: " + jobStatus)

Review Comment:
   `println("jobStatus: ...")` introduces noisy stdout output in regression 
runs and duplicates `log.info` above. Prefer using the existing logger only.
   ```suggestion
   
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java:
##########
@@ -245,6 +249,31 @@ private static synchronized int 
getLastSelectedBackendIndexAndUpdate() {
         return index;
     }
 
+    /**
+     * When enabling SSL, you need to convert FILE:ca.perm to FILE:ca.pem:md5.
+     */
+    public static Map<String, String> convertCertFile(long dbId, Map<String, 
String> sourceProperties)
+            throws JobException {
+        SmallFileMgr smallFileMgr = Env.getCurrentEnv().getSmallFileMgr();
+        Map<String, String> newProps = new HashMap<>(sourceProperties);
+        if (sourceProperties.containsKey(DataSourceConfigKeys.SSL_ROOTCERT)) {
+            String certFile = 
sourceProperties.get(DataSourceConfigKeys.SSL_ROOTCERT);
+            if (certFile.startsWith("FILE:")) {
+                String file = certFile.substring(certFile.indexOf(":") + 1);
+                try {
+                    SmallFile smallFile =
+                            smallFileMgr.getSmallFile(dbId, 
StreamingInsertJob.JOB_FILE_CATALOG, file, true);
+                    newProps.put(DataSourceConfigKeys.SSL_ROOTCERT, "FILE:" + 
smallFile.id + ":" + smallFile.md5);
+                } catch (DdlException ex) {
+                    throw new JobException("ssl root cert file not found: " + 
certFile);
+                }
+            } else {
+                throw new JobException("ssl root cert is not in expected 
format, should start with FILE:" + certFile);

Review Comment:
   The JobException messages here are hard to interpret and drop useful 
context: 
   - The `DdlException` cause is swallowed, making debugging harder.
   - The "should start with FILE:" message concatenates the expected prefix and 
the actual value without a separator.
   Consider including the original exception as the cause and formatting 
messages as "expected ..., got ...".
   ```suggestion
                       throw new JobException("ssl root cert file not found, 
expected existing small file reference, got '"
                               + certFile + "'", ex);
                   }
               } else {
                   throw new JobException("ssl root cert is not in expected 
format, expected value starting with 'FILE:', "
                           + "got '" + certFile + "'");
   ```



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_ssl.groovy:
##########
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_postgres_job_ssl", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_postgres_job_name_ssl"
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "user_info_pg_normal1_ssl"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${table1} force"""
+
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // create test
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""
+            def sslInfo = sql """SHOW ssl"""
+            log.info("sslInfo: " + sslInfo)
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+                  "name" varchar(200),
+                  "age" int2,
+                  PRIMARY KEY ("name")
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) 
VALUES ('A1', 1);"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) 
VALUES ('B1', 2);"""
+        }
+
+        try {
+            sql """DROP FILE "ca.pem" FROM ${currentDb} PROPERTIES ("catalog" 
= "streaming_job")"""
+        } catch (Exception ignored) {
+            // ignore
+        }
+
+        sql """CREATE FILE "ca.pem"
+            IN ${currentDb}
+            PROPERTIES
+            (
+                "url" = 
"https://qa-build.oss-cn-beijing.aliyuncs.com/jianxu/root.crt",
+                "catalog" = "streaming_job"
+            )

Review Comment:
   This regression test depends on downloading the root CA from an external URL 
(`qa-build.oss-cn-beijing.aliyuncs.com`). That makes the test fragile (network, 
availability, region) and can break offline/air-gapped CI. Prefer referencing a 
test fixture stored in-repo (or provisioned by the docker-compose PG setup) so 
the test is self-contained.



##########
docker/thirdparties/docker-compose/postgresql/certs/server.key:
##########
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC0dhjB69wyow9s
+IBpCWtfuom1Fd/+P5Z8YVszj89ljpyNKyupSDAd24zITjJx729ZbKAskz9L1oEje
+Oy+EB0tVoUheE5TiMy7KT7VEm+6T6JQnfBpujZm/rrTnBRTvBLQUJ8kkzEb/PPkV
+i1fyQzDuwolYuEHgL6hCU76ZKPgIqODuY1mOArPTbj8FbsUqEBphgYj0Y2aMiC+N
+INcY5fN3acz1cuhU0iRHyaAPr2XIcfOEZQB1cPFpOv7Tbpna4GueKntmMMgHrkn+
+qvFJzx4rVmFnNh0cGkuc/rvO5ZGLIoM0ROlFrVKhYKT3/34AbDKBFxcLuHUakxvj
+rBx7j9O3AgMBAAECggEABZ+8uxdWnQYl+4xlV5E0gmTx3dh8Qd351UfFsW0demDr
+lU1SI3I4I/Lelv8lyrLXZzjcwPfmezfec6RnF37p7ijSPgrIG2PLplCqJsy6BzK1
+ycH/yaYm6sIFSBqdF+ZO5QOaGOWZpA9lgsYHNVt/jdvJCq/50ZhJZO2fvfi9dr4I
+vLjcCX57t+V9n68zHCdw8pTw3eSvO34wv8FXXQyofYi6+swoV/NhGFS1xMlc2USO
+KQ0Do/Y8Dxr/5HawoiMTzO/o4M0Bdmb237fW4D0yVqaevjVWKe/wq2q3VZyBatB2
+XDMkL1ZaWiRsRZHoliiIh3K3gQ2jmtsMXjzv+IKdvQKBgQDgPsk7y5Ms5rjArL8g
+qCP2o8a/IvxzCwxcvK59nfmWFuFeJsxE3uvp89UriqC6yGD5yxAmjDKvHOFtV+CE
+KjCnMgt/jU6BpkaHzTRR8Gtt/RkILZTZiKoNdEgOTeBjHKCoOUoM7Dc78nW7Dp0F
+QoLdAe0g0pSRy5iFcWBiX7UP5QKBgQDOBBRfnaU6fICVH0SmqBoKVSCDm+saYMAW
+99mypm2xViP4VQOa1QjNRiEN9kllxD4I+S48kALSCpif+A/IE89bNgFNEOvTYbkW
++mvjoFLQtN79Tc8/G0CEi+WhRWWpY9WnMuzj1r/pAbC8uOEKvJ+tYfKmHZN5kvoC
+k0e2yMCDawKBgFi6Hw9sxkgO5m0+LMW0Ib62IK6CHlc6uOJ8uaH0fsvXM8b4HPzn
+I3tHQkJfMKeXH1/W7AYElQ1apQuJqMlClEujbo9CjxyXePLEy/3b3fYAHgZxWqMU
+Aw0dxGD8iVtN+Xd2a4lfcZ9jmRexeYmaPoNJ/tRs3eIuJ6QtLxDdg5vNAoGBAIqU
+C/BVZrN01Dl7Ev7XzMxufrSIyRixRAUvK20Urmy/eOqupQIdkxIhvlJZ/P1LiD8Y
+/pUWeg83uXrBrjvzt2OvbCie3UMPVSWzxacUTSC+ydCx6lqUxk1inVBiEgRjd3BE
+vTx1VBo0XOJVqmtCflZusH41HuKEj0/0KiU13OmJAoGAYkxy/U6uHHn6xB3KriID
+bZgfYRlLv1bD4AYiOcjFke3/4MZJ2U4t/x6uzEjQZd/0waSeE3YY/MfEXufdHM99
+ZUlAHwLhjLcY58HgkyMkw4sRaHYxTQdOuxcnmzX1+sHKxKXlYoboLgh8Qf9A4DcR
+HZde9n1uVLVtlBRTjjL5O84=
+-----END PRIVATE KEY-----

Review Comment:
   This file adds a PEM-encoded private key to the repository. Even if intended 
only for local docker testing, committing private keys is risky and often 
violates security policies. Prefer generating the key/cert at container startup 
(or during CI setup) and keeping only non-sensitive public certs in-repo.
   ```suggestion
   # Placeholder for PostgreSQL server private key.
   # 
   # A real private key MUST NOT be committed to version control.
   # Generate a fresh key and certificate at container startup or during CI,
   # and write the key to this path:
   #   docker/thirdparties/docker-compose/postgresql/certs/server.key
   #
   # Example (self-signed, for local testing only):
   #   openssl req -newkey rsa:4096 -nodes -keyout server.key -x509 -days 365 \
   #       -out server.crt -subj "/CN=postgres.local"
   #
   # This file intentionally contains no valid PEM private key material.
   DUMMY_PRIVATE_KEY_GENERATE_AT_RUNTIME
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to