Copilot commented on code in PR #60116:
URL: https://github.com/apache/doris/pull/60116#discussion_r2715144714


##########
regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy:
##########
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cdc_stream_tvf_postgres", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "user_info_pg_normal1"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // create test
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+                  "name" varchar(200),
+                  "age" int2,
+                  PRIMARY KEY ("name")
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) 
VALUES ('A1', 1);"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) 
VALUES ('B1', 2);"""
+        }
+
+        test {
+            sql """
+            select * from cdc_stream(
+                "type" = "postgres",
+                 "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                 "driver_url" = "${driver_url}",
+                 "driver_class" = "org.postgresql.Driver",
+                 "user" = "${pgUser}",
+                 "password" = "${pgPassword}",
+                 "database" = "${pgDB}",
+                 "schema" = "${pgSchema}",
+                "table" = "${table1}",
+                "offset" = 'initial')
+            """
+            exception "Unsupported offset: initial"
+        }
+
+        // Here, because PG consumption requires creating a slot first,
+        // we only verify whether the execution can be successful.
+        def result = sql """
+            select * from cdc_stream(
+                "type" = "postgres",
+                "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                "driver_url" = "${driver_url}",
+                "driver_class" = "org.postgresql.Driver",
+                "user" = "${pgUser}",
+                "password" = "${pgPassword}",
+                "database" = "${pgDB}",
+                "schema" = "${pgSchema}",
+                "table" = "${table1}",
+                "offset" = 'latest')
+            """
+        log.info("result:", result)

Review Comment:
   The test doesn't verify the actual data returned by the query. It only 
checks that the query executes successfully. Consider adding assertions to 
verify that the correct CDC data is returned, matching the expected output 
format and content (similar to the MySQL test which has an expected output 
file).



##########
fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java:
##########
@@ -27,7 +27,7 @@
 @AllArgsConstructor
 @NoArgsConstructor
 public class JobBaseConfig {
-    private Long jobId;
+    private String jobId;

Review Comment:
   Changing jobId from Long to String throughout the codebase is a significant 
API change that affects serialization, deserialization, and client 
compatibility. This is a breaking change that requires careful migration 
planning. Existing job IDs stored as Long values in databases or configurations 
will need conversion logic. Consider providing a migration path or backward 
compatibility layer.
   ```suggestion
       private Long jobId;
   ```
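   If the String type is kept, one way to soften the break is to accept both the 
old numeric form and the new string form on the wire. This is only a sketch, 
assuming these request objects are (de)serialized with Jackson as elsewhere in 
this PR; the setter/helper names are illustrative, not part of the PR.
   ```java
   import com.fasterxml.jackson.annotation.JsonSetter;

   public class JobBaseConfig {
       private String jobId;

       public String getJobId() {
           return jobId;
       }

       // Accept both the legacy numeric job id and the new string form.
       @JsonSetter("jobId")
       public void setJobId(Object jobId) {
           this.jobId = jobId == null ? null : String.valueOf(jobId);
       }

       // Convenience for callers that still expect the old Long ids;
       // returns null for non-numeric (e.g. TVF-generated UUID) ids.
       public Long getJobIdAsLong() {
           try {
               return jobId == null ? null : Long.parseLong(jobId);
           } catch (NumberFormatException e) {
               return null;
           }
       }
   }
   ```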



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java:
##########
@@ -154,39 +154,17 @@ public SplitReadResult 
readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
         if (offsetMeta == null || offsetMeta.isEmpty()) {
             throw new RuntimeException("miss meta offset");
         }
-        LOG.info("Job {} read split records with offset: {}", 
baseReq.getJobId(), offsetMeta);
-
-        //  If there is an active split being consumed, reuse it directly;
-        //  Otherwise, create a new snapshot/stream split based on offset and 
start the reader.
-        SourceSplitBase split = null;
-        SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
-        if (currentSplitRecords == null) {
-            Fetcher<SourceRecords, SourceSplitBase> currentReader = 
this.getCurrentReader();
-            if (baseReq.isReload() || currentReader == null) {
-                LOG.info(
-                        "No current reader or reload {}, create new split 
reader for job {}",
-                        baseReq.isReload(),
-                        baseReq.getJobId());
-                // build split
-                Tuple2<SourceSplitBase, Boolean> splitFlag = 
createSourceSplit(offsetMeta, baseReq);
-                split = splitFlag.f0;
-                // closeBinlogReader();
-                currentSplitRecords = pollSplitRecordsWithSplit(split, 
baseReq);
-                this.setCurrentSplitRecords(currentSplitRecords);
-                this.setCurrentSplit(split);
-            } else if (currentReader instanceof 
IncrementalSourceStreamFetcher) {
-                LOG.info("Continue poll records with current binlog reader");
-                // only for binlog reader
-                currentSplitRecords = 
pollSplitRecordsWithCurrentReader(currentReader);
-                split = this.getCurrentSplit();
-            } else {
-                throw new RuntimeException("Should not happen");
-            }
-        } else {
-            LOG.info(
-                    "Continue read records with current split records, 
splitId: {}",
-                    currentSplitRecords.getSplitId());
-        }
+        // Create a new snapshot/stream split based on offset and start the 
reader.
+        LOG.info(
+                "Create new split reader for job {} with offset {}",
+                baseReq.getJobId(),
+                offsetMeta);
+        // build split
+        Tuple2<SourceSplitBase, Boolean> splitFlag = 
createSourceSplit(offsetMeta, baseReq);
+        SourceSplitBase split = splitFlag.f0;
+        // it's necessary to ensure that the binlog reader is already closed.
+        this.currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
+        this.currentSplit = split;

Review Comment:
   Similar to the MySQL reader, the logic for reusing active splits and 
continuing with the current reader has been removed. This forces recreation of 
splits on every call, which could impact performance. The removed logic 
appeared to optimize for streaming scenarios by reusing the 
IncrementalSourceStreamFetcher. If this optimization is no longer needed for 
TVF use cases, consider documenting why.



##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java:
##########
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.FetchRecordRequest;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcStreamTableValuedFunction extends 
ExternalFileTableValuedFunction {
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+    private static String URI = "http://127.0.0.1:{}/api/fetchRecordStream";
+    private final Map<String, String> originProps;
+
+    public CdcStreamTableValuedFunction(Map<String, String> properties) throws 
AnalysisException {
+        this.originProps = properties;
+        processProps(properties);
+        validate(properties);
+    }
+
+    private void processProps(Map<String, String> properties) throws 
AnalysisException {
+        Map<String, String> copyProps = new HashMap<>(properties);
+        copyProps.put("format", "json");
+        super.parseCommonProperties(copyProps);
+        this.processedParams.put("enable_cdc_client", "true");
+        this.processedParams.put("uri", URI);
+        this.processedParams.put("http.enable.range.request", "false");
+        this.processedParams.put("http.chunk.response", "true");
+        this.processedParams.put("http.method", "POST");
+
+        String payload = generateParams(properties);
+        this.processedParams.put("http.payload", payload);
+        this.backendConnectProperties.putAll(processedParams);
+        generateFileStatus();
+    }
+
+    private String generateParams(Map<String, String> properties) throws 
AnalysisException {
+        FetchRecordRequest recordRequest = new FetchRecordRequest();
+        recordRequest.setJobId(UUID.randomUUID().toString().replace("-", ""));
+        recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE));
+        recordRequest.setConfig(properties);
+        try {
+            return objectMapper.writeValueAsString(recordRequest);
+        } catch (IOException e) {
+            LOG.info("Failed to serialize fetch record request," + 
e.getMessage());
+            throw new AnalysisException(e.getMessage());
+        }
+    }
+
+    private void validate(Map<String, String> properties) {
+        
Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.JDBC_URL),
 "jdbc_url is required");
+        
Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TYPE), 
"type is required");
+        
Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TABLE), 
"table is required");

Review Comment:
   The validation doesn't check for the OFFSET parameter, which is required 
based on the generateMeta logic in PipelineCoordinator. Without an offset, the 
TVF will fail at runtime. Add validation to ensure the offset parameter is 
present.
   ```suggestion
           
Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TABLE), 
"table is required");
           
Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.OFFSET),
 "offset is required");
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -167,29 +166,18 @@ private PostgresSourceConfig 
generatePostgresConfig(Map<String, String> cdcConfi
         configFactory.includeSchemaChanges(false);
 
         // Set table list
-        String includingTables = 
cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
-        if (StringUtils.isNotEmpty(includingTables)) {
-            String[] includingTbls =
-                    Arrays.stream(includingTables.split(","))
-                            .map(t -> schema + "." + t.trim())
-                            .toArray(String[]::new);
-            configFactory.tableList(includingTbls);
-        }
+        String[] tableList = ConfigUtil.getTableList(schema, cdcConfig);
+        Preconditions.checkArgument(tableList.length >= 1, "include_tables or 
table is required");
+        configFactory.tableList(tableList);
 
         // Set startup options
         String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
         if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) 
{
             configFactory.startupOptions(StartupOptions.initial());
-        } else if 
(DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) {
-            configFactory.startupOptions(StartupOptions.earliest());
         } else if 
(DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(startupMode)) {
             configFactory.startupOptions(StartupOptions.latest());
         } else if (ConfigUtil.isJson(startupMode)) {
             throw new RuntimeException("Unsupported json offset " + 
startupMode);
-        } else if (ConfigUtil.is13Timestamp(startupMode)) {
-            // start from timestamp
-            Long ts = Long.parseLong(startupMode);
-            configFactory.startupOptions(StartupOptions.timestamp(ts));
         } else {
             throw new RuntimeException("Unknown offset " + startupMode);

Review Comment:
   Support for "earliest" and timestamp-based offsets has been removed from the 
startup mode validation. The code now throws "Unsupported json offset" for JSON 
offsets and "Unknown offset" for timestamp offsets, even though the removed 
branches suggest both modes were intentional features. If these offset modes are 
no longer needed, consider documenting why in the commit message or code 
comments; if they should still be supported, this is a breaking change that 
removes functionality.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -568,18 +543,16 @@ private MySqlSourceConfig generateMySqlConfig(Map<String, 
String> cdcConfig, Str
 
         configFactory.includeSchemaChanges(false);
 
-        String includingTables = 
cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
-        String[] includingTbls =
-                Arrays.stream(includingTables.split(","))
-                        .map(t -> databaseName + "." + t.trim())
-                        .toArray(String[]::new);
-        configFactory.tableList(includingTbls);
+        // Set table list
+        String[] tableList = ConfigUtil.getTableList(databaseName, cdcConfig);
+        com.google.common.base.Preconditions.checkArgument(
+                tableList.length >= 1, "include_tables or table is required");
+        configFactory.tableList(tableList);
 
         // setting startMode
         String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
         if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) 
{
-            // do not need set offset when initial
-            // configFactory.startupOptions(StartupOptions.initial());
+            configFactory.startupOptions(StartupOptions.initial());

Review Comment:
   The previously commented-out call to StartupOptions.initial() is now enabled, 
and the explanatory comment above it ("do not need set offset when initial") has 
been removed. That comment suggests there may have been a deliberate reason not 
to set this option for the "initial" mode. If the behavior change is intentional 
and correct, the change is fine; otherwise it could introduce a bug if "initial" 
mode was intentionally left unset before.
   ```suggestion
               // Do not explicitly set startupOptions for INITIAL mode; rely 
on the default
               // behavior of MySqlSourceConfigFactory to start from the 
initial snapshot/binlog.
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java:
##########
@@ -82,6 +83,17 @@ public Object fetchRecords(@RequestBody FetchRecordRequest 
recordReq) {
         }
     }
 
+    @RequestMapping(path = "/api/fetchRecordStream", method = 
RequestMethod.POST)
+    public StreamingResponseBody fetchRecordStream(@RequestBody 
FetchRecordRequest recordReq)
+            throws Exception {
+        return pipelineCoordinator.fetchRecordStream(recordReq);
+    }
+
+    @RequestMapping(path = "/api/getTaskOffset/{taskId}", method = 
RequestMethod.POST)

Review Comment:
   The getTaskIdOffset endpoint uses POST method but doesn't require any body 
in the request - only the taskId from the path. This should be a GET request 
instead, as it's a read-only operation retrieving data. Using POST for read 
operations violates RESTful API design principles and could cause caching and 
semantic issues.
   ```suggestion
       @RequestMapping(path = "/api/getTaskOffset/{taskId}", method = 
RequestMethod.GET)
   ```



##########
regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy:
##########
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cdc_stream_tvf_mysql", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "user_info_cdc_stream_tvf"
+    def mysqlDb = "test_cdc_db"
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+        def offset = ""
+
+        // create test
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {

Review Comment:
   The test uses hardcoded credentials ("root", "123456") directly in the code. 
While this is acceptable for test code, consider using configuration variables 
or test fixtures for better maintainability and to avoid accidental exposure if 
these tests are run against non-test environments.



##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java:
##########
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.FetchRecordRequest;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcStreamTableValuedFunction extends 
ExternalFileTableValuedFunction {
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+    private static String URI = "http://127.0.0.1:{}/api/fetchRecordStream";

Review Comment:
   The URI field is defined as non-final but is never reassigned after 
initialization. It should be declared as a constant: private static final 
String URI = "http://127.0.0.1:{}/api/fetchRecordStream";
   ```suggestion
       private static final String URI = 
"http://127.0.0.1:{}/api/fetchRecordStream";;
   ```



##########
fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java:
##########
@@ -19,13 +19,15 @@
 
 public class DataSourceConfigKeys {
     public static final String JDBC_URL = "jdbc_url";
+    public static final String TYPE = "type";
     public static final String DRIVER_URL = "driver_url";
     public static final String DRIVER_CLASS = "driver_class";
     public static final String USER = "user";
     public static final String PASSWORD = "password";
     public static final String DATABASE = "database";
     public static final String SCHEMA = "schema";
     public static final String INCLUDE_TABLES = "include_tables";

Review Comment:
   The new constant TABLE is added without documentation. Consider adding a 
JavaDoc comment explaining the difference between TABLE and INCLUDE_TABLES 
(e.g., "Single table name for CDC streaming. Use INCLUDE_TABLES for multiple 
tables.") to clarify usage.
   ```suggestion
       public static final String INCLUDE_TABLES = "include_tables";
       /**
        * Single table name for CDC streaming. Use INCLUDE_TABLES for multiple 
tables.
        */
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -170,39 +169,14 @@ public SplitReadResult 
readSplitRecords(JobBaseRecordRequest baseReq) throws Exc
         if (offsetMeta == null || offsetMeta.isEmpty()) {
             throw new RuntimeException("miss meta offset");
         }
-        LOG.info("Job {} read split records with offset: {}", 
baseReq.getJobId(), offsetMeta);
-
-        //  If there is an active split being consumed, reuse it directly;
-        //  Otherwise, create a new snapshot/binlog split based on offset and 
start the reader.
-        MySqlSplit split = null;
-        SplitRecords currentSplitRecords = this.getCurrentSplitRecords();
-        if (currentSplitRecords == null) {
-            DebeziumReader<SourceRecords, MySqlSplit> currentReader = 
this.getCurrentReader();
-            if (baseReq.isReload() || currentReader == null) {
-                LOG.info(
-                        "No current reader or reload {}, create new split 
reader",
-                        baseReq.isReload());
-                // build split
-                Tuple2<MySqlSplit, Boolean> splitFlag = 
createMySqlSplit(offsetMeta, baseReq);
-                split = splitFlag.f0;
-                // reset binlog reader
-                // closeBinlogReader();
-                currentSplitRecords = pollSplitRecordsWithSplit(split, 
baseReq);
-                this.setCurrentSplitRecords(currentSplitRecords);
-                this.setCurrentSplit(split);
-            } else if (currentReader instanceof BinlogSplitReader) {
-                LOG.info("Continue poll records with current binlog reader");
-                // only for binlog reader
-                currentSplitRecords = 
pollSplitRecordsWithCurrentReader(currentReader);
-                split = this.getCurrentSplit();
-            } else {
-                throw new RuntimeException("Should not happen");
-            }
-        } else {
-            LOG.info(
-                    "Continue read records with current split records, 
splitId: {}",
-                    currentSplitRecords.getSplitId());
-        }
+        // Create a new snapshot/binlog split based on offset and start the 
reader.
+        LOG.info("create new split reader for {} with offset {}", 
baseReq.getJobId(), offsetMeta);
+        // build split
+        Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, 
baseReq);
+        MySqlSplit split = splitFlag.f0;
+        // it's necessary to ensure that the binlog reader is already closed.
+        this.currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
+        this.currentSplit = split;

Review Comment:
   The logic that handles reusing an active split and continuing with the 
current reader has been removed (lines 173-206 in the original). This appears 
to be a significant behavioral change that forces a new split reader to be 
created on every call. This could impact performance and resource usage, 
especially for continuous streaming scenarios. If this change is intentional, 
it should be documented why the optimization for reusing readers was removed.
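   If the reuse optimization still matters for long-running streaming jobs, a 
guard along the lines below (reconstructed from the removed hunk above, so 
purely illustrative and not necessarily the author's intended design) would keep 
reusing an open binlog reader while other calls still rebuild the split.
   ```java
   DebeziumReader<SourceRecords, MySqlSplit> currentReader = this.getCurrentReader();
   MySqlSplit split;
   if (currentReader instanceof BinlogSplitReader) {
       // Keep streaming from the already-open binlog reader instead of rebuilding it.
       split = this.getCurrentSplit();
       this.currentSplitRecords = pollSplitRecordsWithCurrentReader(currentReader);
   } else {
       Tuple2<MySqlSplit, Boolean> splitFlag = createMySqlSplit(offsetMeta, baseReq);
       split = splitFlag.f0;
       this.currentSplitRecords = pollSplitRecordsWithSplit(split, baseReq);
   }
   this.currentSplit = split;
   ```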



##########
be/src/io/fs/http_file_reader.cpp:
##########
@@ -95,34 +107,68 @@ Status HttpFileReader::open(const FileReaderOptions& opts) 
{
         return Status::OK();
     }
 
-    // Step 1: HEAD request to get file metadata
-    RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true));
-    _client->set_method(HttpMethod::HEAD);
-    RETURN_IF_ERROR(_client->execute());
+    // start CDC client
+    auto enable_cdc_iter = _extend_kv.find("enable_cdc_client");
+    if (enable_cdc_iter != _extend_kv.end() && enable_cdc_iter->second == 
"true") {
+        LOG(INFO) << "CDC client is enabled, starting CDC client for " << _url;
+        ExecEnv* env = ExecEnv::GetInstance();
+        if (env == nullptr || env->cdc_client_mgr() == nullptr) {
+            return Status::InternalError("ExecEnv or CdcClientMgr is not 
initialized");
+        }
+
+        PRequestCdcClientResult result;
+        Status start_st = env->cdc_client_mgr()->start_cdc_client(&result);
+        if (!start_st.ok()) {
+            LOG(ERROR) << "Failed to start CDC client, status=" << 
start_st.to_string();
+            return start_st;
+        }
+
+        _url = fmt::format(_url, doris::config::cdc_client_port);
+        _range_supported = false;
+        LOG(INFO) << "CDC client started successfully for " << _url;
+    }

Review Comment:
   The CDC client is started when the file reader is opened, but there is no 
corresponding cleanup in close() or the destructor. If the CDC client has 
resources that need cleanup (connections, threads, etc.), they may leak. 
Consider adding cleanup logic in the close() method or ensuring the CDC client 
manager handles resource cleanup properly.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -81,19 +95,153 @@ public PipelineCoordinator() {
                         new ThreadPoolExecutor.AbortPolicy());
     }
 
+    /** return data for http_file_reader */
+    public StreamingResponseBody fetchRecordStream(FetchRecordRequest 
fetchReq) throws Exception {
+        SourceReader sourceReader;
+        SplitReadResult readResult;
+        try {
+            if (fetchReq.getTaskId() == null && fetchReq.getMeta() == null) {
+                LOG.info(
+                        "Generate initial meta for fetch record request, 
jobId={}, taskId={}",
+                        fetchReq.getJobId(),
+                        fetchReq.getTaskId());
+                // means the request did not originate from the job, only tvf
+                Map<String, Object> meta = generateMeta(fetchReq.getConfig());
+                fetchReq.setMeta(meta);
+            }
+
+            sourceReader = Env.getCurrentEnv().getReader(fetchReq);
+            readResult = sourceReader.readSplitRecords(fetchReq);
+        } catch (Exception ex) {
+            throw new CommonException(ex);
+        }
+
+        return outputStream -> {
+            try {
+                buildRecords(sourceReader, fetchReq, readResult, outputStream);
+            } catch (Exception ex) {
+                LOG.error(
+                        "Failed fetch record, jobId={}, taskId={}",
+                        fetchReq.getJobId(),
+                        fetchReq.getTaskId(),
+                        ex);
+                throw new StreamException(ex);
+            }
+        };
+    }
+
+    private void buildRecords(
+            SourceReader sourceReader,
+            FetchRecordRequest fetchRecord,
+            SplitReadResult readResult,
+            OutputStream rawOutputStream)
+            throws Exception {
+        SourceSplit split = readResult.getSplit();
+        Map<String, String> lastMeta = null;
+        int rowCount = 0;
+        BufferedOutputStream bos = new BufferedOutputStream(rawOutputStream);
+        try {
+            // Serialize records and add them to the response (collect from 
iterator)
+            Iterator<SourceRecord> iterator = readResult.getRecordIterator();
+            while (iterator != null && iterator.hasNext()) {
+                SourceRecord element = iterator.next();
+                List<String> serializedRecords =
+                        sourceReader.deserialize(fetchRecord.getConfig(), 
element);
+                for (String record : serializedRecords) {
+                    bos.write(record.getBytes(StandardCharsets.UTF_8));
+                    bos.write(LINE_DELIMITER);
+                }
+                rowCount += serializedRecords.size();
+            }
+            // force flush buffer
+            bos.flush();
+        } finally {
+            // The LSN in the commit is the current offset, which is the 
offset from the last
+            // successful write.
+            // Therefore, even if a subsequent write fails, it will not affect 
the commit.
+            sourceReader.commitSourceOffset(fetchRecord.getJobId(), 
readResult.getSplit());
+
+            // This must be called after commitSourceOffset; otherwise,
+            // PG's confirmed lsn will not proceed.
+            sourceReader.finishSplitRecords();
+        }
+
+        LOG.info(
+                "Fetch records completed, jobId={}, taskId={}, splitId={}, 
rowCount={}",
+                fetchRecord.getJobId(),
+                fetchRecord.getTaskId(),
+                split.splitId(),
+                rowCount);
+
+        if (readResult.getSplitState() != null) {
+            // Set meta information for hw
+            if (sourceReader.isSnapshotSplit(split)) {
+                lastMeta = 
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
+                lastMeta.put(SPLIT_ID, split.splitId());
+            }
+
+            // set meta for binlog event
+            if (sourceReader.isBinlogSplit(split)) {
+                lastMeta = 
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
+                lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
+            }
+        } else {
+            throw new RuntimeException("split state is null");
+        }
+
+        if (StringUtils.isNotEmpty(fetchRecord.getTaskId())) {
+            taskOffsetCache.put(fetchRecord.getTaskId(), lastMeta);
+        }
+
+        if (!isLong(fetchRecord.getJobId())) {
+            // TVF requires closing the window after each execution,
+            // while PG requires dropping the slot.
+            sourceReader.close(fetchRecord);
+        }
+    }
+
+    private boolean isLong(String s) {
+        if (s == null || s.isEmpty()) return false;
+        try {
+            Long.parseLong(s);
+            return true;
+        } catch (NumberFormatException e) {
+            return false;
+        }
+    }
+
+    /** Generate split meta from request.offset */
+    private Map<String, Object> generateMeta(Map<String, String> cdcConfig)
+            throws JsonProcessingException {
+        Map<String, Object> meta = new HashMap<>();
+        String offset = cdcConfig.get(DataSourceConfigKeys.OFFSET);
+        if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(offset)
+                || 
DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(offset)) {
+            meta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
+        } else if (ConfigUtil.isJson(offset)) {
+            Map<String, String> startOffset =
+                    objectMapper.readValue(offset, new TypeReference<>() {});
+            meta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
+            meta.put("startingOffset", startOffset);
+        } else {
+            throw new RuntimeException("Unsupported offset: " + offset);

Review Comment:
   The generateMeta method only supports 'latest', 'earliest', and JSON offsets 
for TVF, but 'initial' is a valid startup mode according to the source reader 
implementations (MySQL and PostgreSQL both have code to handle it). This 
creates an inconsistency where 'initial' mode is supported in the source 
readers but explicitly rejected for TVF usage. Consider either supporting 
'initial' in TVF or documenting why it's not supported.
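   If rejecting "initial" for the TVF path is deliberate, the error could at 
least name the accepted values. A minimal sketch of that option (the message 
text is illustrative, and only the values already accepted by generateMeta are 
listed):
   ```java
   } else {
       // "initial" is handled by the source readers; here it is rejected for the TVF path.
       throw new RuntimeException(
               "Unsupported offset for cdc_stream TVF: " + offset
                       + ". Use 'latest', 'earliest', or a JSON offset.");
   }
   ```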



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -42,26 +50,32 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import io.debezium.data.Envelope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
+import 
org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
 
 /** Pipeline coordinator. */
 @Component
 public class PipelineCoordinator {
     private static final Logger LOG = 
LoggerFactory.getLogger(PipelineCoordinator.class);
     private static final String SPLIT_ID = "splitId";
     // jobId
-    private final Map<Long, DorisBatchStreamLoad> batchStreamLoadMap = new 
ConcurrentHashMap<>();
+    private final Map<String, DorisBatchStreamLoad> batchStreamLoadMap = new 
ConcurrentHashMap<>();
     // taskId -> writeFailReason
     private final Map<String, String> taskErrorMaps = new 
ConcurrentHashMap<>();
+    // taskId, offset

Review Comment:
   The taskOffsetCache Map is accessed without thread-safety considerations. 
While ConcurrentHashMap provides thread-safe operations for individual methods, 
the pattern of checking and retrieving data in getOffsetWithTaskId could still 
have race conditions if another thread removes the entry between check and 
retrieval in calling code. Consider documenting the thread-safety expectations 
or adding synchronization if needed.
   ```suggestion
       /**
        * Cache of task offsets keyed by taskId.
        *
        * <p>Thread-safety: this map is a {@link ConcurrentHashMap} and is 
accessed by multiple
        * threads concurrently. Entries may be added and removed at any time by 
background tasks.
        * Callers must treat the contents as a best-effort snapshot only and 
must be prepared for
        * {@code null} or missing entries even if a previous check suggested 
that an entry existed.
        *
        * <p>In particular, callers of helper methods such as {@code 
getOffsetWithTaskId} must not
        * assume that a "check then use" pattern on this cache is atomic. Any 
such logic must handle
        * the possibility that another thread has removed or updated the entry 
between operations.
        */
   ```



##########
be/src/io/fs/http_file_reader.cpp:
##########
@@ -269,6 +335,21 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice 
result, size_t* bytes_r
     long http_status = _client->get_http_status();
     VLOG(2) << "HTTP response: status=" << http_status << " received_bytes=" 
<< buf.size();
 
+    // Check for HTTP error status codes (4xx, 5xx)
+    if (http_status >= 400) {
+        std::string error_body;
+        if (buf.empty()) {
+            error_body = "(empty response body)";
+        } else {
+            // Limit error message to 1024 bytes to avoid excessive logging
+            size_t max_len = std::min(buf.size(), static_cast<size_t>(1024));
+            error_body = buf.substr(0, max_len);

Review Comment:
   The error handling for HTTP status codes >= 400 limits the error body to 
1024 bytes. While this is reasonable for logging, the comment says "avoid 
excessive logging" but doesn't handle cases where the error message might be 
truncated in the middle of a multi-byte UTF-8 character. Consider using a 
character-aware truncation or noting that the message might be incomplete.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java:
##########
@@ -35,4 +39,17 @@ public Object exceptionHandler(HttpServletRequest request, 
Exception e) {
         log.error("Unexpected exception", e);
         return RestResponse.internalError(e.getMessage());
     }
+
+    @ExceptionHandler(StreamException.class)
+    public Object streamExceptionHandler(StreamException e) {
+        // Directly throwing an exception allows curl to detect anomalies in 
the streaming response.
+        log.error("Exception in streaming response, re-throwing to client", e);
+        throw e;
+    }
+

Review Comment:
   The StreamException handler re-throws the exception after logging. This is 
appropriate for streaming responses where you want the client to detect issues. 
However, the exception details are already logged by the lambda in 
PipelineCoordinator (lines 123-128), so this handler logs the same failure 
twice. Consider removing or consolidating one of the two log statements.
   ```suggestion
           throw e;
       }
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -81,19 +95,153 @@ public PipelineCoordinator() {
                         new ThreadPoolExecutor.AbortPolicy());
     }
 
+    /** return data for http_file_reader */
+    public StreamingResponseBody fetchRecordStream(FetchRecordRequest 
fetchReq) throws Exception {
+        SourceReader sourceReader;
+        SplitReadResult readResult;
+        try {
+            if (fetchReq.getTaskId() == null && fetchReq.getMeta() == null) {
+                LOG.info(
+                        "Generate initial meta for fetch record request, 
jobId={}, taskId={}",
+                        fetchReq.getJobId(),
+                        fetchReq.getTaskId());
+                // means the request did not originate from the job, only tvf
+                Map<String, Object> meta = generateMeta(fetchReq.getConfig());
+                fetchReq.setMeta(meta);
+            }
+
+            sourceReader = Env.getCurrentEnv().getReader(fetchReq);
+            readResult = sourceReader.readSplitRecords(fetchReq);
+        } catch (Exception ex) {
+            throw new CommonException(ex);
+        }
+
+        return outputStream -> {
+            try {
+                buildRecords(sourceReader, fetchReq, readResult, outputStream);
+            } catch (Exception ex) {
+                LOG.error(
+                        "Failed fetch record, jobId={}, taskId={}",
+                        fetchReq.getJobId(),
+                        fetchReq.getTaskId(),
+                        ex);
+                throw new StreamException(ex);
+            }
+        };
+    }
+
+    private void buildRecords(
+            SourceReader sourceReader,
+            FetchRecordRequest fetchRecord,
+            SplitReadResult readResult,
+            OutputStream rawOutputStream)
+            throws Exception {
+        SourceSplit split = readResult.getSplit();
+        Map<String, String> lastMeta = null;
+        int rowCount = 0;
+        BufferedOutputStream bos = new BufferedOutputStream(rawOutputStream);
+        try {
+            // Serialize records and add them to the response (collect from 
iterator)
+            Iterator<SourceRecord> iterator = readResult.getRecordIterator();
+            while (iterator != null && iterator.hasNext()) {
+                SourceRecord element = iterator.next();
+                List<String> serializedRecords =
+                        sourceReader.deserialize(fetchRecord.getConfig(), 
element);
+                for (String record : serializedRecords) {
+                    bos.write(record.getBytes(StandardCharsets.UTF_8));
+                    bos.write(LINE_DELIMITER);
+                }
+                rowCount += serializedRecords.size();
+            }
+            // force flush buffer
+            bos.flush();
+        } finally {
+            // The LSN in the commit is the current offset, which is the 
offset from the last
+            // successful write.
+            // Therefore, even if a subsequent write fails, it will not affect 
the commit.
+            sourceReader.commitSourceOffset(fetchRecord.getJobId(), 
readResult.getSplit());
+
+            // This must be called after commitSourceOffset; otherwise,
+            // PG's confirmed lsn will not proceed.
+            sourceReader.finishSplitRecords();
+        }
+
+        LOG.info(
+                "Fetch records completed, jobId={}, taskId={}, splitId={}, 
rowCount={}",
+                fetchRecord.getJobId(),
+                fetchRecord.getTaskId(),
+                split.splitId(),
+                rowCount);
+
+        if (readResult.getSplitState() != null) {
+            // Set meta information for hw
+            if (sourceReader.isSnapshotSplit(split)) {
+                lastMeta = 
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
+                lastMeta.put(SPLIT_ID, split.splitId());
+            }
+
+            // set meta for binlog event
+            if (sourceReader.isBinlogSplit(split)) {
+                lastMeta = 
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
+                lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
+            }
+        } else {
+            throw new RuntimeException("split state is null");
+        }
+
+        if (StringUtils.isNotEmpty(fetchRecord.getTaskId())) {
+            taskOffsetCache.put(fetchRecord.getTaskId(), lastMeta);
+        }
+
+        if (!isLong(fetchRecord.getJobId())) {
+            // TVF requires closing the window after each execution,
+            // while PG requires dropping the slot.
+            sourceReader.close(fetchRecord);
+        }
+    }
+
+    private boolean isLong(String s) {
+        if (s == null || s.isEmpty()) return false;
+        try {
+            Long.parseLong(s);
+            return true;
+        } catch (NumberFormatException e) {
+            return false;
+        }

Review Comment:
   The isLong method uses a try-catch for control flow to determine if a string 
is parseable as a Long. While this works, it's inefficient and considered an 
anti-pattern. Consider using a regex pattern or manual character validation 
instead. Example: return s.matches("^-?\\d+$");
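   A hedged sketch of that alternative with a precompiled pattern (field/method 
names are illustrative). Note that a pure regex check accepts a few 19-digit 
values that overflow Long, which is harmless if the method is only used to 
distinguish numeric job ids from TVF-generated UUID ids, as the code above 
suggests.
   ```java
   import java.util.regex.Pattern;

   // Compiled once; avoids exception-driven control flow and per-call regex compilation.
   private static final Pattern NUMERIC_ID = Pattern.compile("-?\\d{1,19}");

   private boolean isLong(String s) {
       return s != null && NUMERIC_ID.matcher(s).matches();
   }
   ```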



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java:
##########
@@ -96,6 +98,21 @@ public static ZoneId 
getPostgresServerTimeZoneFromProps(java.util.Properties pro
         return ZoneId.systemDefault();
     }
 
+    public static String[] getTableList(String schema, Map<String, String> 
cdcConfig) {
+        String includingTables = 
cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
+        String table = cdcConfig.get(DataSourceConfigKeys.TABLE);
+        if (StringUtils.isNotEmpty(includingTables)) {
+            return Arrays.stream(includingTables.split(","))
+                    .map(t -> schema + "." + t.trim())
+                    .toArray(String[]::new);
+        } else if (StringUtils.isNotEmpty(table)) {
+            Preconditions.checkArgument(!table.contains(","), "table only 
supports one table");
+            return new String[] {schema + "." + table.trim()};
+        } else {
+            return new String[0];
+        }
+    }

Review Comment:
   The getTableList method doesn't validate that the schema parameter is not 
null or empty before using it in string concatenation. If schema is null or 
empty, this will create malformed table names like ".tablename". Add validation 
to ensure schema is not null/empty, or document that callers must validate this.
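   A minimal sketch of the suggested guard, reusing the Preconditions/StringUtils 
style already present in this method; only the first check is new, the rest 
mirrors the hunk above.
   ```java
   public static String[] getTableList(String schema, Map<String, String> cdcConfig) {
       // New guard: a null/empty schema would otherwise yield names like ".tablename".
       Preconditions.checkArgument(
               StringUtils.isNotEmpty(schema), "schema/database must not be null or empty");

       String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
       String table = cdcConfig.get(DataSourceConfigKeys.TABLE);
       if (StringUtils.isNotEmpty(includingTables)) {
           return Arrays.stream(includingTables.split(","))
                   .map(t -> schema + "." + t.trim())
                   .toArray(String[]::new);
       } else if (StringUtils.isNotEmpty(table)) {
           Preconditions.checkArgument(!table.contains(","), "table only supports one table");
           return new String[] {schema + "." + table.trim()};
       } else {
           return new String[0];
       }
   }
   ```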



##########
fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java:
##########
@@ -19,13 +19,15 @@
 
 public class DataSourceConfigKeys {
     public static final String JDBC_URL = "jdbc_url";

Review Comment:
   The new constant TYPE is added without documentation. Consider adding a 
JavaDoc comment explaining what this configuration key represents (e.g., "Data 
source type: 'mysql' or 'postgres'") to help users understand how to use it.
   ```suggestion
       public static final String JDBC_URL = "jdbc_url";
       /**
        * Data source type, e.g. "mysql" or "postgres".
        */
   ```



##########
be/src/io/fs/http_file_reader.cpp:
##########
@@ -95,34 +107,68 @@ Status HttpFileReader::open(const FileReaderOptions& opts) 
{
         return Status::OK();
     }
 
-    // Step 1: HEAD request to get file metadata
-    RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true));
-    _client->set_method(HttpMethod::HEAD);
-    RETURN_IF_ERROR(_client->execute());
+    // start CDC client
+    auto enable_cdc_iter = _extend_kv.find("enable_cdc_client");
+    if (enable_cdc_iter != _extend_kv.end() && enable_cdc_iter->second == 
"true") {
+        LOG(INFO) << "CDC client is enabled, starting CDC client for " << _url;
+        ExecEnv* env = ExecEnv::GetInstance();
+        if (env == nullptr || env->cdc_client_mgr() == nullptr) {
+            return Status::InternalError("ExecEnv or CdcClientMgr is not 
initialized");
+        }
+
+        PRequestCdcClientResult result;
+        Status start_st = env->cdc_client_mgr()->start_cdc_client(&result);
+        if (!start_st.ok()) {
+            LOG(ERROR) << "Failed to start CDC client, status=" << 
start_st.to_string();
+            return start_st;
+        }
+
+        _url = fmt::format(_url, doris::config::cdc_client_port);

Review Comment:
   The URI template placeholder "{}" expects to be formatted with the CDC 
client port, but there's no validation that the format operation succeeds or 
that the port value is valid before the HTTP request is made. If 
config::cdc_client_port is not set correctly, this could result in an invalid 
URL. Consider adding validation or error handling for the URL formatting.



##########
fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java:
##########
@@ -28,8 +28,4 @@
 @EqualsAndHashCode(callSuper = true)
 public abstract class JobBaseRecordRequest extends JobBaseConfig {
     protected Map<String, Object> meta;

Review Comment:
   The removal of isReload() and getFetchSize() abstract methods from 
JobBaseRecordRequest and their implementations in child classes appears to be a 
breaking change. If these methods were part of a public API or used by other 
components, this could cause compilation errors. Verify that no other code 
depends on these methods.
   ```suggestion
       protected Map<String, Object> meta;
   
       /**
        * Whether this request is a reload.
        * <p>
        * Default implementation reads the "reload" flag from {@link #meta} if 
present,
        * otherwise returns {@code false}.
        */
       public boolean isReload() {
           if (meta == null) {
               return false;
           }
           Object value = meta.get("reload");
           if (value instanceof Boolean) {
               return (Boolean) value;
           }
           return false;
       }
   
       /**
        * Fetch size for this request.
        * <p>
        * Default implementation reads the "fetchSize" value from {@link #meta} 
if present,
        * otherwise returns {@code 0}.
        */
       public int getFetchSize() {
           if (meta == null) {
               return 0;
           }
           Object value = meta.get("fetchSize");
           if (value instanceof Number) {
               return ((Number) value).intValue();
           }
           return 0;
       }
   ```



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -167,29 +166,18 @@ private PostgresSourceConfig 
generatePostgresConfig(Map<String, String> cdcConfi
         configFactory.includeSchemaChanges(false);
 
         // Set table list
-        String includingTables = 
cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
-        if (StringUtils.isNotEmpty(includingTables)) {
-            String[] includingTbls =
-                    Arrays.stream(includingTables.split(","))
-                            .map(t -> schema + "." + t.trim())
-                            .toArray(String[]::new);
-            configFactory.tableList(includingTbls);
-        }
+        String[] tableList = ConfigUtil.getTableList(schema, cdcConfig);
+        Preconditions.checkArgument(tableList.length >= 1, "include_tables or 
table is required");
+        configFactory.tableList(tableList);
 
         // Set startup options
         String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
         if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startupMode)) 
{
             configFactory.startupOptions(StartupOptions.initial());
-        } else if 
(DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(startupMode)) {
-            configFactory.startupOptions(StartupOptions.earliest());
         } else if 
(DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(startupMode)) {
             configFactory.startupOptions(StartupOptions.latest());
         } else if (ConfigUtil.isJson(startupMode)) {
             throw new RuntimeException("Unsupported json offset " + 
startupMode);

Review Comment:
   The error message "Unsupported json offset" is not very informative: it does 
not tell the user that JSON offset parsing is simply not implemented for 
PostgreSQL, nor which values are accepted. Consider a more descriptive message 
like "JSON offset format is not yet supported for PostgreSQL" or "Specific LSN 
offsets are not supported, use 'initial' or 'latest'".
   ```suggestion
               throw new RuntimeException(
                       "JSON offset format is not yet supported for PostgreSQL, 
use 'initial' or "
                               + "'latest': "
                               + startupMode);
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java:
##########
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.FetchRecordRequest;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcStreamTableValuedFunction extends 
ExternalFileTableValuedFunction {
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+    private static String URI = "http://127.0.0.1:{}/api/fetchRecordStream";
+    private final Map<String, String> originProps;
+
+    public CdcStreamTableValuedFunction(Map<String, String> properties) throws 
AnalysisException {
+        this.originProps = properties;
+        processProps(properties);
+        validate(properties);
+    }
+
+    private void processProps(Map<String, String> properties) throws 
AnalysisException {
+        Map<String, String> copyProps = new HashMap<>(properties);
+        copyProps.put("format", "json");
+        super.parseCommonProperties(copyProps);
+        this.processedParams.put("enable_cdc_client", "true");
+        this.processedParams.put("uri", URI);
+        this.processedParams.put("http.enable.range.request", "false");
+        this.processedParams.put("http.chunk.response", "true");
+        this.processedParams.put("http.method", "POST");
+
+        String payload = generateParams(properties);
+        this.processedParams.put("http.payload", payload);
+        this.backendConnectProperties.putAll(processedParams);
+        generateFileStatus();
+    }
+
+    private String generateParams(Map<String, String> properties) throws 
AnalysisException {
+        FetchRecordRequest recordRequest = new FetchRecordRequest();
+        recordRequest.setJobId(UUID.randomUUID().toString().replace("-", ""));
+        recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE));
+        recordRequest.setConfig(properties);
+        try {
+            return objectMapper.writeValueAsString(recordRequest);
+        } catch (IOException e) {
+            LOG.info("Failed to serialize fetch record request," + 
e.getMessage());

Review Comment:
   The error logging at line 77 concatenates strings directly instead of using 
parameterized logging. This is less efficient and doesn't follow logging best 
practices. Change to: LOG.info("Failed to serialize fetch record request: {}", 
e.getMessage());
   ```suggestion
               LOG.info("Failed to serialize fetch record request: {}", e.getMessage());
   ```
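
   For context only (not part of the suggestion above), a minimal self-contained sketch of the parameterized-logging pattern, assuming an SLF4J-style logger; the class and method names here are hypothetical:
   ```java
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   public class LoggingSketch {
       private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

       void report(Exception e) {
           // Concatenation builds the message string even when INFO is disabled:
           // LOG.info("Failed to serialize fetch record request," + e.getMessage());

           // A placeholder defers formatting until the level is known to be enabled:
           LOG.info("Failed to serialize fetch record request: {}", e.getMessage());

           // Passing the Throwable as the last argument also preserves the stack trace:
           LOG.warn("Failed to serialize fetch record request", e);
       }
   }
   ```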



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java:
##########
@@ -96,6 +98,21 @@ public static ZoneId getPostgresServerTimeZoneFromProps(java.util.Properties pro
         return ZoneId.systemDefault();
     }
 
+    public static String[] getTableList(String schema, Map<String, String> cdcConfig) {
+        String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
+        String table = cdcConfig.get(DataSourceConfigKeys.TABLE);
+        if (StringUtils.isNotEmpty(includingTables)) {
+            return Arrays.stream(includingTables.split(","))
+                    .map(t -> schema + "." + t.trim())
+                    .toArray(String[]::new);
+        } else if (StringUtils.isNotEmpty(table)) {
+            Preconditions.checkArgument(!table.contains(","), "table only supports one table");

Review Comment:
   The table parameter check at line 109 uses Preconditions.checkArgument to 
ensure the value doesn't contain commas, but the resulting error message won't be 
very helpful to users. Consider a more descriptive message such as "table parameter 
should contain a single table name, use include_tables for multiple tables" to 
guide users toward the correct parameter.
   ```suggestion
               Preconditions.checkArgument(
                       !table.contains(","),
                       "table parameter should contain a single table name, use include_tables for multiple tables");
   ```
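
   As a side note (a sketch only, not part of the PR): Guava's checkArgument also accepts an error-message template, so the offending value can be echoed back to the user. Everything below is hypothetical, self-contained code:
   ```java
   import com.google.common.base.Preconditions;

   public class TableParamCheckSketch {
       static void requireSingleTable(String table) {
           // %s placeholders are filled from the trailing varargs when the check fails.
           Preconditions.checkArgument(
                   !table.contains(","),
                   "table parameter should contain a single table name (got '%s'); "
                           + "use include_tables for multiple tables",
                   table);
       }

       public static void main(String[] args) {
           requireSingleTable("t1");      // passes
           requireSingleTable("t1,t2");   // throws IllegalArgumentException with the message above
       }
   }
   ```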



##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java:
##########
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.FetchRecordRequest;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcStreamTableValuedFunction extends ExternalFileTableValuedFunction {
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+    private static String URI = "http://127.0.0.1:{}/api/fetchRecordStream";
+    private final Map<String, String> originProps;
+
+    public CdcStreamTableValuedFunction(Map<String, String> properties) throws AnalysisException {
+        this.originProps = properties;
+        processProps(properties);
+        validate(properties);
+    }
+
+    private void processProps(Map<String, String> properties) throws AnalysisException {
+        Map<String, String> copyProps = new HashMap<>(properties);
+        copyProps.put("format", "json");
+        super.parseCommonProperties(copyProps);
+        this.processedParams.put("enable_cdc_client", "true");
+        this.processedParams.put("uri", URI);
+        this.processedParams.put("http.enable.range.request", "false");
+        this.processedParams.put("http.chunk.response", "true");
+        this.processedParams.put("http.method", "POST");
+
+        String payload = generateParams(properties);
+        this.processedParams.put("http.payload", payload);
+        this.backendConnectProperties.putAll(processedParams);
+        generateFileStatus();
+    }
+
+    private String generateParams(Map<String, String> properties) throws AnalysisException {
+        FetchRecordRequest recordRequest = new FetchRecordRequest();
+        recordRequest.setJobId(UUID.randomUUID().toString().replace("-", ""));
+        recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE));
+        recordRequest.setConfig(properties);
+        try {
+            return objectMapper.writeValueAsString(recordRequest);
+        } catch (IOException e) {
+            LOG.info("Failed to serialize fetch record request," + e.getMessage());
+            throw new AnalysisException(e.getMessage());
+        }
+    }
+
+    private void validate(Map<String, String> properties) {
+        Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.JDBC_URL), "jdbc_url is required");
+        Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TYPE), "type is required");
+        Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TABLE), "table is required");
+    }
+
+    private void generateFileStatus() {
+        this.fileStatuses.clear();
+        this.fileStatuses.add(new TBrokerFileStatus(URI, false, Integer.MAX_VALUE, false));
+    }
+
+    @Override
+    public List<Column> getTableColumns() throws AnalysisException {
+        DataSourceType dataSourceType =
+                DataSourceType.valueOf(processedParams.get(DataSourceConfigKeys.TYPE).toUpperCase());

Review Comment:
   The getTableColumns method uses valueOf which will throw 
IllegalArgumentException for invalid data source types, but doesn't provide a 
user-friendly error message. Consider adding a try-catch to provide a more 
helpful error message listing the supported types (e.g., "Unsupported data 
source type: X. Supported types are: MYSQL, POSTGRES").
   ```suggestion
           String typeStr = processedParams.get(DataSourceConfigKeys.TYPE);
           DataSourceType dataSourceType;
           try {
               dataSourceType = DataSourceType.valueOf(typeStr.toUpperCase());
           } catch (IllegalArgumentException e) {
               DataSourceType[] supportedTypes = DataSourceType.values();
               StringBuilder supported = new StringBuilder();
               for (int i = 0; i < supportedTypes.length; i++) {
                   if (i > 0) {
                       supported.append(", ");
                   }
                   supported.append(supportedTypes[i].name());
               }
               throw new AnalysisException("Unsupported data source type: " + typeStr
                       + ". Supported types are: " + supported);
           }
   ```
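
   A possible follow-up (sketch only, not part of the suggestion): the StringBuilder loop can be replaced with a stream join. The enum constants and the exception type below are placeholders, not the actual Doris classes:
   ```java
   import java.util.Arrays;
   import java.util.stream.Collectors;

   public class EnumParseSketch {
       // Stand-in for the real data-source enum.
       enum DataSourceType { MYSQL, POSTGRES }

       static DataSourceType parse(String typeStr) {
           try {
               return DataSourceType.valueOf(typeStr.toUpperCase());
           } catch (IllegalArgumentException e) {
               String supported = Arrays.stream(DataSourceType.values())
                       .map(Enum::name)
                       .collect(Collectors.joining(", "));
               throw new IllegalArgumentException(
                       "Unsupported data source type: " + typeStr + ". Supported types are: " + supported);
           }
       }
   }
   ```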



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

