This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 7dd40bd31ca branch-4.1: [Feature](tvf) Support cdc stream tvf for 
mysql and pg #60116 (#61840)
7dd40bd31ca is described below

commit 7dd40bd31caaf84b816572a5ae7ca05fb926afcc
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Mar 30 10:51:41 2026 +0800

    branch-4.1: [Feature](tvf) Support cdc stream tvf for mysql and pg #60116 
(#61840)
    
    Cherry-picked from #60116
    
    Co-authored-by: wudi <[email protected]>
---
 be/src/io/fs/http_file_reader.cpp                  | 140 +++++++++++--
 be/src/io/fs/http_file_reader.h                    |   6 +
 .../apache/doris/job/cdc/DataSourceConfigKeys.java |   1 +
 .../job/cdc/request/CompareOffsetRequest.java      |   2 +-
 .../doris/job/cdc/request/FetchRecordRequest.java  |   1 +
 .../job/cdc/request/FetchTableSplitsRequest.java   |   2 +-
 .../doris/job/cdc/request/JobBaseConfig.java       |   2 +-
 .../doris/catalog/BuiltinTableValuedFunctions.java |   4 +-
 .../insert/streaming/StreamingMultiTblTask.java    |   2 +-
 .../job/offset/jdbc/JdbcSourceOffsetProvider.java  |   6 +-
 .../apache/doris/job/util/StreamingJobUtils.java   |   7 +-
 .../expressions/functions/table/CdcStream.java     |  59 ++++++
 .../visitor/TableValuedFunctionVisitor.java        |   5 +
 .../CdcStreamTableValuedFunction.java              | 140 +++++++++++++
 .../org/apache/doris/cdcclient/common/Env.java     |  20 +-
 .../cdcclient/config/GlobalExceptionHandler.java   |  17 ++
 .../cdcclient/controller/ClientController.java     |  14 +-
 .../doris/cdcclient/exception/CommonException.java |  36 ++--
 .../doris/cdcclient/exception/StreamException.java |  36 ++--
 .../cdcclient/service/PipelineCoordinator.java     | 184 ++++++++++++++++-
 .../doris/cdcclient/sink/DorisBatchStreamLoad.java |  10 +-
 .../source/reader/JdbcIncrementalSourceReader.java |   2 +-
 .../cdcclient/source/reader/SourceReader.java      |   4 +-
 .../source/reader/mysql/MySqlSourceReader.java     |  23 ++-
 .../reader/postgres/PostgresSourceReader.java      |  22 +--
 .../apache/doris/cdcclient/utils/ConfigUtil.java   |  22 ++-
 .../doris/cdcclient/utils/ConfigUtilTest.java      |  85 ++++++++
 .../cdc/tvf/test_cdc_stream_tvf_mysql.out          |  11 ++
 .../cdc/tvf/test_cdc_stream_tvf_mysql.groovy       | 218 +++++++++++++++++++++
 .../cdc/tvf/test_cdc_stream_tvf_postgres.groovy    |  82 ++++++++
 30 files changed, 1057 insertions(+), 106 deletions(-)

diff --git a/be/src/io/fs/http_file_reader.cpp 
b/be/src/io/fs/http_file_reader.cpp
index da27f94c970..a2085c6fe6b 100644
--- a/be/src/io/fs/http_file_reader.cpp
+++ b/be/src/io/fs/http_file_reader.cpp
@@ -22,7 +22,11 @@
 
 #include <algorithm>
 
+#include "common/config.h"
 #include "common/logging.h"
+#include "gen_cpp/internal_service.pb.h"
+#include "runtime/cdc_client_mgr.h"
+#include "runtime/exec_env.h"
 
 namespace doris::io {
 
@@ -84,6 +88,17 @@ HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, 
std::string url, in
         }
     }
 
+    // Parse chunk response configuration; chunk response implies no Range 
support
+    auto chunk_iter = _extend_kv.find("http.enable.chunk.response");
+    if (chunk_iter != _extend_kv.end()) {
+        std::string value = chunk_iter->second;
+        std::transform(value.begin(), value.end(), value.begin(), ::tolower);
+        _enable_chunk_response = (value == "true" || value == "1");
+        if (_enable_chunk_response) {
+            _range_supported = false;
+        }
+    }
+
     _read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE);
 }
 
@@ -91,37 +106,82 @@ HttpFileReader::~HttpFileReader() {
     static_cast<void>(close());
 }
 
+Status HttpFileReader::setup_cdc_client() {
+    auto enable_cdc_iter = _extend_kv.find("enable_cdc_client");
+    if (enable_cdc_iter == _extend_kv.end() || enable_cdc_iter->second != 
"true") {
+        return Status::OK();
+    }
+
+    LOG(INFO) << "CDC client is enabled, starting CDC client for " << _url;
+    ExecEnv* env = ExecEnv::GetInstance();
+    if (env == nullptr || env->cdc_client_mgr() == nullptr) {
+        return Status::InternalError("ExecEnv or CdcClientMgr is not 
initialized");
+    }
+
+    PRequestCdcClientResult result;
+    Status start_st = env->cdc_client_mgr()->start_cdc_client(&result);
+    if (!start_st.ok()) {
+        LOG(ERROR) << "Failed to start CDC client, status=" << 
start_st.to_string();
+        return start_st;
+    }
+
+    // Replace CDC_CLIENT_PORT placeholder with actual CDC client port
+    const std::string placeholder = "CDC_CLIENT_PORT";
+    size_t pos = _url.find(placeholder);
+    if (pos != std::string::npos) {
+        _url.replace(pos, placeholder.size(), 
std::to_string(doris::config::cdc_client_port));
+    }
+    LOG(INFO) << "CDC client started successfully for " << _url;
+    return Status::OK();
+}
+
 Status HttpFileReader::open(const FileReaderOptions& opts) {
+    // CDC client setup must run before the _initialized guard.
+    // See setup_cdc_client() for lifecycle details.
+    RETURN_IF_ERROR(setup_cdc_client());
+
+    // Skip metadata detection when file size was pre-supplied by the caller.
     if (_initialized) {
         return Status::OK();
     }
 
-    // Step 1: HEAD request to get file metadata
-    RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true));
-    _client->set_method(HttpMethod::HEAD);
-    RETURN_IF_ERROR(_client->execute());
+    // Step 1: HEAD request to get file metadata (skip for chunk response)
+    if (_enable_chunk_response) {
+        // Chunk streaming response: size is unknown until the stream 
completes.
+        // _range_supported is already false (set in constructor).
+        _size_known = false;
+        // Reset _file_size from the SIZE_MAX default to 0 so that any caller 
of
+        // size() (e.g. NewJsonReader::_read_one_message) does not attempt to
+        // allocate SIZE_MAX bytes before the download completes.
+        _file_size = 0;
+        LOG(INFO) << "Chunk response mode enabled, skipping HEAD request for " 
<< _url;
+    } else {
+        // Normal mode: execute HEAD request to get file metadata
+        RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true));
+        _client->set_method(HttpMethod::HEAD);
+        RETURN_IF_ERROR(_client->execute());
 
-    uint64_t content_length = 0;
-    RETURN_IF_ERROR(_client->get_content_length(&content_length));
+        uint64_t content_length = 0;
+        RETURN_IF_ERROR(_client->get_content_length(&content_length));
 
-    _file_size = content_length;
-    _size_known = true;
+        _file_size = content_length;
+        _size_known = true;
+    }
 
