Copilot commented on code in PR #60988: URL: https://github.com/apache/doris/pull/60988#discussion_r2881532774
########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java: ########## @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.utils; + +import org.apache.doris.cdcclient.common.Env; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages small files (e.g. SSL certificates) referenced by FILE:{file_id}:{md5}. + * + * <p>Files are fetched from FE via HTTP on first access, then cached on disk as {file_id}.{md5} and + * in memory to avoid repeated I/O on subsequent calls. + */ +public class SmallFileMgr { + private static final Logger LOG = LoggerFactory.getLogger(SmallFileMgr.class); + + private static final String FILE_PREFIX = "FILE:"; + + /** In-memory cache: "file_id:md5" -> absolute local file path */ + private static final Map<String, String> MEM_CACHE = new ConcurrentHashMap<>(); + + /** + * Per-key locks to serialize concurrent downloads of the same file, preventing tmp file + * corruption when multiple threads race on the same file_id:md5 key. + */ + private static final Map<String, Object> DOWNLOAD_LOCKS = new ConcurrentHashMap<>(); + + private SmallFileMgr() {} + + /** + * Resolve a FILE: reference to an absolute local file path, downloading from FE if needed. FE + * address and cluster token are read from {@link Env}. + * + * @param filePath FILE reference, format: FILE:{file_id}:{md5} + * @return absolute local file path + */ + public static String getFilePath(String filePath) { + return getFilePath( + Env.getCurrentEnv().getFeMasterAddress(), + filePath, + Env.getCurrentEnv().getClusterToken(), + getLocalDir()); + } + + /** + * Get the directory of the currently running JAR file + * + * @return + */ + static String getLocalDir() { + try { + URL url = SmallFileMgr.class.getProtectionDomain().getCodeSource().getLocation(); + LOG.info("Get code source URL: {}", url); + // Spring Boot fat jar: jar:file:/path/to/app.jar!/BOOT-INF/classes!/ + if ("jar".equals(url.getProtocol())) { + String path = url.getPath(); // file:/path/to/app.jar!/BOOT-INF/classes!/ + int separator = path.indexOf("!"); + if (separator > 0) { + path = path.substring(0, separator); // file:/path/to/app.jar + } + url = new URL(path); + } + File file = new File(url.toURI()); + // When running a JAR file, `file` refers to the JAR file itself, taking its parent + // directory. + // When running an IDE file, `file` refers to the classes directory, returning directly. + return file.isFile() ? file.getParentFile().getAbsolutePath() : file.getAbsolutePath(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /** Package-private overload that accepts a custom local directory, used for testing. */ + static String getFilePath( + String feMasterAddress, String filePath, String clusterToken, String localDir) { + if (!filePath.startsWith(FILE_PREFIX)) { + throw new IllegalArgumentException("filePath must start with FILE:, got: " + filePath); + } + if (feMasterAddress == null || feMasterAddress.isEmpty()) { + throw new IllegalArgumentException( + "feMasterAddress is required when filePath is a FILE: reference"); + } + String[] parts = filePath.substring(FILE_PREFIX.length()).split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException( + "Invalid filePath format, expected FILE:file_id:md5, got: " + filePath); + } + String fileId = parts[0]; + String md5 = parts[1]; + String cacheKey = fileId + ":" + md5; + + // 1. Fast path: in-memory cache hit — zero I/O, no lock needed + String memCached = MEM_CACHE.get(cacheKey); + if (memCached != null) { + LOG.debug("SmallFile memory cache hit: {}", memCached); + return memCached; + } + + // 2. Serialize concurrent downloads of the same file to prevent tmp file corruption + Object lock = DOWNLOAD_LOCKS.computeIfAbsent(cacheKey, k -> new Object()); + synchronized (lock) { + // Double-check memory cache inside the lock + String doubleChecked = MEM_CACHE.get(cacheKey); + if (doubleChecked != null) { + LOG.debug("SmallFile memory cache hit (after lock): {}", doubleChecked); + return doubleChecked; + } + + String finalFilePath = localDir + File.separator + fileId + "." + md5; + File finalFile = new File(finalFilePath); + + // 3. Disk cache hit — avoid downloading again after process restart + if (finalFile.exists()) { + try (FileInputStream fis = new FileInputStream(finalFile)) { + String diskMd5 = DigestUtils.md5Hex(fis); + if (diskMd5.equalsIgnoreCase(md5)) { + LOG.info("SmallFile disk cache hit: {}", finalFilePath); + MEM_CACHE.put(cacheKey, finalFilePath); + return finalFilePath; + } + LOG.warn( + "SmallFile disk cache MD5 mismatch, re-downloading: {}", finalFilePath); + } catch (IOException e) { + LOG.warn( + "Failed to read disk cached file, re-downloading: {}", + finalFilePath, + e); + } + finalFile.delete(); + } + + // 4. Download from FE: GET /api/get_small_file?file_id=xxx&token=yyy + String url = + "http://" + + feMasterAddress + + "/api/get_small_file?file_id=" + + fileId + + "&token=" + + clusterToken; + LOG.info("Downloading small file from FE: {}", url); + + File tmpFile = new File(localDir + File.separator + fileId + ".tmp"); Review Comment: `tmpFile` is derived only from `fileId` (`{fileId}.tmp`), but the download lock key is `fileId:md5`. If two different MD5 versions of the same `fileId` are fetched concurrently, they will use different locks and write to the same tmp file, causing corruption. Use a tmp file name that includes the full cache key (e.g., `fileId.md5.tmp`) or broaden the lock to be per-`fileId`. ```suggestion File tmpFile = new File(localDir + File.separator + fileId + "." + md5 + ".tmp"); ``` ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java: ########## @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.utils; + +import org.apache.doris.cdcclient.common.Env; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages small files (e.g. SSL certificates) referenced by FILE:{file_id}:{md5}. + * + * <p>Files are fetched from FE via HTTP on first access, then cached on disk as {file_id}.{md5} and + * in memory to avoid repeated I/O on subsequent calls. + */ +public class SmallFileMgr { + private static final Logger LOG = LoggerFactory.getLogger(SmallFileMgr.class); + + private static final String FILE_PREFIX = "FILE:"; + + /** In-memory cache: "file_id:md5" -> absolute local file path */ + private static final Map<String, String> MEM_CACHE = new ConcurrentHashMap<>(); + + /** + * Per-key locks to serialize concurrent downloads of the same file, preventing tmp file + * corruption when multiple threads race on the same file_id:md5 key. + */ + private static final Map<String, Object> DOWNLOAD_LOCKS = new ConcurrentHashMap<>(); + + private SmallFileMgr() {} + + /** + * Resolve a FILE: reference to an absolute local file path, downloading from FE if needed. FE + * address and cluster token are read from {@link Env}. + * + * @param filePath FILE reference, format: FILE:{file_id}:{md5} + * @return absolute local file path + */ + public static String getFilePath(String filePath) { + return getFilePath( + Env.getCurrentEnv().getFeMasterAddress(), + filePath, + Env.getCurrentEnv().getClusterToken(), + getLocalDir()); + } + + /** + * Get the directory of the currently running JAR file + * + * @return + */ + static String getLocalDir() { + try { + URL url = SmallFileMgr.class.getProtectionDomain().getCodeSource().getLocation(); + LOG.info("Get code source URL: {}", url); + // Spring Boot fat jar: jar:file:/path/to/app.jar!/BOOT-INF/classes!/ + if ("jar".equals(url.getProtocol())) { + String path = url.getPath(); // file:/path/to/app.jar!/BOOT-INF/classes!/ + int separator = path.indexOf("!"); + if (separator > 0) { + path = path.substring(0, separator); // file:/path/to/app.jar + } + url = new URL(path); + } + File file = new File(url.toURI()); + // When running a JAR file, `file` refers to the JAR file itself, taking its parent + // directory. + // When running an IDE file, `file` refers to the classes directory, returning directly. + return file.isFile() ? file.getParentFile().getAbsolutePath() : file.getAbsolutePath(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /** Package-private overload that accepts a custom local directory, used for testing. */ + static String getFilePath( + String feMasterAddress, String filePath, String clusterToken, String localDir) { + if (!filePath.startsWith(FILE_PREFIX)) { + throw new IllegalArgumentException("filePath must start with FILE:, got: " + filePath); + } + if (feMasterAddress == null || feMasterAddress.isEmpty()) { + throw new IllegalArgumentException( + "feMasterAddress is required when filePath is a FILE: reference"); + } + String[] parts = filePath.substring(FILE_PREFIX.length()).split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException( + "Invalid filePath format, expected FILE:file_id:md5, got: " + filePath); + } + String fileId = parts[0]; + String md5 = parts[1]; + String cacheKey = fileId + ":" + md5; + + // 1. Fast path: in-memory cache hit — zero I/O, no lock needed + String memCached = MEM_CACHE.get(cacheKey); + if (memCached != null) { + LOG.debug("SmallFile memory cache hit: {}", memCached); + return memCached; + } + + // 2. Serialize concurrent downloads of the same file to prevent tmp file corruption + Object lock = DOWNLOAD_LOCKS.computeIfAbsent(cacheKey, k -> new Object()); + synchronized (lock) { Review Comment: `DOWNLOAD_LOCKS` stores one lock object per unique `fileId:md5` and is never cleaned up unless `clearCache()` is called. Over time, this can grow without bound for long-running processes. Consider using a bounded/striped lock strategy, or remove the lock entry once the download completes (taking care not to remove a lock another thread is about to use). ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SmallFileMgr.java: ########## @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.utils; + +import org.apache.doris.cdcclient.common.Env; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages small files (e.g. SSL certificates) referenced by FILE:{file_id}:{md5}. + * + * <p>Files are fetched from FE via HTTP on first access, then cached on disk as {file_id}.{md5} and + * in memory to avoid repeated I/O on subsequent calls. + */ +public class SmallFileMgr { + private static final Logger LOG = LoggerFactory.getLogger(SmallFileMgr.class); + + private static final String FILE_PREFIX = "FILE:"; + + /** In-memory cache: "file_id:md5" -> absolute local file path */ + private static final Map<String, String> MEM_CACHE = new ConcurrentHashMap<>(); + + /** + * Per-key locks to serialize concurrent downloads of the same file, preventing tmp file + * corruption when multiple threads race on the same file_id:md5 key. + */ + private static final Map<String, Object> DOWNLOAD_LOCKS = new ConcurrentHashMap<>(); + + private SmallFileMgr() {} + + /** + * Resolve a FILE: reference to an absolute local file path, downloading from FE if needed. FE + * address and cluster token are read from {@link Env}. + * + * @param filePath FILE reference, format: FILE:{file_id}:{md5} + * @return absolute local file path + */ + public static String getFilePath(String filePath) { + return getFilePath( + Env.getCurrentEnv().getFeMasterAddress(), + filePath, + Env.getCurrentEnv().getClusterToken(), + getLocalDir()); + } + + /** + * Get the directory of the currently running JAR file + * + * @return + */ + static String getLocalDir() { + try { + URL url = SmallFileMgr.class.getProtectionDomain().getCodeSource().getLocation(); + LOG.info("Get code source URL: {}", url); + // Spring Boot fat jar: jar:file:/path/to/app.jar!/BOOT-INF/classes!/ + if ("jar".equals(url.getProtocol())) { + String path = url.getPath(); // file:/path/to/app.jar!/BOOT-INF/classes!/ + int separator = path.indexOf("!"); + if (separator > 0) { + path = path.substring(0, separator); // file:/path/to/app.jar + } + url = new URL(path); + } + File file = new File(url.toURI()); + // When running a JAR file, `file` refers to the JAR file itself, taking its parent + // directory. + // When running an IDE file, `file` refers to the classes directory, returning directly. + return file.isFile() ? file.getParentFile().getAbsolutePath() : file.getAbsolutePath(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /** Package-private overload that accepts a custom local directory, used for testing. */ + static String getFilePath( + String feMasterAddress, String filePath, String clusterToken, String localDir) { + if (!filePath.startsWith(FILE_PREFIX)) { + throw new IllegalArgumentException("filePath must start with FILE:, got: " + filePath); + } + if (feMasterAddress == null || feMasterAddress.isEmpty()) { + throw new IllegalArgumentException( + "feMasterAddress is required when filePath is a FILE: reference"); + } + String[] parts = filePath.substring(FILE_PREFIX.length()).split(":"); + if (parts.length != 2) { + throw new IllegalArgumentException( + "Invalid filePath format, expected FILE:file_id:md5, got: " + filePath); + } + String fileId = parts[0]; + String md5 = parts[1]; + String cacheKey = fileId + ":" + md5; + + // 1. Fast path: in-memory cache hit — zero I/O, no lock needed + String memCached = MEM_CACHE.get(cacheKey); + if (memCached != null) { + LOG.debug("SmallFile memory cache hit: {}", memCached); + return memCached; + } + + // 2. Serialize concurrent downloads of the same file to prevent tmp file corruption + Object lock = DOWNLOAD_LOCKS.computeIfAbsent(cacheKey, k -> new Object()); + synchronized (lock) { + // Double-check memory cache inside the lock + String doubleChecked = MEM_CACHE.get(cacheKey); + if (doubleChecked != null) { + LOG.debug("SmallFile memory cache hit (after lock): {}", doubleChecked); + return doubleChecked; + } + + String finalFilePath = localDir + File.separator + fileId + "." + md5; + File finalFile = new File(finalFilePath); + + // 3. Disk cache hit — avoid downloading again after process restart + if (finalFile.exists()) { + try (FileInputStream fis = new FileInputStream(finalFile)) { + String diskMd5 = DigestUtils.md5Hex(fis); + if (diskMd5.equalsIgnoreCase(md5)) { + LOG.info("SmallFile disk cache hit: {}", finalFilePath); + MEM_CACHE.put(cacheKey, finalFilePath); + return finalFilePath; + } + LOG.warn( + "SmallFile disk cache MD5 mismatch, re-downloading: {}", finalFilePath); + } catch (IOException e) { + LOG.warn( + "Failed to read disk cached file, re-downloading: {}", + finalFilePath, + e); + } + finalFile.delete(); + } + + // 4. Download from FE: GET /api/get_small_file?file_id=xxx&token=yyy + String url = + "http://" + + feMasterAddress + + "/api/get_small_file?file_id=" + + fileId + + "&token=" + + clusterToken; + LOG.info("Downloading small file from FE: {}", url); + Review Comment: The log line prints the full download URL including the `clusterToken` query parameter. This can leak the token into logs and log aggregation systems. Consider redacting the token in logs and/or sending the token via an HTTP header instead of a query parameter (if the FE endpoint supports it). ########## fs_brokers/cdc_client/src/main/resources/application.properties: ########## @@ -17,4 +17,5 @@ ################################################################################ spring.web.resources.add-mappings=false server.port=9096 -backend.http.port=8040 \ No newline at end of file +backend.http.port=8040 +cluster.token=cluster-token Review Comment: A hard-coded default `cluster.token` in the packaged `application.properties` can lead to confusing misconfiguration (cdc_client may start successfully but fail to authenticate to FE unless overridden). Consider omitting the default and requiring it be provided via command-line/env, or at least setting it to empty and failing fast if missing. ```suggestion # cluster.token must be provided via environment/command-line or external config cluster.token= ``` ########## fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java: ########## @@ -245,6 +249,31 @@ private static synchronized int getLastSelectedBackendIndexAndUpdate() { return index; } + /** + * When enabling SSL, you need to convert FILE:ca.perm to FILE:ca.pem:md5. Review Comment: Typo in comment: `FILE:ca.perm` should be `FILE:ca.pem`. ```suggestion * When enabling SSL, you need to convert FILE:ca.pem to FILE:ca.pem:md5. ``` ########## regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy: ########## @@ -179,8 +179,9 @@ suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,extern { def jobStatus = sql """ select status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ log.info("jobStatus: " + jobStatus) + println("jobStatus: " + jobStatus) // check job status - jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) && jobStatus.get(0).get(1).contains("Failed to fetch meta") + jobStatus.size() == 1 && 'PAUSED' == jobStatus.get(0).get(0) } Review Comment: This assertion no longer validates the expected failure reason (`ErrorMsg` content) when the job is PAUSED. That weakens the test and could allow unrelated failures to pass. Consider keeping an assertion on `ErrorMsg` (even if relaxed, e.g. non-empty or matching the new expected message) to ensure the PAUSE is for the intended privilege error path. ########## regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy: ########## @@ -179,8 +179,9 @@ suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,extern { def jobStatus = sql """ select status, ErrorMsg from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ log.info("jobStatus: " + jobStatus) + println("jobStatus: " + jobStatus) Review Comment: `println("jobStatus: ...")` introduces noisy stdout output in regression runs and duplicates `log.info` above. Prefer using the existing logger only. ```suggestion ``` ########## fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java: ########## @@ -245,6 +249,31 @@ private static synchronized int getLastSelectedBackendIndexAndUpdate() { return index; } + /** + * When enabling SSL, you need to convert FILE:ca.perm to FILE:ca.pem:md5. + */ + public static Map<String, String> convertCertFile(long dbId, Map<String, String> sourceProperties) + throws JobException { + SmallFileMgr smallFileMgr = Env.getCurrentEnv().getSmallFileMgr(); + Map<String, String> newProps = new HashMap<>(sourceProperties); + if (sourceProperties.containsKey(DataSourceConfigKeys.SSL_ROOTCERT)) { + String certFile = sourceProperties.get(DataSourceConfigKeys.SSL_ROOTCERT); + if (certFile.startsWith("FILE:")) { + String file = certFile.substring(certFile.indexOf(":") + 1); + try { + SmallFile smallFile = + smallFileMgr.getSmallFile(dbId, StreamingInsertJob.JOB_FILE_CATALOG, file, true); + newProps.put(DataSourceConfigKeys.SSL_ROOTCERT, "FILE:" + smallFile.id + ":" + smallFile.md5); + } catch (DdlException ex) { + throw new JobException("ssl root cert file not found: " + certFile); + } + } else { + throw new JobException("ssl root cert is not in expected format, should start with FILE:" + certFile); Review Comment: The JobException messages here are hard to interpret and drop useful context: - The `DdlException` cause is swallowed, making debugging harder. - The "should start with FILE:" message concatenates the expected prefix and the actual value without a separator. Consider including the original exception as the cause and formatting messages as "expected ..., got ...". ```suggestion throw new JobException("ssl root cert file not found, expected existing small file reference, got '" + certFile + "'", ex); } } else { throw new JobException("ssl root cert is not in expected format, expected value starting with 'FILE:', " + "got '" + certFile + "'"); ``` ########## regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_ssl.groovy: ########## @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_streaming_postgres_job_ssl", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_postgres_job_name_ssl" + def currentDb = (sql "select database()")[0][0] + def table1 = "user_info_pg_normal1_ssl" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // create test + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}""" + def sslInfo = sql """SHOW ssl""" + log.info("sslInfo: " + sslInfo) + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} ( + "name" varchar(200), + "age" int2, + PRIMARY KEY ("name") + )""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) VALUES ('A1', 1);""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) VALUES ('B1', 2);""" + } + + try { + sql """DROP FILE "ca.pem" FROM ${currentDb} PROPERTIES ("catalog" = "streaming_job")""" + } catch (Exception ignored) { + // ignore + } + + sql """CREATE FILE "ca.pem" + IN ${currentDb} + PROPERTIES + ( + "url" = "https://qa-build.oss-cn-beijing.aliyuncs.com/jianxu/root.crt", + "catalog" = "streaming_job" + ) Review Comment: This regression test depends on downloading the root CA from an external URL (`qa-build.oss-cn-beijing.aliyuncs.com`). That makes the test fragile (network, availability, region) and can break offline/air-gapped CI. Prefer referencing a test fixture stored in-repo (or provisioned by the docker-compose PG setup) so the test is self-contained. ########## docker/thirdparties/docker-compose/postgresql/certs/server.key: ########## @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC0dhjB69wyow9s +IBpCWtfuom1Fd/+P5Z8YVszj89ljpyNKyupSDAd24zITjJx729ZbKAskz9L1oEje +Oy+EB0tVoUheE5TiMy7KT7VEm+6T6JQnfBpujZm/rrTnBRTvBLQUJ8kkzEb/PPkV +i1fyQzDuwolYuEHgL6hCU76ZKPgIqODuY1mOArPTbj8FbsUqEBphgYj0Y2aMiC+N +INcY5fN3acz1cuhU0iRHyaAPr2XIcfOEZQB1cPFpOv7Tbpna4GueKntmMMgHrkn+ +qvFJzx4rVmFnNh0cGkuc/rvO5ZGLIoM0ROlFrVKhYKT3/34AbDKBFxcLuHUakxvj +rBx7j9O3AgMBAAECggEABZ+8uxdWnQYl+4xlV5E0gmTx3dh8Qd351UfFsW0demDr +lU1SI3I4I/Lelv8lyrLXZzjcwPfmezfec6RnF37p7ijSPgrIG2PLplCqJsy6BzK1 +ycH/yaYm6sIFSBqdF+ZO5QOaGOWZpA9lgsYHNVt/jdvJCq/50ZhJZO2fvfi9dr4I +vLjcCX57t+V9n68zHCdw8pTw3eSvO34wv8FXXQyofYi6+swoV/NhGFS1xMlc2USO +KQ0Do/Y8Dxr/5HawoiMTzO/o4M0Bdmb237fW4D0yVqaevjVWKe/wq2q3VZyBatB2 +XDMkL1ZaWiRsRZHoliiIh3K3gQ2jmtsMXjzv+IKdvQKBgQDgPsk7y5Ms5rjArL8g +qCP2o8a/IvxzCwxcvK59nfmWFuFeJsxE3uvp89UriqC6yGD5yxAmjDKvHOFtV+CE +KjCnMgt/jU6BpkaHzTRR8Gtt/RkILZTZiKoNdEgOTeBjHKCoOUoM7Dc78nW7Dp0F +QoLdAe0g0pSRy5iFcWBiX7UP5QKBgQDOBBRfnaU6fICVH0SmqBoKVSCDm+saYMAW +99mypm2xViP4VQOa1QjNRiEN9kllxD4I+S48kALSCpif+A/IE89bNgFNEOvTYbkW ++mvjoFLQtN79Tc8/G0CEi+WhRWWpY9WnMuzj1r/pAbC8uOEKvJ+tYfKmHZN5kvoC +k0e2yMCDawKBgFi6Hw9sxkgO5m0+LMW0Ib62IK6CHlc6uOJ8uaH0fsvXM8b4HPzn +I3tHQkJfMKeXH1/W7AYElQ1apQuJqMlClEujbo9CjxyXePLEy/3b3fYAHgZxWqMU +Aw0dxGD8iVtN+Xd2a4lfcZ9jmRexeYmaPoNJ/tRs3eIuJ6QtLxDdg5vNAoGBAIqU +C/BVZrN01Dl7Ev7XzMxufrSIyRixRAUvK20Urmy/eOqupQIdkxIhvlJZ/P1LiD8Y +/pUWeg83uXrBrjvzt2OvbCie3UMPVSWzxacUTSC+ydCx6lqUxk1inVBiEgRjd3BE +vTx1VBo0XOJVqmtCflZusH41HuKEj0/0KiU13OmJAoGAYkxy/U6uHHn6xB3KriID +bZgfYRlLv1bD4AYiOcjFke3/4MZJ2U4t/x6uzEjQZd/0waSeE3YY/MfEXufdHM99 +ZUlAHwLhjLcY58HgkyMkw4sRaHYxTQdOuxcnmzX1+sHKxKXlYoboLgh8Qf9A4DcR +HZde9n1uVLVtlBRTjjL5O84= +-----END PRIVATE KEY----- Review Comment: This file adds a PEM-encoded private key to the repository. Even if intended only for local docker testing, committing private keys is risky and often violates security policies. Prefer generating the key/cert at container startup (or during CI setup) and keeping only non-sensitive public certs in-repo. ```suggestion # Placeholder for PostgreSQL server private key. # # A real private key MUST NOT be committed to version control. # Generate a fresh key and certificate at container startup or during CI, # and write the key to this path: # docker/thirdparties/docker-compose/postgresql/certs/server.key # # Example (self-signed, for local testing only): # openssl req -newkey rsa:4096 -nodes -keyout server.key -x509 -days 365 \ # -out server.crt -subj "/CN=postgres.local" # # This file intentionally contains no valid PEM private key material. DUMMY_PRIVATE_KEY_GENERATE_AT_RUNTIME ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
