github-actions[bot] commented on code in PR #60988: URL: https://github.com/apache/doris/pull/60988#discussion_r2881522127
########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java: ########## @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.utils; + +import org.apache.doris.cdcclient.common.Env; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages small files (e.g. SSL certificates) referenced by FILE:{file_id}:{md5}. + * + * <p>Files are fetched from FE via HTTP on first access, then cached on disk as {file_id}.{md5} and + * in memory to avoid repeated I/O on subsequent calls. + */ +public class SmallFileMgr { + private static final Logger LOG = LoggerFactory.getLogger(SmallFileMgr.class); + + private static final String FILE_PREFIX = "FILE:"; + + /** In-memory cache: "file_id:md5" -> absolute local file path */ + private static final Map<String, String> MEM_CACHE = new ConcurrentHashMap<>(); + + /** + * Per-key locks to serialize concurrent downloads of the same file, preventing tmp file + * corruption when multiple threads race on the same file_id:md5 key. + */ + private static final Map<String, Object> DOWNLOAD_LOCKS = new ConcurrentHashMap<>(); + + private SmallFileMgr() {} + + /** + * Resolve a FILE: reference to an absolute local file path, downloading from FE if needed. FE + * address and cluster token are read from {@link Env}. + * + * @param filePath FILE reference, format: FILE:{file_id}:{md5} + * @return absolute local file path + */ + public static String getFilePath(String filePath) { + return getFilePath( + Env.getCurrentEnv().getFeMasterAddress(), + filePath, + Env.getCurrentEnv().getClusterToken(), + getLocalDir()); + } + + /** + * Get the directory of the currently running JAR file + * + * @return + */ + static String getLocalDir() { + try { + URL url = SmallFileMgr.class.getProtectionDomain().getCodeSource().getLocation(); + LOG.info("Get code source URL: {}", url); + // Spring Boot fat jar: jar:file:/path/to/app.jar!/BOOT-INF/classes!/ + if ("jar".equals(url.getProtocol())) { + String path = url.getPath(); // file:/path/to/app.jar!/BOOT-INF/classes!/ + int separator = path.indexOf("!"); + if (separator > 0) { + path = path.substring(0, separator); // file:/path/to/app.jar + } + url = new URL(path); + } + File file = new File(url.toURI()); + // When running a JAR file, `file` refers to the JAR file itself, taking its parent + // directory. + // When running an IDE file, `file` refers to the classes directory, returning directly. + return file.isFile() ? file.getParentFile().getAbsolutePath() : file.getAbsolutePath(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /** Package-private overload that accepts a custom local directory, used for testing. */ + static String getFilePath( + String feMasterAddress, String filePath, String clusterToken, String localDir) { + if (!filePath.startsWith(FILE_PREFIX)) { + throw new IllegalArgumentException("filePath must start with FILE:, got: " + filePath); + } + if (feMasterAddress == null || feMasterAddress.isEmpty()) { + throw new IllegalArgumentException( + "feMasterAddress is required when filePath is a FILE: reference"); + } + String[] parts = filePath.substring(FILE_PREFIX.length()).split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException( + "Invalid filePath format, expected FILE:file_id:md5, got: " + filePath); + } + String fileId = parts[0]; + String md5 = parts[1]; + String cacheKey = fileId + ":" + md5; + + // 1. Fast path: in-memory cache hit — zero I/O, no lock needed + String memCached = MEM_CACHE.get(cacheKey); + if (memCached != null) { + LOG.debug("SmallFile memory cache hit: {}", memCached); + return memCached; + } + + // 2. Serialize concurrent downloads of the same file to prevent tmp file corruption + Object lock = DOWNLOAD_LOCKS.computeIfAbsent(cacheKey, k -> new Object()); + synchronized (lock) { + // Double-check memory cache inside the lock + String doubleChecked = MEM_CACHE.get(cacheKey); + if (doubleChecked != null) { + LOG.debug("SmallFile memory cache hit (after lock): {}", doubleChecked); + return doubleChecked; + } + + String finalFilePath = localDir + File.separator + fileId + "." + md5; + File finalFile = new File(finalFilePath); + + // 3. Disk cache hit — avoid downloading again after process restart + if (finalFile.exists()) { + try (FileInputStream fis = new FileInputStream(finalFile)) { + String diskMd5 = DigestUtils.md5Hex(fis); + if (diskMd5.equalsIgnoreCase(md5)) { + LOG.info("SmallFile disk cache hit: {}", finalFilePath); + MEM_CACHE.put(cacheKey, finalFilePath); + return finalFilePath; + } + LOG.warn( + "SmallFile disk cache MD5 mismatch, re-downloading: {}", finalFilePath); + } catch (IOException e) { + LOG.warn( + "Failed to read disk cached file, re-downloading: {}", + finalFilePath, + e); + } + finalFile.delete(); + } + + // 4. Download from FE: GET /api/get_small_file?file_id=xxx&token=yyy + String url = + "http://" + + feMasterAddress + + "/api/get_small_file?file_id=" + + fileId Review Comment: **Suggestion: Consider HTTPS support.** The URL is hardcoded to `http://`. If FE is configured with HTTPS, this will fail. Consider making the scheme configurable or attempting HTTPS first with a fallback, depending on the Doris deployment's security requirements. ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java: ########## @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.utils; + +import org.apache.doris.cdcclient.common.Env; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages small files (e.g. SSL certificates) referenced by FILE:{file_id}:{md5}. + * + * <p>Files are fetched from FE via HTTP on first access, then cached on disk as {file_id}.{md5} and + * in memory to avoid repeated I/O on subsequent calls. + */ +public class SmallFileMgr { + private static final Logger LOG = LoggerFactory.getLogger(SmallFileMgr.class); + + private static final String FILE_PREFIX = "FILE:"; + + /** In-memory cache: "file_id:md5" -> absolute local file path */ + private static final Map<String, String> MEM_CACHE = new ConcurrentHashMap<>(); + + /** + * Per-key locks to serialize concurrent downloads of the same file, preventing tmp file + * corruption when multiple threads race on the same file_id:md5 key. + */ + private static final Map<String, Object> DOWNLOAD_LOCKS = new ConcurrentHashMap<>(); + + private SmallFileMgr() {} + Review Comment: **Minor: Unbounded DOWNLOAD_LOCKS map.** The `DOWNLOAD_LOCKS` map grows by one entry for every unique `file_id:md5` key but entries are never removed (only `clearCache()` clears everything). For long-running CDC client processes handling many SSL cert rotations, this would be a minor memory leak. Consider removing the lock entry after the download completes, or using a `WeakHashMap`-based approach. ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java: ########## @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.utils; + +import org.apache.doris.cdcclient.common.Env; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages small files (e.g. SSL certificates) referenced by FILE:{file_id}:{md5}. + * + * <p>Files are fetched from FE via HTTP on first access, then cached on disk as {file_id}.{md5} and + * in memory to avoid repeated I/O on subsequent calls. + */ +public class SmallFileMgr { + private static final Logger LOG = LoggerFactory.getLogger(SmallFileMgr.class); + + private static final String FILE_PREFIX = "FILE:"; + + /** In-memory cache: "file_id:md5" -> absolute local file path */ + private static final Map<String, String> MEM_CACHE = new ConcurrentHashMap<>(); + + /** + * Per-key locks to serialize concurrent downloads of the same file, preventing tmp file + * corruption when multiple threads race on the same file_id:md5 key. + */ + private static final Map<String, Object> DOWNLOAD_LOCKS = new ConcurrentHashMap<>(); + + private SmallFileMgr() {} + + /** + * Resolve a FILE: reference to an absolute local file path, downloading from FE if needed. FE + * address and cluster token are read from {@link Env}. + * + * @param filePath FILE reference, format: FILE:{file_id}:{md5} + * @return absolute local file path + */ + public static String getFilePath(String filePath) { + return getFilePath( + Env.getCurrentEnv().getFeMasterAddress(), + filePath, + Env.getCurrentEnv().getClusterToken(), + getLocalDir()); + } + + /** + * Get the directory of the currently running JAR file + * + * @return + */ + static String getLocalDir() { + try { + URL url = SmallFileMgr.class.getProtectionDomain().getCodeSource().getLocation(); + LOG.info("Get code source URL: {}", url); + // Spring Boot fat jar: jar:file:/path/to/app.jar!/BOOT-INF/classes!/ + if ("jar".equals(url.getProtocol())) { + String path = url.getPath(); // file:/path/to/app.jar!/BOOT-INF/classes!/ + int separator = path.indexOf("!"); + if (separator > 0) { + path = path.substring(0, separator); // file:/path/to/app.jar + } + url = new URL(path); + } + File file = new File(url.toURI()); + // When running a JAR file, `file` refers to the JAR file itself, taking its parent + // directory. + // When running an IDE file, `file` refers to the classes directory, returning directly. + return file.isFile() ? file.getParentFile().getAbsolutePath() : file.getAbsolutePath(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /** Package-private overload that accepts a custom local directory, used for testing. */ + static String getFilePath( + String feMasterAddress, String filePath, String clusterToken, String localDir) { + if (!filePath.startsWith(FILE_PREFIX)) { + throw new IllegalArgumentException("filePath must start with FILE:, got: " + filePath); + } + if (feMasterAddress == null || feMasterAddress.isEmpty()) { + throw new IllegalArgumentException( + "feMasterAddress is required when filePath is a FILE: reference"); + } + String[] parts = filePath.substring(FILE_PREFIX.length()).split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException( + "Invalid filePath format, expected FILE:file_id:md5, got: " + filePath); + } + String fileId = parts[0]; + String md5 = parts[1]; + String cacheKey = fileId + ":" + md5; + + // 1. Fast path: in-memory cache hit — zero I/O, no lock needed + String memCached = MEM_CACHE.get(cacheKey); + if (memCached != null) { + LOG.debug("SmallFile memory cache hit: {}", memCached); + return memCached; + } + + // 2. Serialize concurrent downloads of the same file to prevent tmp file corruption + Object lock = DOWNLOAD_LOCKS.computeIfAbsent(cacheKey, k -> new Object()); + synchronized (lock) { + // Double-check memory cache inside the lock + String doubleChecked = MEM_CACHE.get(cacheKey); + if (doubleChecked != null) { + LOG.debug("SmallFile memory cache hit (after lock): {}", doubleChecked); + return doubleChecked; + } + + String finalFilePath = localDir + File.separator + fileId + "." + md5; + File finalFile = new File(finalFilePath); + + // 3. Disk cache hit — avoid downloading again after process restart + if (finalFile.exists()) { + try (FileInputStream fis = new FileInputStream(finalFile)) { + String diskMd5 = DigestUtils.md5Hex(fis); + if (diskMd5.equalsIgnoreCase(md5)) { + LOG.info("SmallFile disk cache hit: {}", finalFilePath); + MEM_CACHE.put(cacheKey, finalFilePath); + return finalFilePath; + } + LOG.warn( + "SmallFile disk cache MD5 mismatch, re-downloading: {}", finalFilePath); + } catch (IOException e) { + LOG.warn( + "Failed to read disk cached file, re-downloading: {}", + finalFilePath, + e); + } + finalFile.delete(); + } + + // 4. Download from FE: GET /api/get_small_file?file_id=xxx&token=yyy + String url = + "http://" + + feMasterAddress + + "/api/get_small_file?file_id=" + + fileId + + "&token=" + + clusterToken; + LOG.info("Downloading small file from FE: {}", url); + + File tmpFile = new File(localDir + File.separator + fileId + ".tmp"); + try (CloseableHttpClient client = HttpUtil.getHttpClient(); Review Comment: **Security: Cluster token logged in URL.** This `LOG.info` statement will write the full URL including `token=<cluster_token>` to the log file. The cluster token is a sensitive credential that should not appear in logs. Consider redacting the token from the URL before logging, e.g.: ```java LOG.info("Downloading small file from FE: {}, file_id={}", feMasterAddress, fileId); ``` ########## docker/thirdparties/docker-compose/postgresql/certs/server.key: ########## @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC0dhjB69wyow9s +IBpCWtfuom1Fd/+P5Z8YVszj89ljpyNKyupSDAd24zITjJx729ZbKAskz9L1oEje +Oy+EB0tVoUheE5TiMy7KT7VEm+6T6JQnfBpujZm/rrTnBRTvBLQUJ8kkzEb/PPkV Review Comment: **Note: Test private key committed to repository.** This is a private key committed in plain text. While it appears to be for Docker test infrastructure only (`Local-Dev-CA` for `localhost`), consider adding a comment or README in this directory clarifying these are test-only certificates, or better yet, generate them dynamically during Docker container startup. ########## regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy: ########## @@ -179,8 +179,9 @@ suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,extern { def jobStatus = sql """ select status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ log.info("jobStatus: " + jobStatus) + println("jobStatus: " + jobStatus) // check job status - jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta") + jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) Review Comment: **Concern: Relaxed assertion.** The previous check verified a specific error message (`contains("Failed to fetch meta")`), while the new check only verifies `PAUSED` status. This broader assertion could mask regressions where the job pauses for a different/unexpected reason. Was this intentional? If the error message changed, the assertion should be updated to match the new message rather than removed entirely. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