-    // Step 2: Check if Range request is disabled by configuration
-    if (!_enable_range_request) {
-        // User explicitly disabled Range requests, use non-Range mode directly
+    // Step 2: Check if Range request is disabled by configuration.
+    // Chunk response mode always has _range_supported=false (set in 
constructor), so only
+    // the non-chunk non-Range path needs the file size guard.
+    if (_enable_chunk_response) {
+        // Nothing to do: _range_supported already false, size check not 
applicable
+    } else if (!_enable_range_request) {
         _range_supported = false;
-        LOG(INFO) << "Range requests disabled by configuration for " << _url
-                  << ", using non-Range mode. File size: " << _file_size << " 
bytes";
-
-        // Check if file size exceeds limit for non-Range mode
+        LOG(INFO) << "Range requests disabled by configuration for " << _url;
         if (_file_size > _max_request_size_bytes) {
             return Status::InternalError(
-                    "Non-Range mode: file size ({} bytes) exceeds maximum 
allowed size ({} bytes, "
-                    "configured by http.max.request.size.bytes). URL: {}",
+                    "Non-Range mode: file size ({} bytes) exceeds maximum 
allowed size ({} "
+                    "bytes, configured by http.max.request.size.bytes). URL: 
{}",
                     _file_size, _max_request_size_bytes, _url);
         }
-
         LOG(INFO) << "Non-Range mode validated for " << _url << ", file size: 
" << _file_size
                   << " bytes, max allowed: " << _max_request_size_bytes << " 
bytes";
     } else {
@@ -224,9 +284,29 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice 
result, size_t* bytes_r
     VLOG(2) << "Issuing HTTP GET request: offset=" << offset << " req_len=" << 
req_len
             << " with_range=" << _range_supported;
 
-    // Prepare and initialize the HTTP client for GET request
+    // Prepare and initialize the HTTP client for request
     RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/false));
-    _client->set_method(HttpMethod::GET);
+
+    // Determine HTTP method from configuration (default: GET)
+    HttpMethod method = HttpMethod::GET;
+    auto method_iter = _extend_kv.find("http.method");
+    if (method_iter != _extend_kv.end()) {
+        method = to_http_method(method_iter->second.c_str());
+        if (method == HttpMethod::UNKNOWN) {
+            LOG(WARNING) << "Invalid http.method value: " << 
method_iter->second
+                         << ", falling back to GET";
+            method = HttpMethod::GET;
+        }
+    }
+    _client->set_method(method);
+
+    // Set payload if configured (supports POST, PUT, DELETE, etc.)
+    auto payload_iter = _extend_kv.find("http.payload");
+    if (payload_iter != _extend_kv.end() && !payload_iter->second.empty()) {
+        _client->set_payload(payload_iter->second);
+        _client->set_content_type("application/json");
+        VLOG(2) << "HTTP request with payload, size=" << 
payload_iter->second.size();
+    }
 
     _client->set_header("Expect", "");
     _client->set_header("Connection", "close");
@@ -270,6 +350,21 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice 
result, size_t* bytes_r
     long http_status = _client->get_http_status();
     VLOG(2) << "HTTP response: status=" << http_status << " received_bytes=" 
<< buf.size();
 
+    // Check for HTTP error status codes (4xx, 5xx)
+    if (http_status >= 400) {
+        std::string error_body;
+        if (buf.empty()) {
+            error_body = "(empty response body)";
+        } else {
+            // Limit error message to 1024 bytes to avoid excessive logging
+            size_t max_len = std::min(buf.size(), static_cast<size_t>(1024));
+            error_body = buf.substr(0, max_len);
+        }
+
+        return Status::InternalError("HTTP request failed with status {}: 
{}.", http_status,
+                                     error_body);
+    }
+
     if (buf.empty()) {
         *bytes_read = buffer_offset;
         return Status::OK();
@@ -295,6 +390,11 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice 
result, size_t* bytes_r
         // Cache the complete file content for subsequent reads
         _full_file_cache = std::move(buf);
         _full_file_cached = true;
+        // Now that the full content is in hand, update _file_size to the 
actual
+        // byte count. This replaces the 0 placeholder set in open() for chunk
+        // response mode, so subsequent calls to size() return a correct value.
+        _file_size = _full_file_cache.size();
+        _size_known = true;
 
         VLOG(2) << "Cached full file: " << _full_file_cache.size() << " bytes";
 
diff --git a/be/src/io/fs/http_file_reader.h b/be/src/io/fs/http_file_reader.h
index 9b7f52e5270..91a360a000d 100644
--- a/be/src/io/fs/http_file_reader.h
+++ b/be/src/io/fs/http_file_reader.h
@@ -62,6 +62,10 @@ private:
     // Returns OK on success with _range_supported set appropriately
     Status detect_range_support();
 
+    // Start the CDC client process
+    // Called at the start of open() when enable_cdc_client=true.
+    Status setup_cdc_client();
+
     std::unique_ptr<char[]> _read_buffer;
     static constexpr size_t READ_BUFFER_SIZE = 1 << 20; // 1MB
     // Default maximum file size for servers that don't support Range requests
@@ -89,6 +93,8 @@ private:
     // Full file cache for non-Range mode to avoid repeated downloads
     std::string _full_file_cache;   // Cache complete file content
     bool _full_file_cached = false; // Whether full file has been cached
+
+    bool _enable_chunk_response = false; // Whether server returns chunk 
streaming response
 };
 
 } // namespace doris::io
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index 47ee5f21d27..9d2d5034eab 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -19,6 +19,7 @@ package org.apache.doris.job.cdc;
 
 public class DataSourceConfigKeys {
     public static final String JDBC_URL = "jdbc_url";
+    public static final String TYPE = "type";
     public static final String DRIVER_URL = "driver_url";
     public static final String DRIVER_CLASS = "driver_class";
     public static final String USER = "user";
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java
index 1a57cbdefe3..814605b7762 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java
@@ -38,7 +38,7 @@ public class CompareOffsetRequest extends JobBaseConfig {
             String frontendAddress,
             Map<String, String> offsetFirst,
             Map<String, String> offsetSecond) {
-        super(jobId, sourceType, sourceProperties, frontendAddress);
+        super(jobId.toString(), sourceType, sourceProperties, frontendAddress);
         this.offsetFirst = offsetFirst;
         this.offsetSecond = offsetSecond;
     }
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
index 7ed28d618fd..65ffd36966c 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
@@ -23,4 +23,5 @@ import lombok.EqualsAndHashCode;
 @Data
 @EqualsAndHashCode(callSuper = true)
 public class FetchRecordRequest extends JobBaseRecordRequest {
+    private String taskId;
 }
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java
index 5c3cf62ab4c..a54b2630734 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java
@@ -35,7 +35,7 @@ public class FetchTableSplitsRequest extends JobBaseConfig {
 
     public FetchTableSplitsRequest(Long jobId, String name,
             Map<String, String> sourceProperties, String frontendAddress, 
String snapshotTable) {
-        super(jobId, name, sourceProperties, frontendAddress);
+        super(jobId.toString(), name, sourceProperties, frontendAddress);
         this.snapshotTable = snapshotTable;
     }
 }
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
index c7b60026d88..49c95d98ff5 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
+++ 
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
@@ -27,7 +27,7 @@ import java.util.Map;
 @AllArgsConstructor
 @NoArgsConstructor
 public class JobBaseConfig {
-    private Long jobId;
+    private String jobId;
     private String dataSource;
     private Map<String, String> config;
     private String frontendAddress;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
index bafd2eeb918..a55ebedce9f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
@@ -19,6 +19,7 @@ package org.apache.doris.catalog;
 
 import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
 import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
+import org.apache.doris.nereids.trees.expressions.functions.table.CdcStream;
 import org.apache.doris.nereids.trees.expressions.functions.table.File;
 import org.apache.doris.nereids.trees.expressions.functions.table.Frontends;
 import 
org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks;
@@ -75,7 +76,8 @@ public class BuiltinTableValuedFunctions implements 
FunctionHelper {
             tableValued(ParquetMeta.class, "parquet_meta"),
             tableValued(ParquetFileMetadata.class, "parquet_file_metadata"),
             tableValued(ParquetKvMetadata.class, "parquet_kv_metadata"),
-            tableValued(ParquetBloomProbe.class, "parquet_bloom_probe")
+            tableValued(ParquetBloomProbe.class, "parquet_bloom_probe"),
+            tableValued(CdcStream.class, "cdc_stream")
     );
 
     public static final BuiltinTableValuedFunctions INSTANCE = new 
BuiltinTableValuedFunctions();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 32526f9c513..2c9fbf6fe17 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -181,7 +181,7 @@ public class StreamingMultiTblTask extends 
AbstractStreamingTask {
     private WriteRecordRequest buildRequestParams() throws JobException {
         JdbcOffset offset = (JdbcOffset) runningOffset;
         WriteRecordRequest request = new WriteRecordRequest();
-        request.setJobId(getJobId());
+        request.setJobId(String.valueOf(getJobId()));
         request.setConfig(sourceProperties);
 
         request.setDataSource(dataSourceType.name());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index b77dd8d8bd6..964659e79f4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -198,7 +198,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
     public void fetchRemoteMeta(Map<String, String> properties) throws 
Exception {
         Backend backend = StreamingJobUtils.selectBackend();
         JobBaseConfig requestParams =
-                new JobBaseConfig(getJobId(), sourceType.name(), 
sourceProperties, getFrontendAddress());
+                new JobBaseConfig(getJobId().toString(), sourceType.name(), 
sourceProperties, getFrontendAddress());
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
                 .setApi("/api/fetchEndOffset")
                 .setParams(new Gson().toJson(requestParams)).build();
@@ -570,7 +570,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
     private void initSourceReader() throws JobException {
         Backend backend = StreamingJobUtils.selectBackend();
         JobBaseConfig requestParams =
-                new JobBaseConfig(getJobId(), sourceType.name(), 
sourceProperties, getFrontendAddress());
+                new JobBaseConfig(getJobId().toString(), sourceType.name(), 
sourceProperties, getFrontendAddress());
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
                 .setApi("/api/initReader")
                 .setParams(new Gson().toJson(requestParams)).build();
@@ -618,7 +618,7 @@ public class JdbcSourceOffsetProvider implements 
SourceOffsetProvider {
         StreamingJobUtils.deleteJobMeta(jobId);
         Backend backend = StreamingJobUtils.selectBackend();
         JobBaseConfig requestParams =
-                new JobBaseConfig(getJobId(), sourceType.name(), 
sourceProperties, getFrontendAddress());
+                new JobBaseConfig(getJobId().toString(), sourceType.name(), 
sourceProperties, getFrontendAddress());
         InternalService.PRequestCdcClientRequest request = 
InternalService.PRequestCdcClientRequest.newBuilder()
                 .setApi("/api/close")
                 .setParams(new Gson().toJson(requestParams)).build();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index 9eec0061219..46a47036ccd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -215,7 +215,7 @@ public class StreamingJobUtils {
         return ctx;
     }
 
-    private static JdbcClient getJdbcClient(DataSourceType sourceType, 
Map<String, String> properties) {
+    public static JdbcClient getJdbcClient(DataSourceType sourceType, 
Map<String, String> properties) {
         JdbcClientConfig config = new JdbcClientConfig();
         config.setCatalog(sourceType.name());
         config.setUser(properties.get(DataSourceConfigKeys.USER));
@@ -448,8 +448,7 @@ public class StreamingJobUtils {
      * The remoteDB implementation differs for each data source;
      * refer to the hierarchical mapping in the JDBC catalog.
      */
-    private static String getRemoteDbName(DataSourceType sourceType, 
Map<String, String> properties)
-            throws JobException {
+    public static String getRemoteDbName(DataSourceType sourceType, 
Map<String, String> properties) {
         String remoteDb = null;
         switch (sourceType) {
             case MYSQL:
@@ -461,7 +460,7 @@ public class StreamingJobUtils {
                 Preconditions.checkArgument(StringUtils.isNotEmpty(remoteDb), 
"schema is required");
                 break;
             default:
-                throw new JobException("Unsupported source type " + 
sourceType);
+                throw new RuntimeException("Unsupported source type " + 
sourceType);
         }
         return remoteDb;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java
new file mode 100644
index 00000000000..3a6cdaf61da
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.table;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.coercion.AnyDataType;
+import org.apache.doris.tablefunction.CdcStreamTableValuedFunction;
+import org.apache.doris.tablefunction.TableValuedFunctionIf;
+
+import java.util.Map;
+
+/**
+ * CdcStream TVF.
+ */
+public class CdcStream extends TableValuedFunction {
+
+    public CdcStream(Properties tvfProperties) {
+        super("cdc_stream", tvfProperties);
+    }
+
+    @Override
+    public FunctionSignature customSignature() {
+        return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, 
getArgumentsTypes());
+    }
+
+    @Override
+    protected TableValuedFunctionIf toCatalogFunction() {
+        try {
+            Map<String, String> arguments = getTVFProperties().getMap();
+            return new CdcStreamTableValuedFunction(arguments);
+        } catch (Throwable t) {
+            throw new AnalysisException("Can not build 
CdcStreamTableValuedFunction by "
+                    + this + ": " + t.getMessage(), t);
+        }
+    }
+
+    @Override
+    public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+        return visitor.visitCdcStream(this, context);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
index 91fe9c5a168..31b3162e647 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.expressions.visitor;
 
 import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
 import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
+import org.apache.doris.nereids.trees.expressions.functions.table.CdcStream;
 import org.apache.doris.nereids.trees.expressions.functions.table.File;
 import org.apache.doris.nereids.trees.expressions.functions.table.Frontends;
 import 
org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks;
@@ -80,6 +81,10 @@ public interface TableValuedFunctionVisitor<R, C> {
         return visitTableValuedFunction(http, context);
     }
 
+    default R visitCdcStream(CdcStream cdcStream, C context) {
+        return visitTableValuedFunction(cdcStream, context);
+    }
+
     default R visitFrontendsDisks(FrontendsDisks frontendsDisks, C context) {
         return visitTableValuedFunction(frontendsDisks, context);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
new file mode 100644
index 00000000000..621d2fbe6b6
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.FetchRecordRequest;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcStreamTableValuedFunction extends 
ExternalFileTableValuedFunction {
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+    private static final String URI = 
"http://127.0.0.1:CDC_CLIENT_PORT/api/fetchRecordStream";;
+
+    public CdcStreamTableValuedFunction(Map<String, String> properties) throws 
AnalysisException {
+        validate(properties);
+        processProps(properties);
+    }
+
+    private void processProps(Map<String, String> properties) throws 
AnalysisException {
+        Map<String, String> copyProps = new HashMap<>(properties);
+        copyProps.put("format", "json");
+        super.parseCommonProperties(copyProps);
+        this.processedParams.put("enable_cdc_client", "true");
+        this.processedParams.put("uri", URI);
+        this.processedParams.put("http.enable.range.request", "false");
+        this.processedParams.put("http.enable.chunk.response", "true");
+        this.processedParams.put("http.method", "POST");
+
+        String payload = generateParams(properties);
+        this.processedParams.put("http.payload", payload);
+        this.backendConnectProperties.putAll(processedParams);
+        generateFileStatus();
+    }
+
+    private String generateParams(Map<String, String> properties) throws 
AnalysisException {
+        FetchRecordRequest recordRequest = new FetchRecordRequest();
+        recordRequest.setJobId(UUID.randomUUID().toString().replace("-", ""));
+        recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE));
+        recordRequest.setConfig(properties);
+        try {
+            return objectMapper.writeValueAsString(recordRequest);
+        } catch (IOException e) {
+            LOG.warn("Failed to serialize fetch record request", e);
+            throw new AnalysisException("Failed to serialize fetch record 
request: " + e.getMessage(), e);
+        }
+    }
+
+    private void validate(Map<String, String> properties) throws 
AnalysisException {
+        if (!properties.containsKey(DataSourceConfigKeys.JDBC_URL)) {
+            throw new AnalysisException("jdbc_url is required");
+        }
+        if (!properties.containsKey(DataSourceConfigKeys.TYPE)) {
+            throw new AnalysisException("type is required");
+        }
+        if (!properties.containsKey(DataSourceConfigKeys.TABLE)) {
+            throw new AnalysisException("table is required");
+        }
+        if (!properties.containsKey(DataSourceConfigKeys.OFFSET)) {
+            throw new AnalysisException("offset is required");
+        }
+    }
+
+    private void generateFileStatus() {
+        this.fileStatuses.clear();
+        this.fileStatuses.add(new TBrokerFileStatus(URI, false, 
Integer.MAX_VALUE, false));
+    }
+
+    @Override
+    public List<Column> getTableColumns() throws AnalysisException {
+        DataSourceType dataSourceType =
+                
DataSourceType.valueOf(processedParams.get(DataSourceConfigKeys.TYPE).toUpperCase());
+        JdbcClient jdbcClient = 
StreamingJobUtils.getJdbcClient(dataSourceType, processedParams);
+        try {
+            String database = 
StreamingJobUtils.getRemoteDbName(dataSourceType, processedParams);
+            String table = processedParams.get(DataSourceConfigKeys.TABLE);
+            if (!jdbcClient.isTableExist(database, table)) {
+                throw new AnalysisException("Table does not exist: " + table);
+            }
+            return jdbcClient.getColumnsFromJdbc(database, table);
+        } finally {
+            jdbcClient.closeClient();
+        }
+    }
+
+    @Override
+    public TFileType getTFileType() {
+        return TFileType.FILE_HTTP;
+    }
+
+    @Override
+    public String getFilePath() {
+        return URI;
+    }
+
+    @Override
+    public BrokerDesc getBrokerDesc() {
+        return new BrokerDesc("CdcStreamTvfBroker", StorageType.HTTP, 
processedParams);
+    }
+
+    @Override
+    public String getTableName() {
+        return "CdcStreamTableValuedFunction";
+    }
+
+    @Override
+    public List<String> getPathPartitionKeys() {
+        return new ArrayList<>();
+    }
+}
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
index 332a4766002..79d5c65cf57 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
@@ -38,8 +38,8 @@ import org.slf4j.LoggerFactory;
 public class Env {
     private static final Logger LOG = LoggerFactory.getLogger(Env.class);
     private static volatile Env INSTANCE;
-    private final Map<Long, JobContext> jobContexts;
-    private final Map<Long, Lock> jobLocks;
+    private final Map<String, JobContext> jobContexts;
+    private final Map<String, Lock> jobLocks;
     @Setter private int backendHttpPort;
     @Setter @Getter private String clusterToken;
     @Setter @Getter private volatile String feMasterAddress;
@@ -85,9 +85,9 @@ public class Env {
     }
 
     private SourceReader getOrCreateReader(
-            Long jobId, DataSource dataSource, Map<String, String> config) {
-        Objects.requireNonNull(jobId, "jobId");
-        Objects.requireNonNull(dataSource, "dataSource");
+            String jobId, DataSource dataSource, Map<String, String> config) {
+        Objects.requireNonNull(jobId, "jobId is null");
+        Objects.requireNonNull(dataSource, "dataSource is null");
         JobContext context = jobContexts.get(jobId);
         if (context != null) {
             return context.getReader(dataSource);
@@ -112,7 +112,7 @@ public class Env {
         }
     }
 
-    public void close(Long jobId) {
+    public void close(String jobId) {
         Lock lock = jobLocks.get(jobId);
         if (lock != null) {
             lock.lock();
@@ -129,12 +129,12 @@ public class Env {
     }
 
     private static final class JobContext {
-        private final long jobId;
+        private final String jobId;
         private volatile SourceReader reader;
         private volatile Map<String, String> config;
         private volatile DataSource dataSource;
 
-        private JobContext(long jobId, DataSource dataSource, Map<String, 
String> config) {
+        private JobContext(String jobId, DataSource dataSource, Map<String, 
String> config) {
             this.jobId = jobId;
             this.dataSource = dataSource;
             this.config = config;
@@ -151,10 +151,10 @@ public class Env {
             if (this.dataSource != source) {
                 throw new IllegalStateException(
                         String.format(
-                                "Job %d already bound to datasource %s, cannot 
switch to %s",
+                                "Job %s already bound to datasource %s, cannot 
switch to %s",
                                 jobId, this.dataSource, source));
             }
-            Preconditions.checkState(reader != null, "Job %d reader not 
initialized yet", jobId);
+            Preconditions.checkState(reader != null, "Job %s reader not 
initialized yet", jobId);
             return reader;
         }
     }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java
index 8b4883b6203..4c719f0f9bb 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java
@@ -17,10 +17,14 @@
 
 package org.apache.doris.cdcclient.config;
 
+import org.apache.doris.cdcclient.exception.CommonException;
+import org.apache.doris.cdcclient.exception.StreamException;
 import org.apache.doris.cdcclient.model.rest.RestResponse;
 
 import jakarta.servlet.http.HttpServletRequest;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.ControllerAdvice;
 import org.springframework.web.bind.annotation.ExceptionHandler;
 import org.springframework.web.bind.annotation.ResponseBody;
@@ -35,4 +39,17 @@ public class GlobalExceptionHandler {
         log.error("Unexpected exception", e);
         return RestResponse.internalError(e.getMessage());
     }
+
+    @ExceptionHandler(StreamException.class)
+    public Object streamExceptionHandler(StreamException e) {
+        // Directly throwing an exception allows curl to detect anomalies in 
the streaming response.
+        log.error("Exception in streaming response, re-throwing to client", e);
+        throw e;
+    }
+
+    @ExceptionHandler(CommonException.class)
+    public Object commonExceptionHandler(CommonException e) {
+        log.error("Unexpected common exception", e);
+        return 
ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.getMessage());
+    }
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index d0c2c17e457..0b677208404 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -39,6 +39,7 @@ import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RestController;
+import 
org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
 
 @RestController
 public class ClientController {
@@ -71,7 +72,7 @@ public class ClientController {
         }
     }
 
-    /** Fetch records from source reader */
+    /** Fetch records from source reader, for debugging */
     @RequestMapping(path = "/api/fetchRecords", method = RequestMethod.POST)
     public Object fetchRecords(@RequestBody FetchRecordRequest recordReq) {
         try {
@@ -82,6 +83,12 @@ public class ClientController {
         }
     }
 
+    @RequestMapping(path = "/api/fetchRecordStream", method = 
RequestMethod.POST)
+    public StreamingResponseBody fetchRecordStream(@RequestBody 
FetchRecordRequest recordReq)
+            throws Exception {
+        return pipelineCoordinator.fetchRecordStream(recordReq);
+    }
+
     /** Fetch records from source reader and Write records to backend */
     @RequestMapping(path = "/api/writeRecords", method = RequestMethod.POST)
     public Object writeRecord(@RequestBody WriteRecordRequest recordReq) {
@@ -126,4 +133,9 @@ public class ClientController {
     public Object getFailReason(@PathVariable("taskId") String taskId) {
         return 
RestResponse.success(pipelineCoordinator.getTaskFailReason(taskId));
     }
+
+    @RequestMapping(path = "/api/getTaskOffset/{taskId}", method = 
RequestMethod.POST)
+    public Object getTaskIdOffset(@PathVariable String taskId) {
+        return 
RestResponse.success(pipelineCoordinator.getOffsetWithTaskId(taskId));
+    }
 }
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/CommonException.java
similarity index 55%
copy from 
fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
copy to 
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/CommonException.java
index c7b60026d88..b27afe56dfe 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/CommonException.java
@@ -15,20 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.job.cdc.request;
+package org.apache.doris.cdcclient.exception;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+public class CommonException extends RuntimeException {
+    public CommonException() {
+        super();
+    }
 
-import java.util.Map;
+    public CommonException(String message) {
+        super(message);
+    }
 
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-public class JobBaseConfig {
-    private Long jobId;
-    private String dataSource;
-    private Map<String, String> config;
-    private String frontendAddress;
+    public CommonException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public CommonException(Throwable cause) {
+        super(cause);
+    }
+
+    protected CommonException(
+            String message,
+            Throwable cause,
+            boolean enableSuppression,
+            boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
 }
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/StreamException.java
similarity index 55%
copy from 
fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
copy to 
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/StreamException.java
index c7b60026d88..73f9b34fbcd 100644
--- 
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/StreamException.java
@@ -15,20 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.job.cdc.request;
+package org.apache.doris.cdcclient.exception;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+public class StreamException extends RuntimeException {
+    public StreamException() {
+        super();
+    }
 
-import java.util.Map;
+    public StreamException(String message) {
+        super(message);
+    }
 
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-public class JobBaseConfig {
-    private Long jobId;
-    private String dataSource;
-    private Map<String, String> config;
-    private String frontendAddress;
+    public StreamException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public StreamException(Throwable cause) {
+        super(cause);
+    }
+
+    protected StreamException(
+            String message,
+            Throwable cause,
+            boolean enableSuppression,
+            boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 414a1d23797..850c3038b91 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -19,6 +19,8 @@ package org.apache.doris.cdcclient.service;
 
 import org.apache.doris.cdcclient.common.Constants;
 import org.apache.doris.cdcclient.common.Env;
+import org.apache.doris.cdcclient.exception.CommonException;
+import org.apache.doris.cdcclient.exception.StreamException;
 import org.apache.doris.cdcclient.model.response.RecordWithMeta;
 import org.apache.doris.cdcclient.sink.DorisBatchStreamLoad;
 import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
@@ -26,17 +28,22 @@ import 
org.apache.doris.cdcclient.source.reader.SourceReader;
 import org.apache.doris.cdcclient.source.reader.SplitReadResult;
 import org.apache.doris.cdcclient.utils.ConfigUtil;
 import org.apache.doris.cdcclient.utils.SchemaChangeManager;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
 import org.apache.doris.job.cdc.request.FetchRecordRequest;
 import org.apache.doris.job.cdc.request.WriteRecordRequest;
 import org.apache.doris.job.cdc.split.BinlogSplit;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 
+import java.io.BufferedOutputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -51,12 +58,14 @@ import java.util.concurrent.TimeUnit;
 import static 
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.SCHEMA_HEARTBEAT_EVENT_KEY_NAME;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import io.debezium.data.Envelope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
+import 
org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
 
 /** Pipeline coordinator. */
 @Component
@@ -64,13 +73,16 @@ public class PipelineCoordinator {
     private static final Logger LOG = 
LoggerFactory.getLogger(PipelineCoordinator.class);
     private static final String SPLIT_ID = "splitId";
     // jobId
-    private final Map<Long, DorisBatchStreamLoad> batchStreamLoadMap = new 
ConcurrentHashMap<>();
+    private final Map<String, DorisBatchStreamLoad> batchStreamLoadMap = new 
ConcurrentHashMap<>();
+    // taskId -> offset
+    private final Map<String, Map<String, String>> taskOffsetCache = new 
ConcurrentHashMap<>();
     // taskId -> writeFailReason
     private final Map<String, String> taskErrorMaps = new 
ConcurrentHashMap<>();
     private final ThreadPoolExecutor executor;
     private static final int MAX_CONCURRENT_TASKS = 10;
     private static final int QUEUE_CAPACITY = 128;
-    private static ObjectMapper objectMapper = new ObjectMapper();
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+    private final byte[] LINE_DELIMITER = 
"\n".getBytes(StandardCharsets.UTF_8);
 
     public PipelineCoordinator() {
         this.executor =
@@ -90,6 +102,165 @@ public class PipelineCoordinator {
                         new ThreadPoolExecutor.AbortPolicy());
     }
 
+    /** Returns a streaming response body consumed by http_file_reader. */
+    public StreamingResponseBody fetchRecordStream(FetchRecordRequest 
fetchReq) throws Exception {
+        SourceReader sourceReader;
+        SplitReadResult readResult;
+        try {
+            if (fetchReq.getTaskId() == null && fetchReq.getMeta() == null) {
+                LOG.info(
+                        "Generate initial meta for fetch record request, 
jobId={}, taskId={}",
+                        fetchReq.getJobId(),
+                        fetchReq.getTaskId());
+                // means the request did not originate from the job, only tvf
+                Map<String, Object> meta = generateMeta(fetchReq.getConfig());
+                fetchReq.setMeta(meta);
+            }
+
+            sourceReader = Env.getCurrentEnv().getReader(fetchReq);
+            readResult = sourceReader.prepareAndSubmitSplit(fetchReq);
+        } catch (Exception ex) {
+            throw new CommonException(ex);
+        }
+
+        return outputStream -> {
+            try {
+                buildStreamRecords(sourceReader, fetchReq, readResult, 
outputStream);
+            } catch (Exception ex) {
+                LOG.error(
+                        "Failed fetch record, jobId={}, taskId={}",
+                        fetchReq.getJobId(),
+                        fetchReq.getTaskId(),
+                        ex);
+                throw new StreamException(ex);
+            }
+        };
+    }
+
+    private void buildStreamRecords(
+            SourceReader sourceReader,
+            FetchRecordRequest fetchRecord,
+            SplitReadResult readResult,
+            OutputStream rawOutputStream)
+            throws Exception {
+        SourceSplit split = readResult.getSplit();
+        Map<String, String> lastMeta = null;
+        int rowCount = 0;
+        BufferedOutputStream bos = new BufferedOutputStream(rawOutputStream);
+        try {
+            // Poll records using the existing mechanism
+            boolean shouldStop = false;
+            long startTime = System.currentTimeMillis();
+            while (!shouldStop) {
+                Iterator<SourceRecord> recordIterator = 
sourceReader.pollRecords();
+                if (!recordIterator.hasNext()) {
+                    Thread.sleep(100);
+                    long elapsedTime = System.currentTimeMillis() - startTime;
+                    if (elapsedTime > Constants.POLL_SPLIT_RECORDS_TIMEOUTS) {
+                        break;
+                    }
+                    continue;
+                }
+                while (recordIterator.hasNext()) {
+                    SourceRecord element = recordIterator.next();
+                    if (isHeartbeatEvent(element)) {
+                        shouldStop = true;
+                        break;
+                    }
+                    DeserializeResult result =
+                            sourceReader.deserialize(fetchRecord.getConfig(), 
element);
+                    if (!CollectionUtils.isEmpty(result.getRecords())) {
+                        for (String record : result.getRecords()) {
+                            bos.write(record.getBytes(StandardCharsets.UTF_8));
+                            bos.write(LINE_DELIMITER);
+                        }
+                        rowCount += result.getRecords().size();
+                    }
+                }
+            }
+            // force flush buffer
+            bos.flush();
+        } finally {
+            // Commit offset and cleanup
+            sourceReader.commitSourceOffset(fetchRecord.getJobId(), 
readResult.getSplit());
+            sourceReader.finishSplitRecords();
+        }
+
+        LOG.info(
+                "Fetch records completed, jobId={}, taskId={}, splitId={}, 
rowCount={}",
+                fetchRecord.getJobId(),
+                fetchRecord.getTaskId(),
+                split.splitId(),
+                rowCount);
+
+        if (readResult.getSplitState() != null) {
+            if (sourceReader.isSnapshotSplit(split)) {
+                lastMeta = 
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
+                lastMeta.put(SPLIT_ID, split.splitId());
+            } else if (sourceReader.isBinlogSplit(split)) {
+                lastMeta = 
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
+                lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
+            } else {
+                throw new RuntimeException(
+                        "unknown split type: " + 
split.getClass().getSimpleName());
+            }
+        } else {
+            throw new RuntimeException("split state is null");
+        }
+
+        if (StringUtils.isNotEmpty(fetchRecord.getTaskId())) {
+            taskOffsetCache.put(fetchRecord.getTaskId(), lastMeta);
+        }
+
+        // Convention: standalone TVF uses a UUID jobId; job-driven TVF will 
use a numeric Long
+        // jobId (set via rewriteTvfParams). When the job-driven path is 
implemented,
+        // rewriteTvfParams must inject the job's Long jobId into the TVF 
properties
+        // so that generateParams() can read it, keeping isLong() correct.
+        // TODO: replace isLong() with an explicit field in FetchRecordRequest
+        // once the job-driven TVF path is fully implemented.
+        if (!isLong(fetchRecord.getJobId())) {
+            // TVF requires closing the window after each execution,
+            // while PG requires dropping the slot.
+            sourceReader.close(fetchRecord);
+            // Clean up the job context so it does not accumulate in 
Env.jobContexts.
+            // Each TVF call uses a fresh UUID job ID, so without this the map 
grows unboundedly.
+            Env.getCurrentEnv().close(fetchRecord.getJobId());
+        }
+    }
+
+    private boolean isLong(String s) {
+        if (s == null || s.isEmpty()) return false;
+        try {
+            Long.parseLong(s);
+            return true;
+        } catch (NumberFormatException e) {
+            return false;
+        }
+    }
+
+    /**
+     * Generate split meta from request.offset. This only applies to TVF, so 
initial is not
+     * supported because initial requires a job to obtain split information.
+     */
+    private Map<String, Object> generateMeta(Map<String, String> cdcConfig)
+            throws JsonProcessingException {
+        Map<String, Object> meta = new HashMap<>();
+        String offset = cdcConfig.get(DataSourceConfigKeys.OFFSET);
+        if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(offset)
+                || 
DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(offset)) {
+            meta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
+        } else if (ConfigUtil.isJson(offset)) {
+            Map<String, String> startOffset =
+                    objectMapper.readValue(offset, new TypeReference<>() {});
+            meta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
+            meta.put("startingOffset", startOffset);
+        } else {
+            throw new RuntimeException("Unsupported offset: " + offset);
+        }
+        return meta;
+    }
+
+    /** Pulls data via the REST API; intended for testing/debugging only. */
     public RecordWithMeta fetchRecords(FetchRecordRequest fetchRecordRequest) 
throws Exception {
         SourceReader sourceReader = 
Env.getCurrentEnv().getReader(fetchRecordRequest);
         SplitReadResult readResult = 
sourceReader.prepareAndSubmitSplit(fetchRecordRequest);
@@ -482,7 +653,7 @@ public class PipelineCoordinator {
         return batchStreamLoad;
     }
 
-    public void closeJobStreamLoad(Long jobId) {
+    public void closeJobStreamLoad(String jobId) {
         DorisBatchStreamLoad batchStreamLoad = 
batchStreamLoadMap.remove(jobId);
         if (batchStreamLoad != null) {
             LOG.info("Close DorisBatchStreamLoad for jobId={}", jobId);
@@ -509,7 +680,7 @@ public class PipelineCoordinator {
      * @param readResult the read result containing split information
      */
     private void cleanupReaderResources(
-            SourceReader sourceReader, Long jobId, SplitReadResult readResult) 
{
+            SourceReader sourceReader, String jobId, SplitReadResult 
readResult) {
         try {
             // The LSN in the commit is the current offset, which is the 
offset from the last
             // successful write.
@@ -579,4 +750,9 @@ public class PipelineCoordinator {
         }
         return commitOffsets;
     }
+
+    public Map<String, String> getOffsetWithTaskId(String taskId) {
+        Map<String, String> taskOffset = taskOffsetCache.remove(taskId);
+        return taskOffset == null ? new HashMap<>() : taskOffset;
+    }
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index 72e84c4413c..4583b049b81 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -93,13 +93,13 @@ public class DorisBatchStreamLoad implements Serializable {
     private final Map<String, ReadWriteLock> bufferMapLock = new 
ConcurrentHashMap<>();
     @Setter @Getter private String currentTaskId;
     private String targetDb;
-    private long jobId;
+    private String jobId;
     @Setter private String token;
     // stream load headers
     @Setter private Map<String, String> loadProps = new HashMap<>();
     @Getter private LoadStatistic loadStatistic;
 
-    public DorisBatchStreamLoad(long jobId, String targetDb) {
+    public DorisBatchStreamLoad(String jobId, String targetDb) {
         this.hostPort = Env.getCurrentEnv().getBackendHostPort();
         this.loadStatistic = new LoadStatistic();
         this.flushQueue = new LinkedBlockingDeque<>(1);
@@ -329,9 +329,9 @@ public class DorisBatchStreamLoad implements Serializable {
     class LoadAsyncExecutor implements Runnable {
 
         private int flushQueueSize;
-        private long jobId;
+        private String jobId;
 
-        public LoadAsyncExecutor(int flushQueueSize, long jobId) {
+        public LoadAsyncExecutor(int flushQueueSize, String jobId) {
             this.flushQueueSize = flushQueueSize;
             this.jobId = jobId;
         }
@@ -511,7 +511,7 @@ public class DorisBatchStreamLoad implements Serializable {
             CommitOffsetRequest commitRequest =
                     CommitOffsetRequest.builder()
                             .offset(OBJECT_MAPPER.writeValueAsString(meta))
-                            .jobId(jobId)
+                            .jobId(Long.parseLong(jobId))
                             .taskId(Long.parseLong(taskId))
                             .scannedRows(scannedRows)
                             .filteredRows(loadStatistic.getFilteredRows())
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index 77052577341..e7245334ffb 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -112,7 +112,7 @@ public abstract class JdbcIncrementalSourceReader extends 
AbstractCdcSourceReade
     }
 
     @Override
-    public void initialize(long jobId, DataSource dataSource, Map<String, 
String> config) {
+    public void initialize(String jobId, DataSource dataSource, Map<String, 
String> config) {
         this.serializer.init(config);
 
         // Initialize thread pool for parallel polling
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
index fa4578d509b..95eeb052681 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
@@ -41,7 +41,7 @@ public interface SourceReader {
     String SPLIT_ID = "splitId";
 
     /** Initialization, called when the program starts */
-    void initialize(long jobId, DataSource dataSource, Map<String, String> 
config);
+    void initialize(String jobId, DataSource dataSource, Map<String, String> 
config);
 
     /** Divide the data to be read. For example: split mysql to chunks */
     List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest config);
@@ -97,5 +97,5 @@ public interface SourceReader {
      * Commits the given offset with the source database. Used by some source 
like Postgres to
      * indicate how far the source TX log can be discarded.
      */
-    default void commitSourceOffset(Long jobId, SourceSplit sourceSplit) {}
+    default void commitSourceOffset(String jobId, SourceSplit sourceSplit) {}
 }
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 15787782da9..a4f3a9e2547 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -69,7 +69,6 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -135,7 +134,7 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
     }
 
     @Override
-    public void initialize(long jobId, DataSource dataSource, Map<String, 
String> config) {
+    public void initialize(String jobId, DataSource dataSource, Map<String, 
String> config) {
         this.serializer.init(config);
 
         // Initialize thread pool for parallel polling
@@ -779,12 +778,11 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
         // unnecessary processing overhead until DDL support is added.
         configFactory.includeSchemaChanges(false);
 
-        String includingTables = 
cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
-        String[] includingTbls =
-                Arrays.stream(includingTables.split(","))
-                        .map(t -> databaseName + "." + t.trim())
-                        .toArray(String[]::new);
-        configFactory.tableList(includingTbls);
+        // Set table list
+        String[] tableList = ConfigUtil.getTableList(databaseName, cdcConfig);
+        com.google.common.base.Preconditions.checkArgument(
+                tableList.length >= 1, "include_tables or table is required");
+        configFactory.tableList(tableList);
 
         // setting startMode
         String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
@@ -811,7 +809,14 @@ public class MySqlSourceReader extends 
AbstractCdcSourceReader {
             }
             if (offsetMap.containsKey(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY)
                     && 
offsetMap.containsKey(BinlogOffset.BINLOG_POSITION_OFFSET_KEY)) {
-                BinlogOffset binlogOffset = new BinlogOffset(offsetMap);
+                BinlogOffset binlogOffset =
+                        BinlogOffset.builder()
+                                .setBinlogFilePosition(
+                                        
offsetMap.get(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY),
+                                        Long.parseLong(
+                                                offsetMap.get(
+                                                        
BinlogOffset.BINLOG_POSITION_OFFSET_KEY)))
+                                .build();
                 
configFactory.startupOptions(StartupOptions.specificOffset(binlogOffset));
             } else {
                 throw new RuntimeException("Incorrect offset " + startupMode);
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index d465a71c242..492e0650d4b 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -55,7 +55,6 @@ import org.apache.flink.table.types.DataType;
 
 import java.time.Duration;
 import java.time.Instant;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -91,7 +90,7 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
     }
 
     @Override
-    public void initialize(long jobId, DataSource dataSource, Map<String, 
String> config) {
+    public void initialize(String jobId, DataSource dataSource, Map<String, 
String> config) {
         PostgresSourceConfig sourceConfig = generatePostgresConfig(config, 
jobId, 0);
         PostgresDialect dialect = new PostgresDialect(sourceConfig);
         synchronized (SLOT_CREATION_LOCK) {
@@ -159,7 +158,7 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
 
     /** Generate PostgreSQL source config from Map config */
     private PostgresSourceConfig generatePostgresConfig(
-            Map<String, String> cdcConfig, Long jobId, int subtaskId) {
+            Map<String, String> cdcConfig, String jobId, int subtaskId) {
         PostgresSourceConfigFactory configFactory = new 
PostgresSourceConfigFactory();
 
         // Parse JDBC URL to extract connection info
@@ -192,14 +191,9 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
         configFactory.includeSchemaChanges(false);
 
         // Set table list
-        String includingTables = 
cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
-        if (StringUtils.isNotEmpty(includingTables)) {
-            String[] includingTbls =
-                    Arrays.stream(includingTables.split(","))
-                            .map(t -> schema + "." + t.trim())
-                            .toArray(String[]::new);
-            configFactory.tableList(includingTbls);
-        }
+        String[] tableList = ConfigUtil.getTableList(schema, cdcConfig);
+        Preconditions.checkArgument(tableList.length >= 1, "include_tables or 
table is required");
+        configFactory.tableList(tableList);
 
         // Set startup options
         String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
@@ -257,7 +251,7 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
         return configFactory.create(subtaskId);
     }
 
-    private String getSlotName(Long jobId) {
+    private String getSlotName(String jobId) {
         return "doris_cdc_" + jobId;
     }
 
@@ -380,7 +374,7 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
      * @return the fresh {@link TableChanges.TableChange}
      */
     private TableChanges.TableChange refreshSingleTableSchema(
-            TableId tableId, Map<String, String> config, long jobId) {
+            TableId tableId, Map<String, String> config, String jobId) {
         PostgresSourceConfig sourceConfig = generatePostgresConfig(config, 
jobId, 0);
         PostgresDialect dialect = new PostgresDialect(sourceConfig);
         try (JdbcConnection jdbcConnection = 
dialect.openJdbcConnection(sourceConfig)) {
@@ -408,7 +402,7 @@ public class PostgresSourceReader extends 
JdbcIncrementalSourceReader {
      * `CommitFeOffset` fails, Data after the startOffset will not be cleared.
      */
     @Override
-    public void commitSourceOffset(Long jobId, SourceSplit sourceSplit) {
+    public void commitSourceOffset(String jobId, SourceSplit sourceSplit) {
         try {
             if (sourceSplit instanceof StreamSplit) {
                 Offset offsetToCommit = ((StreamSplit) 
sourceSplit).getStartingOffset();
diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
index 46d581f58e5..5aa46753a26 100644
--- 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
+++ 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
@@ -43,8 +43,11 @@ public class ConfigUtil {
     private static ObjectMapper objectMapper = new ObjectMapper();
     private static final Logger LOG = 
LoggerFactory.getLogger(ConfigUtil.class);
 
-    public static String getServerId(long jobId) {
-        return String.valueOf(Math.abs(String.valueOf(jobId).hashCode()));
+    public static String getServerId(String jobId) {
+        // Use bitwise AND with Integer.MAX_VALUE to strip the sign bit,
+        // which avoids the edge case where Math.abs(Integer.MIN_VALUE) 
returns MIN_VALUE
+        // (negative).
+        return String.valueOf(jobId.hashCode() & Integer.MAX_VALUE);
     }
 
     public static ZoneId getServerTimeZoneFromJdbcUrl(String jdbcUrl) {
@@ -107,6 +110,21 @@ public class ConfigUtil {
         return properties;
     }
 
+    public static String[] getTableList(String schema, Map<String, String> 
cdcConfig) {
+        String includingTables = 
cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
+        String table = cdcConfig.get(DataSourceConfigKeys.TABLE);
+        if (StringUtils.isNotEmpty(includingTables)) {
+            return Arrays.stream(includingTables.split(","))
+                    .map(t -> schema + "." + t.trim())
+                    .toArray(String[]::new);
+        } else if (StringUtils.isNotEmpty(table)) {
+            Preconditions.checkArgument(!table.contains(","), "table only 
supports one table");
+            return new String[] {schema + "." + table.trim()};
+        } else {
+            return new String[0];
+        }
+    }
+
     public static boolean is13Timestamp(String s) {
         return s != null && s.matches("\\d{13}");
     }
diff --git 
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
new file mode 100644
index 00000000000..66d2a76d7c2
--- /dev/null
+++ 
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for {@link ConfigUtil}. */
+class ConfigUtilTest {
+
+    // ─── getServerId 
──────────────────────────────────────────────────────────
+
+    @Test
+    void serverIdIsNonNegative() {
+        // Any jobId hash should produce a non-negative result (bitwise AND 
strips sign bit).
+        String result = ConfigUtil.getServerId("12345");
+        assertTrue(Long.parseLong(result) >= 0, "serverId must be 
non-negative");
+    }
+
+    @Test
+    void serverIdHandlesMinHashCode() {
+        // Find a string whose hashCode() == Integer.MIN_VALUE to exercise the 
edge case
+        // where Math.abs(Integer.MIN_VALUE) would return a negative number.
+        // "polygenelubricants" is a well-known such string.
+        String result = ConfigUtil.getServerId("polygenelubricants");
+        assertTrue(Long.parseLong(result) >= 0, "serverId must be non-negative 
for MIN_VALUE hash");
+    }
+
+    // ─── getTableList 
─────────────────────────────────────────────────────────
+
+    @Test
+    void tableListFromIncludeTables() {
+        Map<String, String> config = new HashMap<>();
+        config.put(DataSourceConfigKeys.INCLUDE_TABLES, "t1, t2, t3");
+        String[] result = ConfigUtil.getTableList("public", config);
+        assertArrayEquals(new String[]{"public.t1", "public.t2", "public.t3"}, 
result);
+    }
+
+    @Test
+    void tableListFromSingleTable() {
+        Map<String, String> config = new HashMap<>();
+        config.put(DataSourceConfigKeys.TABLE, "orders");
+        String[] result = ConfigUtil.getTableList("myschema", config);
+        assertArrayEquals(new String[]{"myschema.orders"}, result);
+    }
+
+    @Test
+    void tableListRejectsCommaInTableName() {
+        Map<String, String> config = new HashMap<>();
+        config.put(DataSourceConfigKeys.TABLE, "t1,t2");
+        assertThrows(IllegalArgumentException.class,
+                () -> ConfigUtil.getTableList("public", config));
+    }
+
+    @Test
+    void tableListEmptyWhenNeitherSet() {
+        Map<String, String> config = new HashMap<>();
+        String[] result = ConfigUtil.getTableList("public", config);
+        assertEquals(0, result.length);
+    }
+}
diff --git 
a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.out
 
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.out
new file mode 100644
index 00000000000..02b13ffe0b2
--- /dev/null
+++ 
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_tvf --
+C1     3
+C1     99
+D1     4
+D1     4
+
+-- !select_tvf_dml --
+C1     99
+D1     4
+
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy
new file mode 100644
index 00000000000..0536a5b3bf5
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cdc_stream_tvf_mysql", 
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "user_info_cdc_stream_tvf"
+    def mysqlDb = "test_cdc_db"
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar";
+        def offset = ""
+        def dmlOffset = ""
+
+        // --- Validation error tests (no JDBC connection needed) ---
+
+        test {
+            sql """select * from cdc_stream("type" = "mysql")"""
+            exception "jdbc_url is required"
+        }
+
+        test {
+            sql """select * from cdc_stream("jdbc_url" = 
"jdbc:mysql://localhost:3306")"""
+            exception "type is required"
+        }
+
+        test {
+            sql """select * from cdc_stream(
+                "type" = "mysql",
+                "jdbc_url" = "jdbc:mysql://localhost:3306")"""
+            exception "table is required"
+        }
+
+        test {
+            sql """select * from cdc_stream(
+                "type" = "mysql",
+                "jdbc_url" = "jdbc:mysql://localhost:3306",
+                "table" = "t1")"""
+            exception "offset is required"
+        }
+
+        // --- Data setup ---
+
+        connect("root", "123456", 
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+            sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+            sql """CREATE TABLE ${mysqlDb}.${table1} (
+                  `name` varchar(200) NOT NULL,
+                  `age` int DEFAULT NULL,
+                  PRIMARY KEY (`name`)
+                ) ENGINE=InnoDB"""
+            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('A1', 
1);"""
+            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('B1', 
2);"""
+
+            def result = sql_return_maparray "show master status"
+            def file = result[0]["File"]
+            def position = result[0]["Position"]
+            offset = """{"file":"${file}","pos":"${position}"}"""
+            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('C1', 
3);"""
+            sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('D1', 
4);"""
+
+            // capture offset before UPDATE/DELETE events
+            def result2 = sql_return_maparray "show master status"
+            dmlOffset = 
"""{"file":"${result2[0]["File"]}","pos":"${result2[0]["Position"]}"}"""
+            sql """UPDATE ${mysqlDb}.${table1} SET age = 99 WHERE name = 
'C1';"""
+            sql """DELETE FROM ${mysqlDb}.${table1} WHERE name = 'D1';"""
+        }
+
+        // --- INSERT-only: read C1 and D1 from binlog offset ---
+
+        log.info("offset: " + offset)
+        qt_select_tvf """select * from cdc_stream(
+                "type" = "mysql",
+                "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                "driver_url" = "${driver_url}",
+                "driver_class" = "com.mysql.cj.jdbc.Driver",
+                "user" = "root",
+                "password" = "123456",
+                "database" = "${mysqlDb}",
+                "table" = "${table1}",
+                "offset" = '${offset}'
+            ) order by name
+        """
+
+        // --- UPDATE and DELETE events: read from dmlOffset ---
+
+        log.info("dmlOffset: " + dmlOffset)
+        qt_select_tvf_dml """select * from cdc_stream(
+                "type" = "mysql",
+                "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                "driver_url" = "${driver_url}",
+                "driver_class" = "com.mysql.cj.jdbc.Driver",
+                "user" = "root",
+                "password" = "123456",
+                "database" = "${mysqlDb}",
+                "table" = "${table1}",
+                "offset" = '${dmlOffset}'
+            ) order by name
+        """
+
+        // --- offset=earliest: should return all rows (no exception) ---
+
+        def earliestResult = sql """select * from cdc_stream(
+                "type" = "mysql",
+                "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                "driver_url" = "${driver_url}",
+                "driver_class" = "com.mysql.cj.jdbc.Driver",
+                "user" = "root",
+                "password" = "123456",
+                "database" = "${mysqlDb}",
+                "table" = "${table1}",
+                "offset" = 'earliest'
+            ) limit 1
+        """
+        assertNotNull(earliestResult)
+
+        // --- offset=initial: unsupported ---
+
+        test {
+            sql """
+            select * from cdc_stream(
+                "type" = "mysql",
+                "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                "driver_url" = "${driver_url}",
+                "driver_class" = "com.mysql.cj.jdbc.Driver",
+                "user" = "root",
+                "password" = "123456",
+                "database" = "${mysqlDb}",
+                "table" = "${table1}",
+                "offset" = 'initial')
+            """
+            exception "Unsupported offset: initial"
+        }
+
+        test {
+            sql """select * from cdc_stream(
+                "type" = "mysql",
+                "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                "driver_url" = "${driver_url}",
+                "driver_class" = "com.mysql.cj.jdbc.Driver",
+                "user" = "root",
+                "password" = "123456",
+                "database" = "${mysqlDb}",
+                "table" = "${table1}",
+                "offset" = 'notjson')"""
+            exception "Unsupported offset: notjson"
+        }
+
+        // --- Non-existent table ---
+
+        test {
+            sql """select * from cdc_stream(
+                "type" = "mysql",
+                "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                "driver_url" = "${driver_url}",
+                "driver_class" = "com.mysql.cj.jdbc.Driver",
+                "user" = "root",
+                "password" = "123456",
+                "database" = "${mysqlDb}",
+                "table" = "no_such_table",
+                "offset" = '${offset}')
+            """
+            exception "Table does not exist: no_such_table"
+        }
+
+        // --- Wrong credentials ---
+
+        test {
+            sql """select * from cdc_stream(
+                "type" = "mysql",
+                "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                "driver_url" = "${driver_url}",
+                "driver_class" = "com.mysql.cj.jdbc.Driver",
+                "user" = "wronguser",
+                "password" = "wrongpass",
+                "database" = "${mysqlDb}",
+                "table" = "${table1}",
+                "offset" = '${offset}')
+            """
+            exception "can not connect to jdbc"
+        }
+
+        // --- Unreachable JDBC URL (closed port) ---
+
+        test {
+            sql """select * from cdc_stream(
+                "type" = "mysql",
+                "jdbc_url" = "jdbc:mysql://10.0.0.1:19999",
+                "driver_url" = "${driver_url}",
+                "driver_class" = "com.mysql.cj.jdbc.Driver",
+                "user" = "root",
+                "password" = "123456",
+                "database" = "${mysqlDb}",
+                "table" = "${table1}",
+                "offset" = '${offset}')
+            """
+            exception "can not connect to jdbc"
+        }
+    }
+}
diff --git 
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy
 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy
new file mode 100644
index 00000000000..f0cf2feb373
--- /dev/null
+++ 
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cdc_stream_tvf_postgres", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def currentDb = (sql "select database()")[0][0]
+    def table1 = "user_info_pg_normal1_tvf"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port");
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // Create the PostgreSQL test table and seed it with initial rows
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+                  "name" varchar(200),
+                  "age" int2,
+                  PRIMARY KEY ("name")
+                )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) 
VALUES ('A1', 1);"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age) 
VALUES ('B1', 2);"""
+        }
+
+        test {
+            sql """
+            select * from cdc_stream(
+                "type" = "postgres",
+                 "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                 "driver_url" = "${driver_url}",
+                 "driver_class" = "org.postgresql.Driver",
+                 "user" = "${pgUser}",
+                 "password" = "${pgPassword}",
+                 "database" = "${pgDB}",
+                 "schema" = "${pgSchema}",
+                "table" = "${table1}",
+                "offset" = 'initial')
+            """
+            exception "Unsupported offset: initial"
+        }
+
+        // Because PostgreSQL CDC consumption requires a replication slot to be
+        // created first, we only verify that the statement executes successfully.
+        def result = sql """
+            select * from cdc_stream(
+                "type" = "postgres",
+                "jdbc_url" = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                "driver_url" = "${driver_url}",
+                "driver_class" = "org.postgresql.Driver",
+                "user" = "${pgUser}",
+                "password" = "${pgPassword}",
+                "database" = "${pgDB}",
+                "schema" = "${pgSchema}",
+                "table" = "${table1}",
+                "offset" = 'latest')
+            """
+        log.info("result:", result)
+        assertNotNull(result)
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to