This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 7dd40bd31ca branch-4.1: [Feature](tvf) Support cdc stream tvf for
mysql and pg #60116 (#61840)
7dd40bd31ca is described below
commit 7dd40bd31caaf84b816572a5ae7ca05fb926afcc
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Mar 30 10:51:41 2026 +0800
branch-4.1: [Feature](tvf) Support cdc stream tvf for mysql and pg #60116
(#61840)
Cherry-picked from #60116
Co-authored-by: wudi <[email protected]>
---
be/src/io/fs/http_file_reader.cpp | 140 +++++++++++--
be/src/io/fs/http_file_reader.h | 6 +
.../apache/doris/job/cdc/DataSourceConfigKeys.java | 1 +
.../job/cdc/request/CompareOffsetRequest.java | 2 +-
.../doris/job/cdc/request/FetchRecordRequest.java | 1 +
.../job/cdc/request/FetchTableSplitsRequest.java | 2 +-
.../doris/job/cdc/request/JobBaseConfig.java | 2 +-
.../doris/catalog/BuiltinTableValuedFunctions.java | 4 +-
.../insert/streaming/StreamingMultiTblTask.java | 2 +-
.../job/offset/jdbc/JdbcSourceOffsetProvider.java | 6 +-
.../apache/doris/job/util/StreamingJobUtils.java | 7 +-
.../expressions/functions/table/CdcStream.java | 59 ++++++
.../visitor/TableValuedFunctionVisitor.java | 5 +
.../CdcStreamTableValuedFunction.java | 140 +++++++++++++
.../org/apache/doris/cdcclient/common/Env.java | 20 +-
.../cdcclient/config/GlobalExceptionHandler.java | 17 ++
.../cdcclient/controller/ClientController.java | 14 +-
.../doris/cdcclient/exception/CommonException.java | 36 ++--
.../doris/cdcclient/exception/StreamException.java | 36 ++--
.../cdcclient/service/PipelineCoordinator.java | 184 ++++++++++++++++-
.../doris/cdcclient/sink/DorisBatchStreamLoad.java | 10 +-
.../source/reader/JdbcIncrementalSourceReader.java | 2 +-
.../cdcclient/source/reader/SourceReader.java | 4 +-
.../source/reader/mysql/MySqlSourceReader.java | 23 ++-
.../reader/postgres/PostgresSourceReader.java | 22 +--
.../apache/doris/cdcclient/utils/ConfigUtil.java | 22 ++-
.../doris/cdcclient/utils/ConfigUtilTest.java | 85 ++++++++
.../cdc/tvf/test_cdc_stream_tvf_mysql.out | 11 ++
.../cdc/tvf/test_cdc_stream_tvf_mysql.groovy | 218 +++++++++++++++++++++
.../cdc/tvf/test_cdc_stream_tvf_postgres.groovy | 82 ++++++++
30 files changed, 1057 insertions(+), 106 deletions(-)
diff --git a/be/src/io/fs/http_file_reader.cpp
b/be/src/io/fs/http_file_reader.cpp
index da27f94c970..a2085c6fe6b 100644
--- a/be/src/io/fs/http_file_reader.cpp
+++ b/be/src/io/fs/http_file_reader.cpp
@@ -22,7 +22,11 @@
#include <algorithm>
+#include "common/config.h"
#include "common/logging.h"
+#include "gen_cpp/internal_service.pb.h"
+#include "runtime/cdc_client_mgr.h"
+#include "runtime/exec_env.h"
namespace doris::io {
@@ -84,6 +88,17 @@ HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo,
std::string url, in
}
}
+ // Parse chunk response configuration; chunk response implies no Range
support
+ auto chunk_iter = _extend_kv.find("http.enable.chunk.response");
+ if (chunk_iter != _extend_kv.end()) {
+ std::string value = chunk_iter->second;
+ std::transform(value.begin(), value.end(), value.begin(), ::tolower);
+ _enable_chunk_response = (value == "true" || value == "1");
+ if (_enable_chunk_response) {
+ _range_supported = false;
+ }
+ }
+
_read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE);
}
@@ -91,37 +106,82 @@ HttpFileReader::~HttpFileReader() {
static_cast<void>(close());
}
+Status HttpFileReader::setup_cdc_client() {
+ auto enable_cdc_iter = _extend_kv.find("enable_cdc_client");
+ if (enable_cdc_iter == _extend_kv.end() || enable_cdc_iter->second !=
"true") {
+ return Status::OK();
+ }
+
+ LOG(INFO) << "CDC client is enabled, starting CDC client for " << _url;
+ ExecEnv* env = ExecEnv::GetInstance();
+ if (env == nullptr || env->cdc_client_mgr() == nullptr) {
+ return Status::InternalError("ExecEnv or CdcClientMgr is not
initialized");
+ }
+
+ PRequestCdcClientResult result;
+ Status start_st = env->cdc_client_mgr()->start_cdc_client(&result);
+ if (!start_st.ok()) {
+ LOG(ERROR) << "Failed to start CDC client, status=" <<
start_st.to_string();
+ return start_st;
+ }
+
+ // Replace CDC_CLIENT_PORT placeholder with actual CDC client port
+ const std::string placeholder = "CDC_CLIENT_PORT";
+ size_t pos = _url.find(placeholder);
+ if (pos != std::string::npos) {
+ _url.replace(pos, placeholder.size(),
std::to_string(doris::config::cdc_client_port));
+ }
+ LOG(INFO) << "CDC client started successfully for " << _url;
+ return Status::OK();
+}
+
Status HttpFileReader::open(const FileReaderOptions& opts) {
+ // CDC client setup must run before the _initialized guard.
+ // See setup_cdc_client() for lifecycle details.
+ RETURN_IF_ERROR(setup_cdc_client());
+
+ // Skip metadata detection when file size was pre-supplied by the caller.
if (_initialized) {
return Status::OK();
}
- // Step 1: HEAD request to get file metadata
- RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true));
- _client->set_method(HttpMethod::HEAD);
- RETURN_IF_ERROR(_client->execute());
+ // Step 1: HEAD request to get file metadata (skip for chunk response)
+ if (_enable_chunk_response) {
+ // Chunk streaming response: size is unknown until the stream
completes.
+ // _range_supported is already false (set in constructor).
+ _size_known = false;
+ // Reset _file_size from the SIZE_MAX default to 0 so that any caller
of
+ // size() (e.g. NewJsonReader::_read_one_message) does not attempt to
+ // allocate SIZE_MAX bytes before the download completes.
+ _file_size = 0;
+ LOG(INFO) << "Chunk response mode enabled, skipping HEAD request for "
<< _url;
+ } else {
+ // Normal mode: execute HEAD request to get file metadata
+ RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true));
+ _client->set_method(HttpMethod::HEAD);
+ RETURN_IF_ERROR(_client->execute());
- uint64_t content_length = 0;
- RETURN_IF_ERROR(_client->get_content_length(&content_length));
+ uint64_t content_length = 0;
+ RETURN_IF_ERROR(_client->get_content_length(&content_length));
- _file_size = content_length;
- _size_known = true;
+ _file_size = content_length;
+ _size_known = true;
+ }
- // Step 2: Check if Range request is disabled by configuration
- if (!_enable_range_request) {
- // User explicitly disabled Range requests, use non-Range mode directly
+ // Step 2: Check if Range request is disabled by configuration.
+ // Chunk response mode always has _range_supported=false (set in
constructor), so only
+ // the non-chunk non-Range path needs the file size guard.
+ if (_enable_chunk_response) {
+ // Nothing to do: _range_supported already false, size check not
applicable
+ } else if (!_enable_range_request) {
_range_supported = false;
- LOG(INFO) << "Range requests disabled by configuration for " << _url
- << ", using non-Range mode. File size: " << _file_size << "
bytes";
-
- // Check if file size exceeds limit for non-Range mode
+ LOG(INFO) << "Range requests disabled by configuration for " << _url;
if (_file_size > _max_request_size_bytes) {
return Status::InternalError(
- "Non-Range mode: file size ({} bytes) exceeds maximum
allowed size ({} bytes, "
- "configured by http.max.request.size.bytes). URL: {}",
+ "Non-Range mode: file size ({} bytes) exceeds maximum
allowed size ({} "
+ "bytes, configured by http.max.request.size.bytes). URL:
{}",
_file_size, _max_request_size_bytes, _url);
}
-
LOG(INFO) << "Non-Range mode validated for " << _url << ", file size:
" << _file_size
<< " bytes, max allowed: " << _max_request_size_bytes << "
bytes";
} else {
@@ -224,9 +284,29 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes_r
VLOG(2) << "Issuing HTTP GET request: offset=" << offset << " req_len=" <<
req_len
<< " with_range=" << _range_supported;
- // Prepare and initialize the HTTP client for GET request
+ // Prepare and initialize the HTTP client for request
RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/false));
- _client->set_method(HttpMethod::GET);
+
+ // Determine HTTP method from configuration (default: GET)
+ HttpMethod method = HttpMethod::GET;
+ auto method_iter = _extend_kv.find("http.method");
+ if (method_iter != _extend_kv.end()) {
+ method = to_http_method(method_iter->second.c_str());
+ if (method == HttpMethod::UNKNOWN) {
+ LOG(WARNING) << "Invalid http.method value: " <<
method_iter->second
+ << ", falling back to GET";
+ method = HttpMethod::GET;
+ }
+ }
+ _client->set_method(method);
+
+ // Set payload if configured (supports POST, PUT, DELETE, etc.)
+ auto payload_iter = _extend_kv.find("http.payload");
+ if (payload_iter != _extend_kv.end() && !payload_iter->second.empty()) {
+ _client->set_payload(payload_iter->second);
+ _client->set_content_type("application/json");
+ VLOG(2) << "HTTP request with payload, size=" <<
payload_iter->second.size();
+ }
_client->set_header("Expect", "");
_client->set_header("Connection", "close");
@@ -270,6 +350,21 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes_r
long http_status = _client->get_http_status();
VLOG(2) << "HTTP response: status=" << http_status << " received_bytes="
<< buf.size();
+ // Check for HTTP error status codes (4xx, 5xx)
+ if (http_status >= 400) {
+ std::string error_body;
+ if (buf.empty()) {
+ error_body = "(empty response body)";
+ } else {
+ // Limit error message to 1024 bytes to avoid excessive logging
+ size_t max_len = std::min(buf.size(), static_cast<size_t>(1024));
+ error_body = buf.substr(0, max_len);
+ }
+
+ return Status::InternalError("HTTP request failed with status {}:
{}.", http_status,
+ error_body);
+ }
+
if (buf.empty()) {
*bytes_read = buffer_offset;
return Status::OK();
@@ -295,6 +390,11 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes_r
// Cache the complete file content for subsequent reads
_full_file_cache = std::move(buf);
_full_file_cached = true;
+ // Now that the full content is in hand, update _file_size to the
actual
+ // byte count. This replaces the 0 placeholder set in open() for chunk
+ // response mode, so subsequent calls to size() return a correct value.
+ _file_size = _full_file_cache.size();
+ _size_known = true;
VLOG(2) << "Cached full file: " << _full_file_cache.size() << " bytes";
diff --git a/be/src/io/fs/http_file_reader.h b/be/src/io/fs/http_file_reader.h
index 9b7f52e5270..91a360a000d 100644
--- a/be/src/io/fs/http_file_reader.h
+++ b/be/src/io/fs/http_file_reader.h
@@ -62,6 +62,10 @@ private:
// Returns OK on success with _range_supported set appropriately
Status detect_range_support();
+ // Start the CDC client process
+ // Called at the start of open() when enable_cdc_client=true.
+ Status setup_cdc_client();
+
std::unique_ptr<char[]> _read_buffer;
static constexpr size_t READ_BUFFER_SIZE = 1 << 20; // 1MB
// Default maximum file size for servers that don't support Range requests
@@ -89,6 +93,8 @@ private:
// Full file cache for non-Range mode to avoid repeated downloads
std::string _full_file_cache; // Cache complete file content
bool _full_file_cached = false; // Whether full file has been cached
+
+ bool _enable_chunk_response = false; // Whether server returns chunk
streaming response
};
} // namespace doris::io
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index 47ee5f21d27..9d2d5034eab 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -19,6 +19,7 @@ package org.apache.doris.job.cdc;
public class DataSourceConfigKeys {
public static final String JDBC_URL = "jdbc_url";
+ public static final String TYPE = "type";
public static final String DRIVER_URL = "driver_url";
public static final String DRIVER_CLASS = "driver_class";
public static final String USER = "user";
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java
index 1a57cbdefe3..814605b7762 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java
@@ -38,7 +38,7 @@ public class CompareOffsetRequest extends JobBaseConfig {
String frontendAddress,
Map<String, String> offsetFirst,
Map<String, String> offsetSecond) {
- super(jobId, sourceType, sourceProperties, frontendAddress);
+ super(jobId.toString(), sourceType, sourceProperties, frontendAddress);
this.offsetFirst = offsetFirst;
this.offsetSecond = offsetSecond;
}
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
index 7ed28d618fd..65ffd36966c 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
@@ -23,4 +23,5 @@ import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper = true)
public class FetchRecordRequest extends JobBaseRecordRequest {
+ private String taskId;
}
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java
index 5c3cf62ab4c..a54b2630734 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java
@@ -35,7 +35,7 @@ public class FetchTableSplitsRequest extends JobBaseConfig {
public FetchTableSplitsRequest(Long jobId, String name,
Map<String, String> sourceProperties, String frontendAddress,
String snapshotTable) {
- super(jobId, name, sourceProperties, frontendAddress);
+ super(jobId.toString(), name, sourceProperties, frontendAddress);
this.snapshotTable = snapshotTable;
}
}
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
index c7b60026d88..49c95d98ff5 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
@@ -27,7 +27,7 @@ import java.util.Map;
@AllArgsConstructor
@NoArgsConstructor
public class JobBaseConfig {
- private Long jobId;
+ private String jobId;
private String dataSource;
private Map<String, String> config;
private String frontendAddress;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
index bafd2eeb918..a55ebedce9f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java
@@ -19,6 +19,7 @@ package org.apache.doris.catalog;
import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
+import org.apache.doris.nereids.trees.expressions.functions.table.CdcStream;
import org.apache.doris.nereids.trees.expressions.functions.table.File;
import org.apache.doris.nereids.trees.expressions.functions.table.Frontends;
import
org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks;
@@ -75,7 +76,8 @@ public class BuiltinTableValuedFunctions implements
FunctionHelper {
tableValued(ParquetMeta.class, "parquet_meta"),
tableValued(ParquetFileMetadata.class, "parquet_file_metadata"),
tableValued(ParquetKvMetadata.class, "parquet_kv_metadata"),
- tableValued(ParquetBloomProbe.class, "parquet_bloom_probe")
+ tableValued(ParquetBloomProbe.class, "parquet_bloom_probe"),
+ tableValued(CdcStream.class, "cdc_stream")
);
public static final BuiltinTableValuedFunctions INSTANCE = new
BuiltinTableValuedFunctions();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 32526f9c513..2c9fbf6fe17 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -181,7 +181,7 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
private WriteRecordRequest buildRequestParams() throws JobException {
JdbcOffset offset = (JdbcOffset) runningOffset;
WriteRecordRequest request = new WriteRecordRequest();
- request.setJobId(getJobId());
+ request.setJobId(String.valueOf(getJobId()));
request.setConfig(sourceProperties);
request.setDataSource(dataSourceType.name());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index b77dd8d8bd6..964659e79f4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -198,7 +198,7 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
public void fetchRemoteMeta(Map<String, String> properties) throws
Exception {
Backend backend = StreamingJobUtils.selectBackend();
JobBaseConfig requestParams =
- new JobBaseConfig(getJobId(), sourceType.name(),
sourceProperties, getFrontendAddress());
+ new JobBaseConfig(getJobId().toString(), sourceType.name(),
sourceProperties, getFrontendAddress());
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/fetchEndOffset")
.setParams(new Gson().toJson(requestParams)).build();
@@ -570,7 +570,7 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
private void initSourceReader() throws JobException {
Backend backend = StreamingJobUtils.selectBackend();
JobBaseConfig requestParams =
- new JobBaseConfig(getJobId(), sourceType.name(),
sourceProperties, getFrontendAddress());
+ new JobBaseConfig(getJobId().toString(), sourceType.name(),
sourceProperties, getFrontendAddress());
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/initReader")
.setParams(new Gson().toJson(requestParams)).build();
@@ -618,7 +618,7 @@ public class JdbcSourceOffsetProvider implements
SourceOffsetProvider {
StreamingJobUtils.deleteJobMeta(jobId);
Backend backend = StreamingJobUtils.selectBackend();
JobBaseConfig requestParams =
- new JobBaseConfig(getJobId(), sourceType.name(),
sourceProperties, getFrontendAddress());
+ new JobBaseConfig(getJobId().toString(), sourceType.name(),
sourceProperties, getFrontendAddress());
InternalService.PRequestCdcClientRequest request =
InternalService.PRequestCdcClientRequest.newBuilder()
.setApi("/api/close")
.setParams(new Gson().toJson(requestParams)).build();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index 9eec0061219..46a47036ccd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -215,7 +215,7 @@ public class StreamingJobUtils {
return ctx;
}
- private static JdbcClient getJdbcClient(DataSourceType sourceType,
Map<String, String> properties) {
+ public static JdbcClient getJdbcClient(DataSourceType sourceType,
Map<String, String> properties) {
JdbcClientConfig config = new JdbcClientConfig();
config.setCatalog(sourceType.name());
config.setUser(properties.get(DataSourceConfigKeys.USER));
@@ -448,8 +448,7 @@ public class StreamingJobUtils {
* The remoteDB implementation differs for each data source;
* refer to the hierarchical mapping in the JDBC catalog.
*/
- private static String getRemoteDbName(DataSourceType sourceType,
Map<String, String> properties)
- throws JobException {
+ public static String getRemoteDbName(DataSourceType sourceType,
Map<String, String> properties) {
String remoteDb = null;
switch (sourceType) {
case MYSQL:
@@ -461,7 +460,7 @@ public class StreamingJobUtils {
Preconditions.checkArgument(StringUtils.isNotEmpty(remoteDb),
"schema is required");
break;
default:
- throw new JobException("Unsupported source type " +
sourceType);
+ throw new RuntimeException("Unsupported source type " +
sourceType);
}
return remoteDb;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java
new file mode 100644
index 00000000000..3a6cdaf61da
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.table;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.coercion.AnyDataType;
+import org.apache.doris.tablefunction.CdcStreamTableValuedFunction;
+import org.apache.doris.tablefunction.TableValuedFunctionIf;
+
+import java.util.Map;
+
+/**
+ * CdcStream TVF.
+ */
+public class CdcStream extends TableValuedFunction {
+
+ public CdcStream(Properties tvfProperties) {
+ super("cdc_stream", tvfProperties);
+ }
+
+ @Override
+ public FunctionSignature customSignature() {
+ return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX,
getArgumentsTypes());
+ }
+
+ @Override
+ protected TableValuedFunctionIf toCatalogFunction() {
+ try {
+ Map<String, String> arguments = getTVFProperties().getMap();
+ return new CdcStreamTableValuedFunction(arguments);
+ } catch (Throwable t) {
+ throw new AnalysisException("Can not build
CdcStreamTableValuedFunction by "
+ + this + ": " + t.getMessage(), t);
+ }
+ }
+
+ @Override
+ public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+ return visitor.visitCdcStream(this, context);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
index 91fe9c5a168..31b3162e647 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.expressions.visitor;
import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
+import org.apache.doris.nereids.trees.expressions.functions.table.CdcStream;
import org.apache.doris.nereids.trees.expressions.functions.table.File;
import org.apache.doris.nereids.trees.expressions.functions.table.Frontends;
import
org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks;
@@ -80,6 +81,10 @@ public interface TableValuedFunctionVisitor<R, C> {
return visitTableValuedFunction(http, context);
}
+ default R visitCdcStream(CdcStream cdcStream, C context) {
+ return visitTableValuedFunction(cdcStream, context);
+ }
+
default R visitFrontendsDisks(FrontendsDisks frontendsDisks, C context) {
return visitTableValuedFunction(frontendsDisks, context);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
new file mode 100644
index 00000000000..621d2fbe6b6
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.FetchRecordRequest;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcStreamTableValuedFunction extends
ExternalFileTableValuedFunction {
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+ private static final String URI =
"http://127.0.0.1:CDC_CLIENT_PORT/api/fetchRecordStream";
+
+ public CdcStreamTableValuedFunction(Map<String, String> properties) throws
AnalysisException {
+ validate(properties);
+ processProps(properties);
+ }
+
+ private void processProps(Map<String, String> properties) throws
AnalysisException {
+ Map<String, String> copyProps = new HashMap<>(properties);
+ copyProps.put("format", "json");
+ super.parseCommonProperties(copyProps);
+ this.processedParams.put("enable_cdc_client", "true");
+ this.processedParams.put("uri", URI);
+ this.processedParams.put("http.enable.range.request", "false");
+ this.processedParams.put("http.enable.chunk.response", "true");
+ this.processedParams.put("http.method", "POST");
+
+ String payload = generateParams(properties);
+ this.processedParams.put("http.payload", payload);
+ this.backendConnectProperties.putAll(processedParams);
+ generateFileStatus();
+ }
+
+ private String generateParams(Map<String, String> properties) throws
AnalysisException {
+ FetchRecordRequest recordRequest = new FetchRecordRequest();
+ recordRequest.setJobId(UUID.randomUUID().toString().replace("-", ""));
+ recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE));
+ recordRequest.setConfig(properties);
+ try {
+ return objectMapper.writeValueAsString(recordRequest);
+ } catch (IOException e) {
+ LOG.warn("Failed to serialize fetch record request", e);
+ throw new AnalysisException("Failed to serialize fetch record
request: " + e.getMessage(), e);
+ }
+ }
+
+ private void validate(Map<String, String> properties) throws
AnalysisException {
+ if (!properties.containsKey(DataSourceConfigKeys.JDBC_URL)) {
+ throw new AnalysisException("jdbc_url is required");
+ }
+ if (!properties.containsKey(DataSourceConfigKeys.TYPE)) {
+ throw new AnalysisException("type is required");
+ }
+ if (!properties.containsKey(DataSourceConfigKeys.TABLE)) {
+ throw new AnalysisException("table is required");
+ }
+ if (!properties.containsKey(DataSourceConfigKeys.OFFSET)) {
+ throw new AnalysisException("offset is required");
+ }
+ }
+
+ private void generateFileStatus() {
+ this.fileStatuses.clear();
+ this.fileStatuses.add(new TBrokerFileStatus(URI, false,
Integer.MAX_VALUE, false));
+ }
+
+ @Override
+ public List<Column> getTableColumns() throws AnalysisException {
+ DataSourceType dataSourceType =
+
DataSourceType.valueOf(processedParams.get(DataSourceConfigKeys.TYPE).toUpperCase());
+ JdbcClient jdbcClient =
StreamingJobUtils.getJdbcClient(dataSourceType, processedParams);
+ try {
+ String database =
StreamingJobUtils.getRemoteDbName(dataSourceType, processedParams);
+ String table = processedParams.get(DataSourceConfigKeys.TABLE);
+ if (!jdbcClient.isTableExist(database, table)) {
+ throw new AnalysisException("Table does not exist: " + table);
+ }
+ return jdbcClient.getColumnsFromJdbc(database, table);
+ } finally {
+ jdbcClient.closeClient();
+ }
+ }
+
+ @Override
+ public TFileType getTFileType() {
+ return TFileType.FILE_HTTP;
+ }
+
+ @Override
+ public String getFilePath() {
+ return URI;
+ }
+
+ @Override
+ public BrokerDesc getBrokerDesc() {
+ return new BrokerDesc("CdcStreamTvfBroker", StorageType.HTTP,
processedParams);
+ }
+
+ @Override
+ public String getTableName() {
+ return "CdcStreamTableValuedFunction";
+ }
+
+ @Override
+ public List<String> getPathPartitionKeys() {
+ return new ArrayList<>();
+ }
+}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
index 332a4766002..79d5c65cf57 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java
@@ -38,8 +38,8 @@ import org.slf4j.LoggerFactory;
public class Env {
private static final Logger LOG = LoggerFactory.getLogger(Env.class);
private static volatile Env INSTANCE;
- private final Map<Long, JobContext> jobContexts;
- private final Map<Long, Lock> jobLocks;
+ private final Map<String, JobContext> jobContexts;
+ private final Map<String, Lock> jobLocks;
@Setter private int backendHttpPort;
@Setter @Getter private String clusterToken;
@Setter @Getter private volatile String feMasterAddress;
@@ -85,9 +85,9 @@ public class Env {
}
private SourceReader getOrCreateReader(
- Long jobId, DataSource dataSource, Map<String, String> config) {
- Objects.requireNonNull(jobId, "jobId");
- Objects.requireNonNull(dataSource, "dataSource");
+ String jobId, DataSource dataSource, Map<String, String> config) {
+ Objects.requireNonNull(jobId, "jobId is null");
+ Objects.requireNonNull(dataSource, "dataSource is null");
JobContext context = jobContexts.get(jobId);
if (context != null) {
return context.getReader(dataSource);
@@ -112,7 +112,7 @@ public class Env {
}
}
- public void close(Long jobId) {
+ public void close(String jobId) {
Lock lock = jobLocks.get(jobId);
if (lock != null) {
lock.lock();
@@ -129,12 +129,12 @@ public class Env {
}
private static final class JobContext {
- private final long jobId;
+ private final String jobId;
private volatile SourceReader reader;
private volatile Map<String, String> config;
private volatile DataSource dataSource;
- private JobContext(long jobId, DataSource dataSource, Map<String,
String> config) {
+ private JobContext(String jobId, DataSource dataSource, Map<String,
String> config) {
this.jobId = jobId;
this.dataSource = dataSource;
this.config = config;
@@ -151,10 +151,10 @@ public class Env {
if (this.dataSource != source) {
throw new IllegalStateException(
String.format(
- "Job %d already bound to datasource %s, cannot
switch to %s",
+ "Job %s already bound to datasource %s, cannot
switch to %s",
jobId, this.dataSource, source));
}
- Preconditions.checkState(reader != null, "Job %d reader not
initialized yet", jobId);
+ Preconditions.checkState(reader != null, "Job %s reader not
initialized yet", jobId);
return reader;
}
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java
index 8b4883b6203..4c719f0f9bb 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/config/GlobalExceptionHandler.java
@@ -17,10 +17,14 @@
package org.apache.doris.cdcclient.config;
+import org.apache.doris.cdcclient.exception.CommonException;
+import org.apache.doris.cdcclient.exception.StreamException;
import org.apache.doris.cdcclient.model.rest.RestResponse;
import jakarta.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
@@ -35,4 +39,17 @@ public class GlobalExceptionHandler {
log.error("Unexpected exception", e);
return RestResponse.internalError(e.getMessage());
}
+
+ @ExceptionHandler(StreamException.class)
+ public Object streamExceptionHandler(StreamException e) {
+ // Directly throwing an exception allows curl to detect anomalies in
the streaming response.
+ log.error("Exception in streaming response, re-throwing to client", e);
+ throw e;
+ }
+
+ @ExceptionHandler(CommonException.class)
+ public Object commonExceptionHandler(CommonException e) {
+ log.error("Unexpected common exception", e);
+ return
ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(e.getMessage());
+ }
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
index d0c2c17e457..0b677208404 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/controller/ClientController.java
@@ -39,6 +39,7 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
+import
org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
@RestController
public class ClientController {
@@ -71,7 +72,7 @@ public class ClientController {
}
}
- /** Fetch records from source reader */
+ /** Fetch records from the source reader (debug/diagnostic endpoint). */
@RequestMapping(path = "/api/fetchRecords", method = RequestMethod.POST)
public Object fetchRecords(@RequestBody FetchRecordRequest recordReq) {
try {
@@ -82,6 +83,12 @@ public class ClientController {
}
}
+ @RequestMapping(path = "/api/fetchRecordStream", method =
RequestMethod.POST)
+ public StreamingResponseBody fetchRecordStream(@RequestBody
FetchRecordRequest recordReq)
+ throws Exception {
+ return pipelineCoordinator.fetchRecordStream(recordReq);
+ }
+
/** Fetch records from source reader and Write records to backend */
@RequestMapping(path = "/api/writeRecords", method = RequestMethod.POST)
public Object writeRecord(@RequestBody WriteRecordRequest recordReq) {
@@ -126,4 +133,9 @@ public class ClientController {
public Object getFailReason(@PathVariable("taskId") String taskId) {
return
RestResponse.success(pipelineCoordinator.getTaskFailReason(taskId));
}
+
+ @RequestMapping(path = "/api/getTaskOffset/{taskId}", method =
RequestMethod.POST)
+ public Object getTaskIdOffset(@PathVariable String taskId) {
+ return
RestResponse.success(pipelineCoordinator.getOffsetWithTaskId(taskId));
+ }
}
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/CommonException.java
similarity index 55%
copy from
fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
copy to
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/CommonException.java
index c7b60026d88..b27afe56dfe 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/CommonException.java
@@ -15,20 +15,30 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.job.cdc.request;
+package org.apache.doris.cdcclient.exception;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+public class CommonException extends RuntimeException {
+ public CommonException() {
+ super();
+ }
-import java.util.Map;
+ public CommonException(String message) {
+ super(message);
+ }
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-public class JobBaseConfig {
- private Long jobId;
- private String dataSource;
- private Map<String, String> config;
- private String frontendAddress;
+ public CommonException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public CommonException(Throwable cause) {
+ super(cause);
+ }
+
+ protected CommonException(
+ String message,
+ Throwable cause,
+ boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
}
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/StreamException.java
similarity index 55%
copy from
fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
copy to
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/StreamException.java
index c7b60026d88..73f9b34fbcd 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/exception/StreamException.java
@@ -15,20 +15,30 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.job.cdc.request;
+package org.apache.doris.cdcclient.exception;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+public class StreamException extends RuntimeException {
+ public StreamException() {
+ super();
+ }
-import java.util.Map;
+ public StreamException(String message) {
+ super(message);
+ }
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-public class JobBaseConfig {
- private Long jobId;
- private String dataSource;
- private Map<String, String> config;
- private String frontendAddress;
+ public StreamException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public StreamException(Throwable cause) {
+ super(cause);
+ }
+
+ protected StreamException(
+ String message,
+ Throwable cause,
+ boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 414a1d23797..850c3038b91 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -19,6 +19,8 @@ package org.apache.doris.cdcclient.service;
import org.apache.doris.cdcclient.common.Constants;
import org.apache.doris.cdcclient.common.Env;
+import org.apache.doris.cdcclient.exception.CommonException;
+import org.apache.doris.cdcclient.exception.StreamException;
import org.apache.doris.cdcclient.model.response.RecordWithMeta;
import org.apache.doris.cdcclient.sink.DorisBatchStreamLoad;
import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
@@ -26,17 +28,22 @@ import
org.apache.doris.cdcclient.source.reader.SourceReader;
import org.apache.doris.cdcclient.source.reader.SplitReadResult;
import org.apache.doris.cdcclient.utils.ConfigUtil;
import org.apache.doris.cdcclient.utils.SchemaChangeManager;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.cdc.request.FetchRecordRequest;
import org.apache.doris.job.cdc.request.WriteRecordRequest;
import org.apache.doris.job.cdc.split.BinlogSplit;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
+import java.io.BufferedOutputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -51,12 +58,14 @@ import java.util.concurrent.TimeUnit;
import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.SCHEMA_HEARTBEAT_EVENT_KEY_NAME;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import io.debezium.data.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
+import
org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
/** Pipeline coordinator. */
@Component
@@ -64,13 +73,16 @@ public class PipelineCoordinator {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineCoordinator.class);
private static final String SPLIT_ID = "splitId";
// jobId
- private final Map<Long, DorisBatchStreamLoad> batchStreamLoadMap = new
ConcurrentHashMap<>();
+ private final Map<String, DorisBatchStreamLoad> batchStreamLoadMap = new
ConcurrentHashMap<>();
+ // taskId, offset
+ private final Map<String, Map<String, String>> taskOffsetCache = new
ConcurrentHashMap<>();
// taskId -> writeFailReason
private final Map<String, String> taskErrorMaps = new
ConcurrentHashMap<>();
private final ThreadPoolExecutor executor;
private static final int MAX_CONCURRENT_TASKS = 10;
private static final int QUEUE_CAPACITY = 128;
- private static ObjectMapper objectMapper = new ObjectMapper();
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+ private final byte[] LINE_DELIMITER =
"\n".getBytes(StandardCharsets.UTF_8);
public PipelineCoordinator() {
this.executor =
@@ -90,6 +102,165 @@ public class PipelineCoordinator {
new ThreadPoolExecutor.AbortPolicy());
}
+ /** Returns streaming record data to be consumed by the BE-side http_file_reader. */
+ public StreamingResponseBody fetchRecordStream(FetchRecordRequest
fetchReq) throws Exception {
+ SourceReader sourceReader;
+ SplitReadResult readResult;
+ try {
+ if (fetchReq.getTaskId() == null && fetchReq.getMeta() == null) {
+ LOG.info(
+ "Generate initial meta for fetch record request,
jobId={}, taskId={}",
+ fetchReq.getJobId(),
+ fetchReq.getTaskId());
+ // means the request did not originate from the job, only tvf
+ Map<String, Object> meta = generateMeta(fetchReq.getConfig());
+ fetchReq.setMeta(meta);
+ }
+
+ sourceReader = Env.getCurrentEnv().getReader(fetchReq);
+ readResult = sourceReader.prepareAndSubmitSplit(fetchReq);
+ } catch (Exception ex) {
+ throw new CommonException(ex);
+ }
+
+ return outputStream -> {
+ try {
+ buildStreamRecords(sourceReader, fetchReq, readResult,
outputStream);
+ } catch (Exception ex) {
+ LOG.error(
+ "Failed fetch record, jobId={}, taskId={}",
+ fetchReq.getJobId(),
+ fetchReq.getTaskId(),
+ ex);
+ throw new StreamException(ex);
+ }
+ };
+ }
+
+ private void buildStreamRecords(
+ SourceReader sourceReader,
+ FetchRecordRequest fetchRecord,
+ SplitReadResult readResult,
+ OutputStream rawOutputStream)
+ throws Exception {
+ SourceSplit split = readResult.getSplit();
+ Map<String, String> lastMeta = null;
+ int rowCount = 0;
+ BufferedOutputStream bos = new BufferedOutputStream(rawOutputStream);
+ try {
+ // Poll records using the existing mechanism
+ boolean shouldStop = false;
+ long startTime = System.currentTimeMillis();
+ while (!shouldStop) {
+ Iterator<SourceRecord> recordIterator =
sourceReader.pollRecords();
+ if (!recordIterator.hasNext()) {
+ Thread.sleep(100);
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ if (elapsedTime > Constants.POLL_SPLIT_RECORDS_TIMEOUTS) {
+ break;
+ }
+ continue;
+ }
+ while (recordIterator.hasNext()) {
+ SourceRecord element = recordIterator.next();
+ if (isHeartbeatEvent(element)) {
+ shouldStop = true;
+ break;
+ }
+ DeserializeResult result =
+ sourceReader.deserialize(fetchRecord.getConfig(),
element);
+ if (!CollectionUtils.isEmpty(result.getRecords())) {
+ for (String record : result.getRecords()) {
+ bos.write(record.getBytes(StandardCharsets.UTF_8));
+ bos.write(LINE_DELIMITER);
+ }
+ rowCount += result.getRecords().size();
+ }
+ }
+ }
+ // force flush buffer
+ bos.flush();
+ } finally {
+ // Commit offset and cleanup
+ sourceReader.commitSourceOffset(fetchRecord.getJobId(),
readResult.getSplit());
+ sourceReader.finishSplitRecords();
+ }
+
+ LOG.info(
+ "Fetch records completed, jobId={}, taskId={}, splitId={},
rowCount={}",
+ fetchRecord.getJobId(),
+ fetchRecord.getTaskId(),
+ split.splitId(),
+ rowCount);
+
+ if (readResult.getSplitState() != null) {
+ if (sourceReader.isSnapshotSplit(split)) {
+ lastMeta =
sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
+ lastMeta.put(SPLIT_ID, split.splitId());
+ } else if (sourceReader.isBinlogSplit(split)) {
+ lastMeta =
sourceReader.extractBinlogStateOffset(readResult.getSplitState());
+ lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
+ } else {
+ throw new RuntimeException(
+ "unknown split type: " +
split.getClass().getSimpleName());
+ }
+ } else {
+ throw new RuntimeException("split state is null");
+ }
+
+ if (StringUtils.isNotEmpty(fetchRecord.getTaskId())) {
+ taskOffsetCache.put(fetchRecord.getTaskId(), lastMeta);
+ }
+
+ // Convention: standalone TVF uses a UUID jobId; job-driven TVF will
use a numeric Long
+ // jobId (set via rewriteTvfParams). When the job-driven path is
implemented,
+ // rewriteTvfParams must inject the job's Long jobId into the TVF
properties
+ // so that generateParams() can read it, keeping isLong() correct.
+ // TODO: replace isLong() with an explicit field in FetchRecordRequest
+ // once the job-driven TVF path is fully implemented.
+ if (!isLong(fetchRecord.getJobId())) {
+ // TVF requires closing the window after each execution,
+ // while PG requires dropping the slot.
+ sourceReader.close(fetchRecord);
+ // Clean up the job context so it does not accumulate in
Env.jobContexts.
+ // Each TVF call uses a fresh UUID job ID, so without this the map
grows unboundedly.
+ Env.getCurrentEnv().close(fetchRecord.getJobId());
+ }
+ }
+
+ private boolean isLong(String s) {
+ if (s == null || s.isEmpty()) return false;
+ try {
+ Long.parseLong(s);
+ return true;
+ } catch (NumberFormatException e) {
+ return false;
+ }
+ }
+
+ /**
+ * Generate split meta from request.offset. This only applies to TVF, so
initial is not
+ * supported because initial requires a job to obtain split information.
+ */
+ private Map<String, Object> generateMeta(Map<String, String> cdcConfig)
+ throws JsonProcessingException {
+ Map<String, Object> meta = new HashMap<>();
+ String offset = cdcConfig.get(DataSourceConfigKeys.OFFSET);
+ if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(offset)
+ ||
DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(offset)) {
+ meta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
+ } else if (ConfigUtil.isJson(offset)) {
+ Map<String, String> startOffset =
+ objectMapper.readValue(offset, new TypeReference<>() {});
+ meta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
+ meta.put("startingOffset", startOffset);
+ } else {
+ throw new RuntimeException("Unsupported offset: " + offset);
+ }
+ return meta;
+ }
+
+ /** Pulls data via the REST API; intended for testing and debugging only. */
public RecordWithMeta fetchRecords(FetchRecordRequest fetchRecordRequest)
throws Exception {
SourceReader sourceReader =
Env.getCurrentEnv().getReader(fetchRecordRequest);
SplitReadResult readResult =
sourceReader.prepareAndSubmitSplit(fetchRecordRequest);
@@ -482,7 +653,7 @@ public class PipelineCoordinator {
return batchStreamLoad;
}
- public void closeJobStreamLoad(Long jobId) {
+ public void closeJobStreamLoad(String jobId) {
DorisBatchStreamLoad batchStreamLoad =
batchStreamLoadMap.remove(jobId);
if (batchStreamLoad != null) {
LOG.info("Close DorisBatchStreamLoad for jobId={}", jobId);
@@ -509,7 +680,7 @@ public class PipelineCoordinator {
* @param readResult the read result containing split information
*/
private void cleanupReaderResources(
- SourceReader sourceReader, Long jobId, SplitReadResult readResult)
{
+ SourceReader sourceReader, String jobId, SplitReadResult
readResult) {
try {
// The LSN in the commit is the current offset, which is the
offset from the last
// successful write.
@@ -579,4 +750,9 @@ public class PipelineCoordinator {
}
return commitOffsets;
}
+
+ public Map<String, String> getOffsetWithTaskId(String taskId) {
+ Map<String, String> taskOffset = taskOffsetCache.remove(taskId);
+ return taskOffset == null ? new HashMap<>() : taskOffset;
+ }
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index 72e84c4413c..4583b049b81 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -93,13 +93,13 @@ public class DorisBatchStreamLoad implements Serializable {
private final Map<String, ReadWriteLock> bufferMapLock = new
ConcurrentHashMap<>();
@Setter @Getter private String currentTaskId;
private String targetDb;
- private long jobId;
+ private String jobId;
@Setter private String token;
// stream load headers
@Setter private Map<String, String> loadProps = new HashMap<>();
@Getter private LoadStatistic loadStatistic;
- public DorisBatchStreamLoad(long jobId, String targetDb) {
+ public DorisBatchStreamLoad(String jobId, String targetDb) {
this.hostPort = Env.getCurrentEnv().getBackendHostPort();
this.loadStatistic = new LoadStatistic();
this.flushQueue = new LinkedBlockingDeque<>(1);
@@ -329,9 +329,9 @@ public class DorisBatchStreamLoad implements Serializable {
class LoadAsyncExecutor implements Runnable {
private int flushQueueSize;
- private long jobId;
+ private String jobId;
- public LoadAsyncExecutor(int flushQueueSize, long jobId) {
+ public LoadAsyncExecutor(int flushQueueSize, String jobId) {
this.flushQueueSize = flushQueueSize;
this.jobId = jobId;
}
@@ -511,7 +511,7 @@ public class DorisBatchStreamLoad implements Serializable {
CommitOffsetRequest commitRequest =
CommitOffsetRequest.builder()
.offset(OBJECT_MAPPER.writeValueAsString(meta))
- .jobId(jobId)
+ .jobId(Long.parseLong(jobId))
.taskId(Long.parseLong(taskId))
.scannedRows(scannedRows)
.filteredRows(loadStatistic.getFilteredRows())
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index 77052577341..e7245334ffb 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -112,7 +112,7 @@ public abstract class JdbcIncrementalSourceReader extends
AbstractCdcSourceReade
}
@Override
- public void initialize(long jobId, DataSource dataSource, Map<String,
String> config) {
+ public void initialize(String jobId, DataSource dataSource, Map<String,
String> config) {
this.serializer.init(config);
// Initialize thread pool for parallel polling
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
index fa4578d509b..95eeb052681 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java
@@ -41,7 +41,7 @@ public interface SourceReader {
String SPLIT_ID = "splitId";
/** Initialization, called when the program starts */
- void initialize(long jobId, DataSource dataSource, Map<String, String>
config);
+ void initialize(String jobId, DataSource dataSource, Map<String, String>
config);
/** Divide the data to be read. For example: split mysql to chunks */
List<AbstractSourceSplit> getSourceSplits(FetchTableSplitsRequest config);
@@ -97,5 +97,5 @@ public interface SourceReader {
* Commits the given offset with the source database. Used by some source
like Postgres to
* indicate how far the source TX log can be discarded.
*/
- default void commitSourceOffset(Long jobId, SourceSplit sourceSplit) {}
+ default void commitSourceOffset(String jobId, SourceSplit sourceSplit) {}
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 15787782da9..a4f3a9e2547 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -69,7 +69,6 @@ import java.io.IOException;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -135,7 +134,7 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
}
@Override
- public void initialize(long jobId, DataSource dataSource, Map<String,
String> config) {
+ public void initialize(String jobId, DataSource dataSource, Map<String,
String> config) {
this.serializer.init(config);
// Initialize thread pool for parallel polling
@@ -779,12 +778,11 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
// unnecessary processing overhead until DDL support is added.
configFactory.includeSchemaChanges(false);
- String includingTables =
cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
- String[] includingTbls =
- Arrays.stream(includingTables.split(","))
- .map(t -> databaseName + "." + t.trim())
- .toArray(String[]::new);
- configFactory.tableList(includingTbls);
+ // Set table list
+ String[] tableList = ConfigUtil.getTableList(databaseName, cdcConfig);
+ com.google.common.base.Preconditions.checkArgument(
+ tableList.length >= 1, "include_tables or table is required");
+ configFactory.tableList(tableList);
// setting startMode
String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
@@ -811,7 +809,14 @@ public class MySqlSourceReader extends
AbstractCdcSourceReader {
}
if (offsetMap.containsKey(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY)
&&
offsetMap.containsKey(BinlogOffset.BINLOG_POSITION_OFFSET_KEY)) {
- BinlogOffset binlogOffset = new BinlogOffset(offsetMap);
+ BinlogOffset binlogOffset =
+ BinlogOffset.builder()
+ .setBinlogFilePosition(
+
offsetMap.get(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY),
+ Long.parseLong(
+ offsetMap.get(
+
BinlogOffset.BINLOG_POSITION_OFFSET_KEY)))
+ .build();
configFactory.startupOptions(StartupOptions.specificOffset(binlogOffset));
} else {
throw new RuntimeException("Incorrect offset " + startupMode);
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index d465a71c242..492e0650d4b 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -55,7 +55,6 @@ import org.apache.flink.table.types.DataType;
import java.time.Duration;
import java.time.Instant;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -91,7 +90,7 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
}
@Override
- public void initialize(long jobId, DataSource dataSource, Map<String,
String> config) {
+ public void initialize(String jobId, DataSource dataSource, Map<String,
String> config) {
PostgresSourceConfig sourceConfig = generatePostgresConfig(config,
jobId, 0);
PostgresDialect dialect = new PostgresDialect(sourceConfig);
synchronized (SLOT_CREATION_LOCK) {
@@ -159,7 +158,7 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
/** Generate PostgreSQL source config from Map config */
private PostgresSourceConfig generatePostgresConfig(
- Map<String, String> cdcConfig, Long jobId, int subtaskId) {
+ Map<String, String> cdcConfig, String jobId, int subtaskId) {
PostgresSourceConfigFactory configFactory = new
PostgresSourceConfigFactory();
// Parse JDBC URL to extract connection info
@@ -192,14 +191,9 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
configFactory.includeSchemaChanges(false);
// Set table list
- String includingTables =
cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
- if (StringUtils.isNotEmpty(includingTables)) {
- String[] includingTbls =
- Arrays.stream(includingTables.split(","))
- .map(t -> schema + "." + t.trim())
- .toArray(String[]::new);
- configFactory.tableList(includingTbls);
- }
+ String[] tableList = ConfigUtil.getTableList(schema, cdcConfig);
+ Preconditions.checkArgument(tableList.length >= 1, "include_tables or
table is required");
+ configFactory.tableList(tableList);
// Set startup options
String startupMode = cdcConfig.get(DataSourceConfigKeys.OFFSET);
@@ -257,7 +251,7 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
return configFactory.create(subtaskId);
}
- private String getSlotName(Long jobId) {
+ private String getSlotName(String jobId) {
return "doris_cdc_" + jobId;
}
@@ -380,7 +374,7 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
* @return the fresh {@link TableChanges.TableChange}
*/
private TableChanges.TableChange refreshSingleTableSchema(
- TableId tableId, Map<String, String> config, long jobId) {
+ TableId tableId, Map<String, String> config, String jobId) {
PostgresSourceConfig sourceConfig = generatePostgresConfig(config,
jobId, 0);
PostgresDialect dialect = new PostgresDialect(sourceConfig);
try (JdbcConnection jdbcConnection =
dialect.openJdbcConnection(sourceConfig)) {
@@ -408,7 +402,7 @@ public class PostgresSourceReader extends
JdbcIncrementalSourceReader {
* `CommitFeOffset` fails, Data after the startOffset will not be cleared.
*/
@Override
- public void commitSourceOffset(Long jobId, SourceSplit sourceSplit) {
+ public void commitSourceOffset(String jobId, SourceSplit sourceSplit) {
try {
if (sourceSplit instanceof StreamSplit) {
Offset offsetToCommit = ((StreamSplit)
sourceSplit).getStartingOffset();
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
index 46d581f58e5..5aa46753a26 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java
@@ -43,8 +43,11 @@ public class ConfigUtil {
private static ObjectMapper objectMapper = new ObjectMapper();
private static final Logger LOG =
LoggerFactory.getLogger(ConfigUtil.class);
- public static String getServerId(long jobId) {
- return String.valueOf(Math.abs(String.valueOf(jobId).hashCode()));
+ public static String getServerId(String jobId) {
+ // Use bitwise AND with Integer.MAX_VALUE to strip the sign bit,
+ // which avoids the edge case where Math.abs(Integer.MIN_VALUE)
returns MIN_VALUE
+ // (negative).
+ return String.valueOf(jobId.hashCode() & Integer.MAX_VALUE);
}
public static ZoneId getServerTimeZoneFromJdbcUrl(String jdbcUrl) {
@@ -107,6 +110,21 @@ public class ConfigUtil {
return properties;
}
+ public static String[] getTableList(String schema, Map<String, String>
cdcConfig) {
+ String includingTables =
cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
+ String table = cdcConfig.get(DataSourceConfigKeys.TABLE);
+ if (StringUtils.isNotEmpty(includingTables)) {
+ return Arrays.stream(includingTables.split(","))
+ .map(t -> schema + "." + t.trim())
+ .toArray(String[]::new);
+ } else if (StringUtils.isNotEmpty(table)) {
+ Preconditions.checkArgument(!table.contains(","), "table only
supports one table");
+ return new String[] {schema + "." + table.trim()};
+ } else {
+ return new String[0];
+ }
+ }
+
public static boolean is13Timestamp(String s) {
return s != null && s.matches("\\d{13}");
}
diff --git
a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
new file mode 100644
index 00000000000..66d2a76d7c2
--- /dev/null
+++
b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/ConfigUtilTest.java
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cdcclient.utils;
+
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Unit tests for {@link ConfigUtil}. */
+class ConfigUtilTest {
+
+ // ─── getServerId
──────────────────────────────────────────────────────────
+
+ @Test
+ void serverIdIsNonNegative() {
+ // Any jobId hash should produce a non-negative result (bitwise AND
strips sign bit).
+ String result = ConfigUtil.getServerId("12345");
+ assertTrue(Long.parseLong(result) >= 0, "serverId must be
non-negative");
+ }
+
+ @Test
+ void serverIdHandlesMinHashCode() {
+ // Find a string whose hashCode() == Integer.MIN_VALUE to exercise the
edge case
+ // where Math.abs(Integer.MIN_VALUE) would return a negative number.
+ // "polygenelubricants" is a well-known such string.
+ String result = ConfigUtil.getServerId("polygenelubricants");
+ assertTrue(Long.parseLong(result) >= 0, "serverId must be non-negative
for MIN_VALUE hash");
+ }
+
+ // ─── getTableList
─────────────────────────────────────────────────────────
+
+ @Test
+ void tableListFromIncludeTables() {
+ Map<String, String> config = new HashMap<>();
+ config.put(DataSourceConfigKeys.INCLUDE_TABLES, "t1, t2, t3");
+ String[] result = ConfigUtil.getTableList("public", config);
+ assertArrayEquals(new String[]{"public.t1", "public.t2", "public.t3"},
result);
+ }
+
+ @Test
+ void tableListFromSingleTable() {
+ Map<String, String> config = new HashMap<>();
+ config.put(DataSourceConfigKeys.TABLE, "orders");
+ String[] result = ConfigUtil.getTableList("myschema", config);
+ assertArrayEquals(new String[]{"myschema.orders"}, result);
+ }
+
+ @Test
+ void tableListRejectsCommaInTableName() {
+ Map<String, String> config = new HashMap<>();
+ config.put(DataSourceConfigKeys.TABLE, "t1,t2");
+ assertThrows(IllegalArgumentException.class,
+ () -> ConfigUtil.getTableList("public", config));
+ }
+
+ @Test
+ void tableListEmptyWhenNeitherSet() {
+ Map<String, String> config = new HashMap<>();
+ String[] result = ConfigUtil.getTableList("public", config);
+ assertEquals(0, result.length);
+ }
+}
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.out
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.out
new file mode 100644
index 00000000000..02b13ffe0b2
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_tvf --
+C1 3
+C1 99
+D1 4
+D1 4
+
+-- !select_tvf_dml --
+C1 99
+D1 4
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy
new file mode 100644
index 00000000000..0536a5b3bf5
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy
@@ -0,0 +1,218 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cdc_stream_tvf_mysql",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "user_info_cdc_stream_tvf"
+ def mysqlDb = "test_cdc_db"
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+ def offset = ""
+ def dmlOffset = ""
+
+ // --- Validation error tests (no JDBC connection needed) ---
+
+ test {
+ sql """select * from cdc_stream("type" = "mysql")"""
+ exception "jdbc_url is required"
+ }
+
+ test {
+ sql """select * from cdc_stream("jdbc_url" =
"jdbc:mysql://localhost:3306")"""
+ exception "type is required"
+ }
+
+ test {
+ sql """select * from cdc_stream(
+ "type" = "mysql",
+ "jdbc_url" = "jdbc:mysql://localhost:3306")"""
+ exception "table is required"
+ }
+
+ test {
+ sql """select * from cdc_stream(
+ "type" = "mysql",
+ "jdbc_url" = "jdbc:mysql://localhost:3306",
+ "table" = "t1")"""
+ exception "offset is required"
+ }
+
+ // --- Data setup ---
+
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """CREATE TABLE ${mysqlDb}.${table1} (
+ `name` varchar(200) NOT NULL,
+ `age` int DEFAULT NULL,
+ PRIMARY KEY (`name`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('A1',
1);"""
+ sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('B1',
2);"""
+
+ def result = sql_return_maparray "show master status"
+ def file = result[0]["File"]
+ def position = result[0]["Position"]
+ offset = """{"file":"${file}","pos":"${position}"}"""
+ sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('C1',
3);"""
+ sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('D1',
4);"""
+
+ // capture offset before UPDATE/DELETE events
+ def result2 = sql_return_maparray "show master status"
+ dmlOffset =
"""{"file":"${result2[0]["File"]}","pos":"${result2[0]["Position"]}"}"""
+ sql """UPDATE ${mysqlDb}.${table1} SET age = 99 WHERE name =
'C1';"""
+ sql """DELETE FROM ${mysqlDb}.${table1} WHERE name = 'D1';"""
+ }
+
+ // --- INSERT-only: read C1 and D1 from binlog offset ---
+
+ log.info("offset: " + offset)
+ qt_select_tvf """select * from cdc_stream(
+ "type" = "mysql",
+ "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "table" = "${table1}",
+ "offset" = '${offset}'
+ ) order by name
+ """
+
+ // --- UPDATE and DELETE events: read from dmlOffset ---
+
+ log.info("dmlOffset: " + dmlOffset)
+ qt_select_tvf_dml """select * from cdc_stream(
+ "type" = "mysql",
+ "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "table" = "${table1}",
+ "offset" = '${dmlOffset}'
+ ) order by name
+ """
+
+ // --- offset=earliest: should return all rows (no exception) ---
+
+ def earliestResult = sql """select * from cdc_stream(
+ "type" = "mysql",
+ "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "table" = "${table1}",
+ "offset" = 'earliest'
+ ) limit 1
+ """
+ assertNotNull(earliestResult)
+
+ // --- offset=initial: unsupported ---
+
+ test {
+ sql """
+ select * from cdc_stream(
+ "type" = "mysql",
+ "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "table" = "${table1}",
+ "offset" = 'initial')
+ """
+ exception "Unsupported offset: initial"
+ }
+
+ test {
+ sql """select * from cdc_stream(
+ "type" = "mysql",
+ "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "table" = "${table1}",
+ "offset" = 'notjson')"""
+ exception "Unsupported offset: notjson"
+ }
+
+ // --- Non-existent table ---
+
+ test {
+ sql """select * from cdc_stream(
+ "type" = "mysql",
+ "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "table" = "no_such_table",
+ "offset" = '${offset}')
+ """
+ exception "Table does not exist: no_such_table"
+ }
+
+ // --- Wrong credentials ---
+
+ test {
+ sql """select * from cdc_stream(
+ "type" = "mysql",
+ "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "wronguser",
+ "password" = "wrongpass",
+ "database" = "${mysqlDb}",
+ "table" = "${table1}",
+ "offset" = '${offset}')
+ """
+ exception "can not connect to jdbc"
+ }
+
+ // --- Unreachable JDBC URL (closed port) ---
+
+ test {
+ sql """select * from cdc_stream(
+ "type" = "mysql",
+ "jdbc_url" = "jdbc:mysql://10.0.0.1:19999",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "com.mysql.cj.jdbc.Driver",
+ "user" = "root",
+ "password" = "123456",
+ "database" = "${mysqlDb}",
+ "table" = "${table1}",
+ "offset" = '${offset}')
+ """
+ exception "can not connect to jdbc"
+ }
+ }
+}
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy
new file mode 100644
index 00000000000..f0cf2feb373
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_postgres.groovy
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cdc_stream_tvf_postgres",
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "user_info_pg_normal1_tvf"
+ def pgDB = "postgres"
+ def pgSchema = "cdc_test"
+ def pgUser = "postgres"
+ def pgPassword = "123456"
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String pg_port = context.config.otherConfigs.get("pg_14_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
+
+ // create test
+ connect("${pgUser}", "${pgPassword}",
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+ // sql """CREATE SCHEMA IF NOT EXISTS ${pgSchema}"""
+ sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}"""
+ sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} (
+ "name" varchar(200),
+ "age" int2,
+ PRIMARY KEY ("name")
+ )"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age)
VALUES ('A1', 1);"""
+ sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} (name, age)
VALUES ('B1', 2);"""
+ }
+
+ test {
+ sql """
+ select * from cdc_stream(
+ "type" = "postgres",
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "table" = "${table1}",
+ "offset" = 'initial')
+ """
+ exception "Unsupported offset: initial"
+ }
+
+        // Because PG consumption requires creating a replication slot first,
+        // we only verify that the query executes successfully.
+ def result = sql """
+ select * from cdc_stream(
+ "type" = "postgres",
+ "jdbc_url" =
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+ "driver_url" = "${driver_url}",
+ "driver_class" = "org.postgresql.Driver",
+ "user" = "${pgUser}",
+ "password" = "${pgPassword}",
+ "database" = "${pgDB}",
+ "schema" = "${pgSchema}",
+ "table" = "${table1}",
+ "offset" = 'latest')
+ """
+        log.info("result: {}", result)
+ assertNotNull(result)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]