github-actions[bot] commented on code in PR #60988:
URL: https://github.com/apache/doris/pull/60988#discussion_r2881522127


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.Env;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages small files (e.g. SSL certificates) referenced by 
FILE:{file_id}:{md5}.
+ *
+ * <p>Files are fetched from FE via HTTP on first access, then cached on disk 
as {file_id}.{md5} and
+ * in memory to avoid repeated I/O on subsequent calls.
+ */
+public class SmallFileMgr {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SmallFileMgr.class);
+
+    private static final String FILE_PREFIX = "FILE:";
+
+    /** In-memory cache: "file_id:md5" -> absolute local file path */
+    private static final Map<String, String> MEM_CACHE = new 
ConcurrentHashMap<>();
+
+    /**
+     * Per-key locks to serialize concurrent downloads of the same file, 
preventing tmp file
+     * corruption when multiple threads race on the same file_id:md5 key.
+     */
+    private static final Map<String, Object> DOWNLOAD_LOCKS = new 
ConcurrentHashMap<>();
+
+    private SmallFileMgr() {}
+
+    /**
+     * Resolve a FILE: reference to an absolute local file path, downloading 
from FE if needed. FE
+     * address and cluster token are read from {@link Env}.
+     *
+     * @param filePath FILE reference, format: FILE:{file_id}:{md5}
+     * @return absolute local file path
+     */
+    public static String getFilePath(String filePath) {
+        return getFilePath(
+                Env.getCurrentEnv().getFeMasterAddress(),
+                filePath,
+                Env.getCurrentEnv().getClusterToken(),
+                getLocalDir());
+    }
+
+    /**
+     * Get the directory of the currently running JAR file
+     *
+     * @return
+     */
+    static String getLocalDir() {
+        try {
+            URL url = 
SmallFileMgr.class.getProtectionDomain().getCodeSource().getLocation();
+            LOG.info("Get code source URL: {}", url);
+            // Spring Boot fat jar: 
jar:file:/path/to/app.jar!/BOOT-INF/classes!/
+            if ("jar".equals(url.getProtocol())) {
+                String path = url.getPath(); // 
file:/path/to/app.jar!/BOOT-INF/classes!/
+                int separator = path.indexOf("!");
+                if (separator > 0) {
+                    path = path.substring(0, separator); // 
file:/path/to/app.jar
+                }
+                url = new URL(path);
+            }
+            File file = new File(url.toURI());
+            // When running a JAR file, `file` refers to the JAR file itself, 
taking its parent
+            // directory.
+            // When running an IDE file, `file` refers to the classes 
directory, returning directly.
+            return file.isFile() ? file.getParentFile().getAbsolutePath() : 
file.getAbsolutePath();
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /** Package-private overload that accepts a custom local directory, used 
for testing. */
+    static String getFilePath(
+            String feMasterAddress, String filePath, String clusterToken, 
String localDir) {
+        if (!filePath.startsWith(FILE_PREFIX)) {
+            throw new IllegalArgumentException("filePath must start with 
FILE:, got: " + filePath);
+        }
+        if (feMasterAddress == null || feMasterAddress.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "feMasterAddress is required when filePath is a FILE: 
reference");
+        }
+        String[] parts = filePath.substring(FILE_PREFIX.length()).split(":");
+        if (parts.length != 2) {
+            throw new IllegalArgumentException(
+                    "Invalid filePath format, expected FILE:file_id:md5, got: 
" + filePath);
+        }
+        String fileId = parts[0];
+        String md5 = parts[1];
+        String cacheKey = fileId + ":" + md5;
+
+        // 1. Fast path: in-memory cache hit — zero I/O, no lock needed
+        String memCached = MEM_CACHE.get(cacheKey);
+        if (memCached != null) {
+            LOG.debug("SmallFile memory cache hit: {}", memCached);
+            return memCached;
+        }
+
+        // 2. Serialize concurrent downloads of the same file to prevent tmp 
file corruption
+        Object lock = DOWNLOAD_LOCKS.computeIfAbsent(cacheKey, k -> new 
Object());
+        synchronized (lock) {
+            // Double-check memory cache inside the lock
+            String doubleChecked = MEM_CACHE.get(cacheKey);
+            if (doubleChecked != null) {
+                LOG.debug("SmallFile memory cache hit (after lock): {}", 
doubleChecked);
+                return doubleChecked;
+            }
+
+            String finalFilePath = localDir + File.separator + fileId + "." + 
md5;
+            File finalFile = new File(finalFilePath);
+
+            // 3. Disk cache hit — avoid downloading again after process 
restart
+            if (finalFile.exists()) {
+                try (FileInputStream fis = new FileInputStream(finalFile)) {
+                    String diskMd5 = DigestUtils.md5Hex(fis);
+                    if (diskMd5.equalsIgnoreCase(md5)) {
+                        LOG.info("SmallFile disk cache hit: {}", 
finalFilePath);
+                        MEM_CACHE.put(cacheKey, finalFilePath);
+                        return finalFilePath;
+                    }
+                    LOG.warn(
+                            "SmallFile disk cache MD5 mismatch, 
re-downloading: {}", finalFilePath);
+                } catch (IOException e) {
+                    LOG.warn(
+                            "Failed to read disk cached file, re-downloading: 
{}",
+                            finalFilePath,
+                            e);
+                }
+                finalFile.delete();
+            }
+
+            // 4. Download from FE: GET 
/api/get_small_file?file_id=xxx&token=yyy
+            String url =
+                    "http://";
+                            + feMasterAddress
+                            + "/api/get_small_file?file_id="
+                            + fileId

Review Comment:
   **Suggestion: Consider HTTPS support.** The URL scheme is hardcoded to 
   `http://`. If FE is configured with HTTPS, this download request will fail. 
   Consider making the scheme configurable, or attempting HTTPS first with a 
   fallback to HTTP, depending on the Doris deployment's security requirements.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.Env;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages small files (e.g. SSL certificates) referenced by 
FILE:{file_id}:{md5}.
+ *
+ * <p>Files are fetched from FE via HTTP on first access, then cached on disk 
as {file_id}.{md5} and
+ * in memory to avoid repeated I/O on subsequent calls.
+ */
+public class SmallFileMgr {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SmallFileMgr.class);
+
+    private static final String FILE_PREFIX = "FILE:";
+
+    /** In-memory cache: "file_id:md5" -> absolute local file path */
+    private static final Map<String, String> MEM_CACHE = new 
ConcurrentHashMap<>();
+
+    /**
+     * Per-key locks to serialize concurrent downloads of the same file, 
preventing tmp file
+     * corruption when multiple threads race on the same file_id:md5 key.
+     */
+    private static final Map<String, Object> DOWNLOAD_LOCKS = new 
ConcurrentHashMap<>();
+
+    private SmallFileMgr() {}
+

Review Comment:
   **Minor: Unbounded DOWNLOAD_LOCKS map.** The `DOWNLOAD_LOCKS` map grows by 
one entry for every unique `file_id:md5` key but entries are never removed 
(only `clearCache()` clears everything). For long-running CDC client processes 
handling many SSL cert rotations, this would be a minor memory leak. Consider 
removing the lock entry after the download completes, or using a 
`WeakHashMap`-based approach.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java:
##########
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.cdcclient.common.Env;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages small files (e.g. SSL certificates) referenced by 
FILE:{file_id}:{md5}.
+ *
+ * <p>Files are fetched from FE via HTTP on first access, then cached on disk 
as {file_id}.{md5} and
+ * in memory to avoid repeated I/O on subsequent calls.
+ */
+public class SmallFileMgr {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SmallFileMgr.class);
+
+    private static final String FILE_PREFIX = "FILE:";
+
+    /** In-memory cache: "file_id:md5" -> absolute local file path */
+    private static final Map<String, String> MEM_CACHE = new 
ConcurrentHashMap<>();
+
+    /**
+     * Per-key locks to serialize concurrent downloads of the same file, 
preventing tmp file
+     * corruption when multiple threads race on the same file_id:md5 key.
+     */
+    private static final Map<String, Object> DOWNLOAD_LOCKS = new 
ConcurrentHashMap<>();
+
+    private SmallFileMgr() {}
+
+    /**
+     * Resolve a FILE: reference to an absolute local file path, downloading 
from FE if needed. FE
+     * address and cluster token are read from {@link Env}.
+     *
+     * @param filePath FILE reference, format: FILE:{file_id}:{md5}
+     * @return absolute local file path
+     */
+    public static String getFilePath(String filePath) {
+        return getFilePath(
+                Env.getCurrentEnv().getFeMasterAddress(),
+                filePath,
+                Env.getCurrentEnv().getClusterToken(),
+                getLocalDir());
+    }
+
+    /**
+     * Get the directory of the currently running JAR file
+     *
+     * @return
+     */
+    static String getLocalDir() {
+        try {
+            URL url = 
SmallFileMgr.class.getProtectionDomain().getCodeSource().getLocation();
+            LOG.info("Get code source URL: {}", url);
+            // Spring Boot fat jar: 
jar:file:/path/to/app.jar!/BOOT-INF/classes!/
+            if ("jar".equals(url.getProtocol())) {
+                String path = url.getPath(); // 
file:/path/to/app.jar!/BOOT-INF/classes!/
+                int separator = path.indexOf("!");
+                if (separator > 0) {
+                    path = path.substring(0, separator); // 
file:/path/to/app.jar
+                }
+                url = new URL(path);
+            }
+            File file = new File(url.toURI());
+            // When running a JAR file, `file` refers to the JAR file itself, 
taking its parent
+            // directory.
+            // When running an IDE file, `file` refers to the classes 
directory, returning directly.
+            return file.isFile() ? file.getParentFile().getAbsolutePath() : 
file.getAbsolutePath();
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /** Package-private overload that accepts a custom local directory, used 
for testing. */
+    static String getFilePath(
+            String feMasterAddress, String filePath, String clusterToken, 
String localDir) {
+        if (!filePath.startsWith(FILE_PREFIX)) {
+            throw new IllegalArgumentException("filePath must start with 
FILE:, got: " + filePath);
+        }
+        if (feMasterAddress == null || feMasterAddress.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "feMasterAddress is required when filePath is a FILE: 
reference");
+        }
+        String[] parts = filePath.substring(FILE_PREFIX.length()).split(":");
+        if (parts.length != 2) {
+            throw new IllegalArgumentException(
+                    "Invalid filePath format, expected FILE:file_id:md5, got: 
" + filePath);
+        }
+        String fileId = parts[0];
+        String md5 = parts[1];
+        String cacheKey = fileId + ":" + md5;
+
+        // 1. Fast path: in-memory cache hit — zero I/O, no lock needed
+        String memCached = MEM_CACHE.get(cacheKey);
+        if (memCached != null) {
+            LOG.debug("SmallFile memory cache hit: {}", memCached);
+            return memCached;
+        }
+
+        // 2. Serialize concurrent downloads of the same file to prevent tmp 
file corruption
+        Object lock = DOWNLOAD_LOCKS.computeIfAbsent(cacheKey, k -> new 
Object());
+        synchronized (lock) {
+            // Double-check memory cache inside the lock
+            String doubleChecked = MEM_CACHE.get(cacheKey);
+            if (doubleChecked != null) {
+                LOG.debug("SmallFile memory cache hit (after lock): {}", 
doubleChecked);
+                return doubleChecked;
+            }
+
+            String finalFilePath = localDir + File.separator + fileId + "." + 
md5;
+            File finalFile = new File(finalFilePath);
+
+            // 3. Disk cache hit — avoid downloading again after process 
restart
+            if (finalFile.exists()) {
+                try (FileInputStream fis = new FileInputStream(finalFile)) {
+                    String diskMd5 = DigestUtils.md5Hex(fis);
+                    if (diskMd5.equalsIgnoreCase(md5)) {
+                        LOG.info("SmallFile disk cache hit: {}", 
finalFilePath);
+                        MEM_CACHE.put(cacheKey, finalFilePath);
+                        return finalFilePath;
+                    }
+                    LOG.warn(
+                            "SmallFile disk cache MD5 mismatch, 
re-downloading: {}", finalFilePath);
+                } catch (IOException e) {
+                    LOG.warn(
+                            "Failed to read disk cached file, re-downloading: 
{}",
+                            finalFilePath,
+                            e);
+                }
+                finalFile.delete();
+            }
+
+            // 4. Download from FE: GET 
/api/get_small_file?file_id=xxx&token=yyy
+            String url =
+                    "http://";
+                            + feMasterAddress
+                            + "/api/get_small_file?file_id="
+                            + fileId
+                            + "&token="
+                            + clusterToken;
+            LOG.info("Downloading small file from FE: {}", url);
+
+            File tmpFile = new File(localDir + File.separator + fileId + 
".tmp");
+            try (CloseableHttpClient client = HttpUtil.getHttpClient();

Review Comment:
   **Security: Cluster token logged in URL.** The `LOG.info("Downloading small 
   file from FE: {}", url)` call just above this line writes the full URL, 
   including `token=<cluster_token>`, to the log file. The cluster token is a 
   sensitive credential that should not appear in logs. Consider logging only 
   the non-sensitive parts (the FE address and file ID) instead of the full 
   URL, e.g.:
   ```java
   LOG.info("Downloading small file from FE: {}, file_id={}", feMasterAddress, 
fileId);
   ```



##########
docker/thirdparties/docker-compose/postgresql/certs/server.key:
##########
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC0dhjB69wyow9s
+IBpCWtfuom1Fd/+P5Z8YVszj89ljpyNKyupSDAd24zITjJx729ZbKAskz9L1oEje
+Oy+EB0tVoUheE5TiMy7KT7VEm+6T6JQnfBpujZm/rrTnBRTvBLQUJ8kkzEb/PPkV

Review Comment:
   **Note: Test private key committed to repository.** This is a private key 
committed in plain text. While it appears to be for Docker test infrastructure 
only (`Local-Dev-CA` for `localhost`), consider adding a comment or README in 
this directory clarifying these are test-only certificates, or better yet, 
generate them dynamically during Docker container startup.



##########
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy:
##########
@@ -179,8 +179,9 @@ suite("test_streaming_mysql_job_priv", 
"p0,external,mysql,external_docker,extern
                    {
                        def jobStatus = sql """ select status, ErrorMsg from 
jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
                        log.info("jobStatus: " + jobStatus)
+                       println("jobStatus: " + jobStatus)
                        // check job status
-                       jobStatus.size() == 1 && 'PAUSED' == 
jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch 
meta")
+                       jobStatus.size() == 1 && 'PAUSED' == 
jobStatus.get(0).get(0)

Review Comment:
   **Concern: Relaxed assertion.** The previous check verified a specific error 
message (`contains("Failed to fetch meta")`), while the new check only verifies 
`PAUSED` status. This broader assertion could mask regressions where the job 
pauses for a different/unexpected reason. Was this intentional? If the error 
message changed, the assertion should be updated to match the new message 
rather than removed entirely.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to