Copilot commented on code in PR #60116: URL: https://github.com/apache/doris/pull/60116#discussion_r2715144714
########## regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy: ########## @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_cdc_stream_tvf_postgres", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def currentDb = (sql "select database()")[0][0] + def table1 = "user_info_pg_normal1" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // create test + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}""" + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} ( + "name" varchar(200), + "age" int2, + PRIMARY KEY ("name") + )""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) VALUES ('A1', 1);""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) VALUES ('B1', 2);""" + } + + test { + sql """ + select * from cdc_stream( + "type" = "postgres", + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "table" = "${table1}", + "offset" = 'initial') + """ + exception "Unsupported offset: initial" + } + + // Here, because PG consumption requires creating a slot first, + // we only verify whether the execution can be successful. + def result = sql """ + select * from cdc_stream( + "type" = "postgres", + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "table" = "${table1}", + "offset" = 'latest') + """ + log.info("result:", result) Review Comment: The test doesn't verify the actual data returned by the query. It only checks that the query executes successfully. Consider adding assertions to verify that the correct CDC data is returned, matching the expected output format and content (similar to the MySQL test which has an expected output file). 
########## fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java: ########## @@ -27,7 +27,7 @@ @AllArgsConstructor @NoArgsConstructor public class JobBaseConfig { - private Long jobId; + private String jobId; Review Comment: Changing jobId from Long to String throughout the codebase is a significant API change that affects serialization, deserialization, and client compatibility. This is a breaking change that requires careful migration planning. Existing job IDs stored as Long values in databases or configurations will need conversion logic. Consider providing a migration path or backward compatibility layer. ```suggestion private Long jobId; ``` ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java: ########## @@ -154,39 +154,17 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc if (offsetMeta == null || offsetMeta.isEmpty()) { throw new RuntimeException("miss meta offset"); } - LOG.info("Job {} read split records with offset: {}", baseReq.getJobId(), offsetMeta); - - // If there is an active split being consumed, reuse it directly; - // Otherwise, create a new snapshot/stream split based on offset and start the reader. - SourceSplitBase split = null; - SplitRecords currentSplitRecords = this.getCurrentSplitRecords(); - if (currentSplitRecords == null) { - Fetcher<SourceRecords, SourceSplitBase> currentReader = this.getCurrentReader(); - if (baseReq.isReload() || currentReader == null) { - LOG.info( - "No current reader or reload {}, create new split reader for job {}", - baseReq.isReload(), - baseReq.getJobId()); - // build split - Tuple2<SourceSplitBase, Boolean> splitFlag = createSourceSplit(offsetMeta, baseReq); - split = splitFlag.f0; - // closeBinlogReader(); - currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq); - this.setCurrentSplitRecords(currentSplitRecords); - this.setCurrentSplit(split); - } else if (currentReader instanceof IncrementalSourceStreamFetcher) { - LOG.info("Continue poll records with current binlog reader"); - // only for binlog reader - currentSplitRecords = pollSplitRecordsWithCurrentReader(currentReader); - split = this.getCurrentSplit(); - } else { - throw new RuntimeException("Should not happen"); - } - } else { - LOG.info( - "Continue read records with current split records, splitId: {}", - currentSplitRecords.getSplitId()); - } + // Create a new snapshot/stream split based on offset and start the reader. + LOG.info( + "Create new split reader for job {} with offset {}", + baseReq.getJobId(), + offsetMeta); + // build split + Tuple2<SourceSplitBase, Boolean> splitFlag = createSourceSplit(offsetMeta, baseReq); + SourceSplitBase split = splitFlag.f0; + // it's necessary to ensure that the binlog reader is already closed. + this.currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq); + this.currentSplit = split; Review Comment: Similar to the MySQL reader, the logic for reusing active splits and continuing with the current reader has been removed. This forces recreation of splits on every call, which could impact performance. The removed logic appeared to optimize for streaming scenarios by reusing the IncrementalSourceStreamFetcher. If this optimization is no longer needed for TVF use cases, consider documenting why. 
########## fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java: ########## @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.StorageBackend.StorageType; +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.jdbc.client.JdbcClient; +import org.apache.doris.job.cdc.DataSourceConfigKeys; +import org.apache.doris.job.cdc.request.FetchRecordRequest; +import org.apache.doris.job.common.DataSourceType; +import org.apache.doris.job.util.StreamingJobUtils; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class CdcStreamTableValuedFunction extends ExternalFileTableValuedFunction { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static String URI = "http://127.0.0.1:{}/api/fetchRecordStream"; + private final Map<String, String> originProps; + + public CdcStreamTableValuedFunction(Map<String, String> properties) throws AnalysisException { + this.originProps = properties; + processProps(properties); + validate(properties); + } + + private void processProps(Map<String, String> properties) throws AnalysisException { + Map<String, String> copyProps = new HashMap<>(properties); + copyProps.put("format", "json"); + super.parseCommonProperties(copyProps); + this.processedParams.put("enable_cdc_client", "true"); + this.processedParams.put("uri", URI); + this.processedParams.put("http.enable.range.request", "false"); + this.processedParams.put("http.chunk.response", "true"); + this.processedParams.put("http.method", "POST"); + + String payload = generateParams(properties); + this.processedParams.put("http.payload", payload); + this.backendConnectProperties.putAll(processedParams); + generateFileStatus(); + } + + private String generateParams(Map<String, String> properties) throws AnalysisException { + FetchRecordRequest recordRequest = new FetchRecordRequest(); + recordRequest.setJobId(UUID.randomUUID().toString().replace("-", "")); + recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE)); + recordRequest.setConfig(properties); + try { + return objectMapper.writeValueAsString(recordRequest); + } catch (IOException e) { + LOG.info("Failed to serialize fetch record request," + e.getMessage()); + throw new AnalysisException(e.getMessage()); + } + } + + private 
void validate(Map<String, String> properties) { + Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.JDBC_URL), "jdbc_url is required"); + Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TYPE), "type is required"); + Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TABLE), "table is required"); Review Comment: The validation doesn't check for the OFFSET parameter, which is required based on the generateMeta logic in PipelineCoordinator. Without an offset, the TVF will fail at runtime. Add validation to ensure the offset parameter is present. ```suggestion Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TABLE), "table is required"); Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.OFFSET), "offset is required"); ``` ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java: ########## @@ -167,29 +166,18 @@ private PostgresSourceConfig generatePostgresConfig(Map<String, String> cdcConfi configFactory.includeSchemaChanges(false); // Set table list - String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES); - if (StringUtils.isNotEmpty(includingTables)) { - String[] includingTbls = - Arrays.stream(includingTables.split(",")) - .map(t -> schema + "." + t.trim()) - .toArray(String[]::new); - configFactory.tableList(includingTbls); - } + String[] tableList = ConfigUtil.getTableList(schema, cdcConfig); + Preconditions.checkArgument(tableList.length >= 1, "include_tables or table is required"); + configFactory.tableList(tableList); // Set startup options String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET); if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) { configFactory.startupOptions(StartupOptions.initial()); - } else if (DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) { - configFactory.startupOptions(StartupOptions.earliest()); } else if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(startupMode)) { configFactory.startupOptions(StartupOptions.latest()); } else if (ConfigUtil.isJson(startupMode)) { throw new RuntimeException("Unsupported json offset " + startupMode); - } else if (ConfigUtil.is13Timestamp(startupMode)) { - // start from timestamp - Long ts = Long.parseLong(startupMode); - configFactory.startupOptions(StartupOptions.timestamp(ts)); } else { throw new RuntimeException("Unknown offset " + startupMode); Review Comment: The startup mode validation logic has removed support for "earliest" and timestamp-based offsets that were previously supported. The code now throws "Unsupported json offset" for JSON offsets and "Unknown offset" for timestamp offsets, but the removed branches suggest these were intentional features. If these offset modes are no longer needed, consider documenting why in the commit message or code comments. If they should still be supported, this is a breaking change that removes functionality. ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java: ########## @@ -568,18 +543,16 @@ private MySqlSourceConfig generateMySqlConfig(Map<String, String> cdcConfig, Str configFactory.includeSchemaChanges(false); - String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES); - String[] includingTbls = - Arrays.stream(includingTables.split(",")) - .map(t -> databaseName + "." 
+ t.trim()) - .toArray(String[]::new); - configFactory.tableList(includingTbls); + // Set table list + String[] tableList = ConfigUtil.getTableList(databaseName, cdcConfig); + com.google.common.base.Preconditions.checkArgument( + tableList.length >= 1, "include_tables or table is required"); + configFactory.tableList(tableList); // setting startMode String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET); if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) { - // do not need set offset when initial - // configFactory.startupOptions(StartupOptions.initial()); + configFactory.startupOptions(StartupOptions.initial()); Review Comment: The commented-out line for setting StartupOptions.initial() has been uncommented, but the comment above it "do not need set offset when initial" has been removed. This suggests there may have been a reason not to set this option initially. If this behavior change is intentional and correct, the change is fine; otherwise, this could introduce a bug if the "initial" mode was intentionally not being set. ```suggestion // Do not explicitly set startupOptions for INITIAL mode; rely on the default // behavior of MySqlSourceConfigFactory to start from the initial snapshot/binlog. ``` ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java: ########## @@ -82,6 +83,17 @@ public Object fetchRecords(@RequestBody FetchRecordRequest recordReq) { } } + @RequestMapping(path = "/api/fetchRecordStream", method = RequestMethod.POST) + public StreamingResponseBody fetchRecordStream(@RequestBody FetchRecordRequest recordReq) + throws Exception { + return pipelineCoordinator.fetchRecordStream(recordReq); + } + + @RequestMapping(path = "/api/getTaskOffset/{taskId}", method = RequestMethod.POST) Review Comment: The getTaskIdOffset endpoint uses POST method but doesn't require any body in the request - only the taskId from the path. This should be a GET request instead, as it's a read-only operation retrieving data. Using POST for read operations violates RESTful API design principles and could cause caching and semantic issues. ```suggestion @RequestMapping(path = "/api/getTaskOffset/{taskId}", method = RequestMethod.GET) ``` ########## regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy: ########## @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_cdc_stream_tvf_mysql", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { + def currentDb = (sql "select database()")[0][0] + def table1 = "user_info_cdc_stream_tvf" + def mysqlDb = "test_cdc_db" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + def offset = "" + + // create test + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { Review Comment: The test uses hardcoded credentials ("root", "123456") directly in the code. While this is acceptable for test code, consider using configuration variables or test fixtures for better maintainability and to avoid accidental exposure if these tests are run against non-test environments. ########## fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java: ########## @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.StorageBackend.StorageType; +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.jdbc.client.JdbcClient; +import org.apache.doris.job.cdc.DataSourceConfigKeys; +import org.apache.doris.job.cdc.request.FetchRecordRequest; +import org.apache.doris.job.common.DataSourceType; +import org.apache.doris.job.util.StreamingJobUtils; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class CdcStreamTableValuedFunction extends ExternalFileTableValuedFunction { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static String URI = "http://127.0.0.1:{}/api/fetchRecordStream"; Review Comment: The URI field is defined as non-final but is never reassigned after initialization. 
It should be declared as a constant: private static final String URI = "http://127.0.0.1:{}/api/fetchRecordStream"; ```suggestion private static final String URI = "http://127.0.0.1:{}/api/fetchRecordStream"; ``` ########## fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java: ########## @@ -19,13 +19,15 @@ public class DataSourceConfigKeys { public static final String JDBC_URL = "jdbc_url"; + public static final String TYPE = "type"; public static final String DRIVER_URL = "driver_url"; public static final String DRIVER_CLASS = "driver_class"; public static final String USER = "user"; public static final String PASSWORD = "password"; public static final String DATABASE = "database"; public static final String SCHEMA = "schema"; public static final String INCLUDE_TABLES = "include_tables"; Review Comment: The new constant TABLE is added without documentation. Consider adding a JavaDoc comment explaining the difference between TABLE and INCLUDE_TABLES (e.g., "Single table name for CDC streaming. Use INCLUDE_TABLES for multiple tables.") to clarify usage. ```suggestion public static final String INCLUDE_TABLES = "include_tables"; /** * Single table name for CDC streaming. Use INCLUDE_TABLES for multiple tables. */ ``` ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java: ########## @@ -170,39 +169,14 @@ public SplitReadResult readSplitRecords(JobBaseRecordRequest baseReq) throws Exc if (offsetMeta == null || offsetMeta.isEmpty()) { throw new RuntimeException("miss meta offset"); } - LOG.info("Job {} read split records with offset: {}", baseReq.getJobId(), offsetMeta); - - // If there is an active split being consumed, reuse it directly; - // Otherwise, create a new snapshot/binlog split based on offset and start the reader. - MySqlSplit split = null; - SplitRecords currentSplitRecords = this.getCurrentSplitRecords(); - if (currentSplitRecords == null) { - DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader(); - if (baseReq.isReload() || currentReader == null) { - LOG.info( - "No current reader or reload {}, create new split reader", - baseReq.isReload()); - // build split - Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, baseReq); - split = splitFlag.f0; - // reset binlog reader - // closeBinlogReader(); - currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq); - this.setCurrentSplitRecords(currentSplitRecords); - this.setCurrentSplit(split); - } else if (currentReader instanceof BinlogSplitReader) { - LOG.info("Continue poll records with current binlog reader"); - // only for binlog reader - currentSplitRecords = pollSplitRecordsWithCurrentReader(currentReader); - split = this.getCurrentSplit(); - } else { - throw new RuntimeException("Should not happen"); - } - } else { - LOG.info( - "Continue read records with current split records, splitId: {}", - currentSplitRecords.getSplitId()); - } + // Create a new snapshot/binlog split based on offset and start the reader. + LOG.info("create new split reader for {} with offset {}", baseReq.getJobId(), offsetMeta); + // build split + Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, baseReq); + MySqlSplit split = splitFlag.f0; + // it's necessary to ensure that the binlog reader is already closed. 
+ this.currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq); + this.currentSplit = split; Review Comment: The logic that handles reusing an active split and continuing with the current reader has been removed (lines 173-206 in the original). This appears to be a significant behavioral change that forces a new split reader to be created on every call. This could impact performance and resource usage, especially for continuous streaming scenarios. If this change is intentional, it should be documented why the optimization for reusing readers was removed. ########## be/src/io/fs/http_file_reader.cpp: ########## @@ -95,34 +107,68 @@ Status HttpFileReader::open(const FileReaderOptions& opts) { return Status::OK(); } - // Step 1: HEAD request to get file metadata - RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true)); - _client->set_method(HttpMethod::HEAD); - RETURN_IF_ERROR(_client->execute()); + // start CDC client + auto enable_cdc_iter = _extend_kv.find("enable_cdc_client"); + if (enable_cdc_iter != _extend_kv.end() && enable_cdc_iter->second == "true") { + LOG(INFO) << "CDC client is enabled, starting CDC client for " << _url; + ExecEnv* env = ExecEnv::GetInstance(); + if (env == nullptr || env->cdc_client_mgr() == nullptr) { + return Status::InternalError("ExecEnv or CdcClientMgr is not initialized"); + } + + PRequestCdcClientResult result; + Status start_st = env->cdc_client_mgr()->start_cdc_client(&result); + if (!start_st.ok()) { + LOG(ERROR) << "Failed to start CDC client, status=" << start_st.to_string(); + return start_st; + } + + _url = fmt::format(_url, doris::config::cdc_client_port); + _range_supported = false; + LOG(INFO) << "CDC client started successfully for " << _url; + } Review Comment: The CDC client is started during the file reader open operation, but there's no corresponding cleanup in the close() or destructor. If the CDC client has resources that need cleanup (connections, threads, etc.), they may leak. Consider adding cleanup logic in the close() method or ensuring the CDC client manager handles resource cleanup properly. 
########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java: ########## @@ -81,19 +95,153 @@ public PipelineCoordinator() { new ThreadPoolExecutor.AbortPolicy()); } + /** return data for http_file_reader */ + public StreamingResponseBody fetchRecordStream(FetchRecordRequest fetchReq) throws Exception { + SourceReader sourceReader; + SplitReadResult readResult; + try { + if (fetchReq.getTaskId() == null && fetchReq.getMeta() == null) { + LOG.info( + "Generate initial meta for fetch record request, jobId={}, taskId={}", + fetchReq.getJobId(), + fetchReq.getTaskId()); + // means the request did not originate from the job, only tvf + Map<String, Object> meta = generateMeta(fetchReq.getConfig()); + fetchReq.setMeta(meta); + } + + sourceReader = Env.getCurrentEnv().getReader(fetchReq); + readResult = sourceReader.readSplitRecords(fetchReq); + } catch (Exception ex) { + throw new CommonException(ex); + } + + return outputStream -> { + try { + buildRecords(sourceReader, fetchReq, readResult, outputStream); + } catch (Exception ex) { + LOG.error( + "Failed fetch record, jobId={}, taskId={}", + fetchReq.getJobId(), + fetchReq.getTaskId(), + ex); + throw new StreamException(ex); + } + }; + } + + private void buildRecords( + SourceReader sourceReader, + FetchRecordRequest fetchRecord, + SplitReadResult readResult, + OutputStream rawOutputStream) + throws Exception { + SourceSplit split = readResult.getSplit(); + Map<String, String> lastMeta = null; + int rowCount = 0; + BufferedOutputStream bos = new BufferedOutputStream(rawOutputStream); + try { + // Serialize records and add them to the response (collect from iterator) + Iterator<SourceRecord> iterator = readResult.getRecordIterator(); + while (iterator != null && iterator.hasNext()) { + SourceRecord element = iterator.next(); + List<String> serializedRecords = + sourceReader.deserialize(fetchRecord.getConfig(), element); + for (String record : serializedRecords) { + bos.write(record.getBytes(StandardCharsets.UTF_8)); + bos.write(LINE_DELIMITER); + } + rowCount += serializedRecords.size(); + } + // force flush buffer + bos.flush(); + } finally { + // The LSN in the commit is the current offset, which is the offset from the last + // successful write. + // Therefore, even if a subsequent write fails, it will not affect the commit. + sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit()); + + // This must be called after commitSourceOffset; otherwise, + // PG's confirmed lsn will not proceed. 
+ sourceReader.finishSplitRecords(); + } + + LOG.info( + "Fetch records completed, jobId={}, taskId={}, splitId={}, rowCount={}", + fetchRecord.getJobId(), + fetchRecord.getTaskId(), + split.splitId(), + rowCount); + + if (readResult.getSplitState() != null) { + // Set meta information for hw + if (sourceReader.isSnapshotSplit(split)) { + lastMeta = sourceReader.extractSnapshotStateOffset(readResult.getSplitState()); + lastMeta.put(SPLIT_ID, split.splitId()); + } + + // set meta for binlog event + if (sourceReader.isBinlogSplit(split)) { + lastMeta = sourceReader.extractBinlogStateOffset(readResult.getSplitState()); + lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); + } + } else { + throw new RuntimeException("split state is null"); + } + + if (StringUtils.isNotEmpty(fetchRecord.getTaskId())) { + taskOffsetCache.put(fetchRecord.getTaskId(), lastMeta); + } + + if (!isLong(fetchRecord.getJobId())) { + // TVF requires closing the window after each execution, + // while PG requires dropping the slot. + sourceReader.close(fetchRecord); + } + } + + private boolean isLong(String s) { + if (s == null || s.isEmpty()) return false; + try { + Long.parseLong(s); + return true; + } catch (NumberFormatException e) { + return false; + } + } + + /** Generate split meta from request.offset */ + private Map<String, Object> generateMeta(Map<String, String> cdcConfig) + throws JsonProcessingException { + Map<String, Object> meta = new HashMap<>(); + String offset = cdcConfig.get(DataSourceConfigKeys.OFFSET); + if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(offset) + || DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(offset)) { + meta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); + } else if (ConfigUtil.isJson(offset)) { + Map<String, String> startOffset = + objectMapper.readValue(offset, new TypeReference<>() {}); + meta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); + meta.put("startingOffset", startOffset); + } else { + throw new RuntimeException("Unsupported offset: " + offset); Review Comment: The generateMeta method only supports 'latest', 'earliest', and JSON offsets for TVF, but 'initial' is a valid startup mode according to the source reader implementations (MySQL and PostgreSQL both have code to handle it). This creates an inconsistency where 'initial' mode is supported in the source readers but explicitly rejected for TVF usage. Consider either supporting 'initial' in TVF or documenting why it's not supported. ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java: ########## @@ -42,26 +50,32 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import io.debezium.data.Envelope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody; /** Pipeline coordinator. 
*/ @Component public class PipelineCoordinator { private static final Logger LOG = LoggerFactory.getLogger(PipelineCoordinator.class); private static final String SPLIT_ID = "splitId"; // jobId - private final Map<Long, DorisBatchStreamLoad> batchStreamLoadMap = new ConcurrentHashMap<>(); + private final Map<String, DorisBatchStreamLoad> batchStreamLoadMap = new ConcurrentHashMap<>(); // taskId -> writeFailReason private final Map<String, String> taskErrorMaps = new ConcurrentHashMap<>(); + // taskId, offset Review Comment: The taskOffsetCache Map is accessed without thread-safety considerations. While ConcurrentHashMap provides thread-safe operations for individual methods, the pattern of checking and retrieving data in getOffsetWithTaskId could still have race conditions if another thread removes the entry between check and retrieval in calling code. Consider documenting the thread-safety expectations or adding synchronization if needed. ```suggestion /** * Cache of task offsets keyed by taskId. * * <p>Thread-safety: this map is a {@link ConcurrentHashMap} and is accessed by multiple * threads concurrently. Entries may be added and removed at any time by background tasks. * Callers must treat the contents as a best-effort snapshot only and must be prepared for * {@code null} or missing entries even if a previous check suggested that an entry existed. * * <p>In particular, callers of helper methods such as {@code getOffsetWithTaskId} must not * assume that a "check then use" pattern on this cache is atomic. Any such logic must handle * the possibility that another thread has removed or updated the entry between operations. */ ``` ########## be/src/io/fs/http_file_reader.cpp: ########## @@ -269,6 +335,21 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r long http_status = _client->get_http_status(); VLOG(2) << "HTTP response: status=" << http_status << " received_bytes=" << buf.size(); + // Check for HTTP error status codes (4xx, 5xx) + if (http_status >= 400) { + std::string error_body; + if (buf.empty()) { + error_body = "(empty response body)"; + } else { + // Limit error message to 1024 bytes to avoid excessive logging + size_t max_len = std::min(buf.size(), static_cast<size_t>(1024)); + error_body = buf.substr(0, max_len); Review Comment: The error handling for HTTP status codes >= 400 limits the error body to 1024 bytes. While this is reasonable for logging, the comment says "avoid excessive logging" but doesn't handle cases where the error message might be truncated in the middle of a multi-byte UTF-8 character. Consider using a character-aware truncation or noting that the message might be incomplete. ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java: ########## @@ -35,4 +39,17 @@ public Object exceptionHandler(HttpServletRequest request, Exception e) { log.error("Unexpected exception", e); return RestResponse.internalError(e.getMessage()); } + + @ExceptionHandler(StreamException.class) + public Object streamExceptionHandler(StreamException e) { + // Directly throwing an exception allows curl to detect anomalies in the streaming response. + log.error("Exception in streaming response, re-throwing to client", e); + throw e; + } + Review Comment: The StreamException handler re-throws the exception after logging. This is appropriate for streaming responses where you want the client to detect issues. 
However, the logging should include the exception details that were already logged by the lambda (lines 123-128 in PipelineCoordinator). This creates duplicate logging. Consider whether one of these log statements could be removed or consolidated. ```suggestion throw e; } ``` ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java: ########## @@ -81,19 +95,153 @@ public PipelineCoordinator() { new ThreadPoolExecutor.AbortPolicy()); } + /** return data for http_file_reader */ + public StreamingResponseBody fetchRecordStream(FetchRecordRequest fetchReq) throws Exception { + SourceReader sourceReader; + SplitReadResult readResult; + try { + if (fetchReq.getTaskId() == null && fetchReq.getMeta() == null) { + LOG.info( + "Generate initial meta for fetch record request, jobId={}, taskId={}", + fetchReq.getJobId(), + fetchReq.getTaskId()); + // means the request did not originate from the job, only tvf + Map<String, Object> meta = generateMeta(fetchReq.getConfig()); + fetchReq.setMeta(meta); + } + + sourceReader = Env.getCurrentEnv().getReader(fetchReq); + readResult = sourceReader.readSplitRecords(fetchReq); + } catch (Exception ex) { + throw new CommonException(ex); + } + + return outputStream -> { + try { + buildRecords(sourceReader, fetchReq, readResult, outputStream); + } catch (Exception ex) { + LOG.error( + "Failed fetch record, jobId={}, taskId={}", + fetchReq.getJobId(), + fetchReq.getTaskId(), + ex); + throw new StreamException(ex); + } + }; + } + + private void buildRecords( + SourceReader sourceReader, + FetchRecordRequest fetchRecord, + SplitReadResult readResult, + OutputStream rawOutputStream) + throws Exception { + SourceSplit split = readResult.getSplit(); + Map<String, String> lastMeta = null; + int rowCount = 0; + BufferedOutputStream bos = new BufferedOutputStream(rawOutputStream); + try { + // Serialize records and add them to the response (collect from iterator) + Iterator<SourceRecord> iterator = readResult.getRecordIterator(); + while (iterator != null && iterator.hasNext()) { + SourceRecord element = iterator.next(); + List<String> serializedRecords = + sourceReader.deserialize(fetchRecord.getConfig(), element); + for (String record : serializedRecords) { + bos.write(record.getBytes(StandardCharsets.UTF_8)); + bos.write(LINE_DELIMITER); + } + rowCount += serializedRecords.size(); + } + // force flush buffer + bos.flush(); + } finally { + // The LSN in the commit is the current offset, which is the offset from the last + // successful write. + // Therefore, even if a subsequent write fails, it will not affect the commit. + sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit()); + + // This must be called after commitSourceOffset; otherwise, + // PG's confirmed lsn will not proceed. 
+ sourceReader.finishSplitRecords(); + } + + LOG.info( + "Fetch records completed, jobId={}, taskId={}, splitId={}, rowCount={}", + fetchRecord.getJobId(), + fetchRecord.getTaskId(), + split.splitId(), + rowCount); + + if (readResult.getSplitState() != null) { + // Set meta information for hw + if (sourceReader.isSnapshotSplit(split)) { + lastMeta = sourceReader.extractSnapshotStateOffset(readResult.getSplitState()); + lastMeta.put(SPLIT_ID, split.splitId()); + } + + // set meta for binlog event + if (sourceReader.isBinlogSplit(split)) { + lastMeta = sourceReader.extractBinlogStateOffset(readResult.getSplitState()); + lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID); + } + } else { + throw new RuntimeException("split state is null"); + } + + if (StringUtils.isNotEmpty(fetchRecord.getTaskId())) { + taskOffsetCache.put(fetchRecord.getTaskId(), lastMeta); + } + + if (!isLong(fetchRecord.getJobId())) { + // TVF requires closing the window after each execution, + // while PG requires dropping the slot. + sourceReader.close(fetchRecord); + } + } + + private boolean isLong(String s) { + if (s == null || s.isEmpty()) return false; + try { + Long.parseLong(s); + return true; + } catch (NumberFormatException e) { + return false; + } Review Comment: The isLong method uses a try-catch for control flow to determine if a string is parseable as a Long. While this works, it's inefficient and considered an anti-pattern. Consider using a regex pattern or manual character validation instead. Example: return s.matches("^-?\\d+$"); ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java: ########## @@ -96,6 +98,21 @@ public static ZoneId getPostgresServerTimeZoneFromProps(java.util.Properties pro return ZoneId.systemDefault(); } + public static String[] getTableList(String schema, Map<String, String> cdcConfig) { + String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES); + String table = cdcConfig.get(DataSourceConfigKeys.TABLE); + if (StringUtils.isNotEmpty(includingTables)) { + return Arrays.stream(includingTables.split(",")) + .map(t -> schema + "." + t.trim()) + .toArray(String[]::new); + } else if (StringUtils.isNotEmpty(table)) { + Preconditions.checkArgument(!table.contains(","), "table only supports one table"); + return new String[] {schema + "." + table.trim()}; + } else { + return new String[0]; + } + } Review Comment: The getTableList method doesn't validate that the schema parameter is not null or empty before using it in string concatenation. If schema is null or empty, this will create malformed table names like ".tablename". Add validation to ensure schema is not null/empty, or document that callers must validate this. ########## fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java: ########## @@ -19,13 +19,15 @@ public class DataSourceConfigKeys { public static final String JDBC_URL = "jdbc_url"; Review Comment: The new constant TYPE is added without documentation. Consider adding a JavaDoc comment explaining what this configuration key represents (e.g., "Data source type: 'mysql' or 'postgres'") to help users understand how to use it. ```suggestion public static final String JDBC_URL = "jdbc_url"; /** * Data source type, e.g. "mysql" or "postgres". 
*/ ``` ########## be/src/io/fs/http_file_reader.cpp: ########## @@ -95,34 +107,68 @@ Status HttpFileReader::open(const FileReaderOptions& opts) { return Status::OK(); } - // Step 1: HEAD request to get file metadata - RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true)); - _client->set_method(HttpMethod::HEAD); - RETURN_IF_ERROR(_client->execute()); + // start CDC client + auto enable_cdc_iter = _extend_kv.find("enable_cdc_client"); + if (enable_cdc_iter != _extend_kv.end() && enable_cdc_iter->second == "true") { + LOG(INFO) << "CDC client is enabled, starting CDC client for " << _url; + ExecEnv* env = ExecEnv::GetInstance(); + if (env == nullptr || env->cdc_client_mgr() == nullptr) { + return Status::InternalError("ExecEnv or CdcClientMgr is not initialized"); + } + + PRequestCdcClientResult result; + Status start_st = env->cdc_client_mgr()->start_cdc_client(&result); + if (!start_st.ok()) { + LOG(ERROR) << "Failed to start CDC client, status=" << start_st.to_string(); + return start_st; + } + + _url = fmt::format(_url, doris::config::cdc_client_port); Review Comment: The URI template placeholder "{}" expects to be formatted with the CDC client port, but there's no validation that the format operation succeeds or that the port value is valid before the HTTP request is made. If config::cdc_client_port is not set correctly, this could result in an invalid URL. Consider adding validation or error handling for the URL formatting. ########## fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java: ########## @@ -28,8 +28,4 @@ @EqualsAndHashCode(callSuper = true) public abstract class JobBaseRecordRequest extends JobBaseConfig { protected Map<String, Object> meta; Review Comment: The removal of isReload() and getFetchSize() abstract methods from JobBaseRecordRequest and their implementations in child classes appears to be a breaking change. If these methods were part of a public API or used by other components, this could cause compilation errors. Verify that no other code depends on these methods. ```suggestion protected Map<String, Object> meta; /** * Whether this request is a reload. * <p> * Default implementation reads the "reload" flag from {@link #meta} if present, * otherwise returns {@code false}. */ public boolean isReload() { if (meta == null) { return false; } Object value = meta.get("reload"); if (value instanceof Boolean) { return (Boolean) value; } return false; } /** * Fetch size for this request. * <p> * Default implementation reads the "fetchSize" value from {@link #meta} if present, * otherwise returns {@code 0}. */ public int getFetchSize() { if (meta == null) { return 0; } Object value = meta.get("fetchSize"); if (value instanceof Number) { return ((Number) value).intValue(); } return 0; } ``` ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java: ########## @@ -167,29 +166,18 @@ private PostgresSourceConfig generatePostgresConfig(Map<String, String> cdcConfi configFactory.includeSchemaChanges(false); // Set table list - String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES); - if (StringUtils.isNotEmpty(includingTables)) { - String[] includingTbls = - Arrays.stream(includingTables.split(",")) - .map(t -> schema + "." 
+ t.trim()) - .toArray(String[]::new); - configFactory.tableList(includingTbls); - } + String[] tableList = ConfigUtil.getTableList(schema, cdcConfig); + Preconditions.checkArgument(tableList.length >= 1, "include_tables or table is required"); + configFactory.tableList(tableList); // Set startup options String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET); if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) { configFactory.startupOptions(StartupOptions.initial()); - } else if (DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) { - configFactory.startupOptions(StartupOptions.earliest()); } else if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(startupMode)) { configFactory.startupOptions(StartupOptions.latest()); } else if (ConfigUtil.isJson(startupMode)) { throw new RuntimeException("Unsupported json offset " + startupMode); Review Comment: The error message "Unsupported json offset" is misleading because it suggests JSON offsets are not supported, but the actual issue is that JSON offset parsing for PostgreSQL is not implemented. Consider changing to a more descriptive message like "JSON offset format is not yet supported for PostgreSQL" or "Specific LSN offsets are not supported, use 'initial' or 'latest'". ```suggestion throw new RuntimeException( "JSON offset format is not yet supported for PostgreSQL, use 'initial' or " + "'latest': " + startupMode); ``` ########## fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java: ########## @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.StorageBackend.StorageType; +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.jdbc.client.JdbcClient; +import org.apache.doris.job.cdc.DataSourceConfigKeys; +import org.apache.doris.job.cdc.request.FetchRecordRequest; +import org.apache.doris.job.common.DataSourceType; +import org.apache.doris.job.util.StreamingJobUtils; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class CdcStreamTableValuedFunction extends ExternalFileTableValuedFunction { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static String URI = "http://127.0.0.1:{}/api/fetchRecordStream"; + private final Map<String, String> originProps; + + public CdcStreamTableValuedFunction(Map<String, String> properties) throws AnalysisException { + this.originProps = properties; + processProps(properties); + validate(properties); + } + + private void processProps(Map<String, String> properties) throws AnalysisException { + Map<String, String> copyProps = new HashMap<>(properties); + copyProps.put("format", "json"); + super.parseCommonProperties(copyProps); + this.processedParams.put("enable_cdc_client", "true"); + this.processedParams.put("uri", URI); + this.processedParams.put("http.enable.range.request", "false"); + this.processedParams.put("http.chunk.response", "true"); + this.processedParams.put("http.method", "POST"); + + String payload = generateParams(properties); + this.processedParams.put("http.payload", payload); + this.backendConnectProperties.putAll(processedParams); + generateFileStatus(); + } + + private String generateParams(Map<String, String> properties) throws AnalysisException { + FetchRecordRequest recordRequest = new FetchRecordRequest(); + recordRequest.setJobId(UUID.randomUUID().toString().replace("-", "")); + recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE)); + recordRequest.setConfig(properties); + try { + return objectMapper.writeValueAsString(recordRequest); + } catch (IOException e) { + LOG.info("Failed to serialize fetch record request," + e.getMessage()); Review Comment: The error logging at line 77 concatenates strings directly instead of using parameterized logging. This is less efficient and doesn't follow logging best practices. Change to: LOG.info("Failed to serialize fetch record request: {}", e.getMessage()); ```suggestion LOG.info("Failed to serialize fetch record request: {}", e.getMessage()); ``` ########## fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java: ########## @@ -96,6 +98,21 @@ public static ZoneId getPostgresServerTimeZoneFromProps(java.util.Properties pro return ZoneId.systemDefault(); } + public static String[] getTableList(String schema, Map<String, String> cdcConfig) { + String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES); + String table = cdcConfig.get(DataSourceConfigKeys.TABLE); + if (StringUtils.isNotEmpty(includingTables)) { + return Arrays.stream(includingTables.split(",")) + .map(t -> schema + "." 
+ t.trim()) + .toArray(String[]::new); + } else if (StringUtils.isNotEmpty(table)) { + Preconditions.checkArgument(!table.contains(","), "table only supports one table"); Review Comment: The table parameter check at line 109 uses Preconditions.checkArgument to ensure it doesn't contain commas, but this error won't be very helpful to users. Consider a more descriptive error message like "table parameter should contain a single table name, use include_tables for multiple tables" to guide users toward the correct parameter. ```suggestion Preconditions.checkArgument( !table.contains(","), "table parameter should contain a single table name, use include_tables for multiple tables"); ``` ########## fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java: ########## @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.tablefunction; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.StorageBackend.StorageType; +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.datasource.jdbc.client.JdbcClient; +import org.apache.doris.job.cdc.DataSourceConfigKeys; +import org.apache.doris.job.cdc.request.FetchRecordRequest; +import org.apache.doris.job.common.DataSourceType; +import org.apache.doris.job.util.StreamingJobUtils; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TFileType; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class CdcStreamTableValuedFunction extends ExternalFileTableValuedFunction { + private static final ObjectMapper objectMapper = new ObjectMapper(); + private static String URI = "http://127.0.0.1:{}/api/fetchRecordStream"; + private final Map<String, String> originProps; + + public CdcStreamTableValuedFunction(Map<String, String> properties) throws AnalysisException { + this.originProps = properties; + processProps(properties); + validate(properties); + } + + private void processProps(Map<String, String> properties) throws AnalysisException { + Map<String, String> copyProps = new HashMap<>(properties); + copyProps.put("format", "json"); + super.parseCommonProperties(copyProps); + this.processedParams.put("enable_cdc_client", "true"); + this.processedParams.put("uri", URI); + this.processedParams.put("http.enable.range.request", "false"); + this.processedParams.put("http.chunk.response", "true"); + this.processedParams.put("http.method", "POST"); + + String payload = generateParams(properties); + 
this.processedParams.put("http.payload", payload); + this.backendConnectProperties.putAll(processedParams); + generateFileStatus(); + } + + private String generateParams(Map<String, String> properties) throws AnalysisException { + FetchRecordRequest recordRequest = new FetchRecordRequest(); + recordRequest.setJobId(UUID.randomUUID().toString().replace("-", "")); + recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE)); + recordRequest.setConfig(properties); + try { + return objectMapper.writeValueAsString(recordRequest); + } catch (IOException e) { + LOG.info("Failed to serialize fetch record request," + e.getMessage()); + throw new AnalysisException(e.getMessage()); + } + } + + private void validate(Map<String, String> properties) { + Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.JDBC_URL), "jdbc_url is required"); + Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TYPE), "type is required"); + Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TABLE), "table is required"); + } + + private void generateFileStatus() { + this.fileStatuses.clear(); + this.fileStatuses.add(new TBrokerFileStatus(URI, false, Integer.MAX_VALUE, false)); + } + + @Override + public List<Column> getTableColumns() throws AnalysisException { + DataSourceType dataSourceType = + DataSourceType.valueOf(processedParams.get(DataSourceConfigKeys.TYPE).toUpperCase()); Review Comment: The getTableColumns method uses valueOf which will throw IllegalArgumentException for invalid data source types, but doesn't provide a user-friendly error message. Consider adding a try-catch to provide a more helpful error message listing the supported types (e.g., "Unsupported data source type: X. Supported types are: MYSQL, POSTGRES"). ```suggestion String typeStr = processedParams.get(DataSourceConfigKeys.TYPE); DataSourceType dataSourceType; try { dataSourceType = DataSourceType.valueOf(typeStr.toUpperCase()); } catch (IllegalArgumentException e) { DataSourceType[] supportedTypes = DataSourceType.values(); StringBuilder supported = new StringBuilder(); for (int i = 0; i < supportedTypes.length; i++) { if (i > 0) { supported.append(", "); } supported.append(supportedTypes[i].name()); } throw new AnalysisException("Unsupported data source type: " + typeStr + ". Supported types are: " + supported); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
