github-actions[bot] commented on code in PR #60116:
URL: https://github.com/apache/doris/pull/60116#discussion_r2979973254
##########
be/src/io/fs/http_file_reader.cpp:
##########
@@ -96,32 +111,64 @@ Status HttpFileReader::open(const FileReaderOptions& opts)
{
return Status::OK();
}
- // Step 1: HEAD request to get file metadata
- RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true));
- _client->set_method(HttpMethod::HEAD);
- RETURN_IF_ERROR(_client->execute());
+ // start CDC client
+ auto enable_cdc_iter = _extend_kv.find("enable_cdc_client");
+ if (enable_cdc_iter != _extend_kv.end() && enable_cdc_iter->second ==
"true") {
+ LOG(INFO) << "CDC client is enabled, starting CDC client for " << _url;
+ ExecEnv* env = ExecEnv::GetInstance();
+ if (env == nullptr || env->cdc_client_mgr() == nullptr) {
+ return Status::InternalError("ExecEnv or CdcClientMgr is not
initialized");
+ }
Review Comment:
**[Fragile `_file_size` when `_size_known=false`]**
When `_enable_chunk_response=true`, `_size_known` is set to `false` but
`_file_size` remains at its default value `static_cast<size_t>(-1)` (i.e.,
`SIZE_MAX` ~18 exabytes). While the current CDC TVF path avoids the dangerous
`_read_one_message()` allocation because `_read_json_by_line` defaults to
`true`, this is **fragile**.
If anyone later adds `strip_outer_array=true` or `read_json_by_line=false`
to the TVF, it would trigger an allocation of `SIZE_MAX` bytes in
`NewJsonReader::_read_one_message()`, causing OOM/crash.
Consider updating `_file_size` after the chunked response completes, e.g.,
after caching in the non-Range download callback:
```cpp
_full_file_cache = buf;
_full_file_cached = true;
_file_size = _full_file_cache.size();
_size_known = true;
```
##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java:
##########
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.FetchRecordRequest;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcStreamTableValuedFunction extends
ExternalFileTableValuedFunction {
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+ private static final String URI =
"http://127.0.0.1:CDC_CLIENT_PORT/api/fetchRecordStream";
+ private final Map<String, String> originProps;
+
+ public CdcStreamTableValuedFunction(Map<String, String> properties) throws
AnalysisException {
+ this.originProps = properties;
+ validate(properties);
+ processProps(properties);
+ }
+
+ private void processProps(Map<String, String> properties) throws
AnalysisException {
+ Map<String, String> copyProps = new HashMap<>(properties);
+ copyProps.put("format", "json");
+ super.parseCommonProperties(copyProps);
+ this.processedParams.put("enable_cdc_client", "true");
+ this.processedParams.put("uri", URI);
+ this.processedParams.put("http.enable.range.request", "false");
+ this.processedParams.put("http.enable.chunk.response", "true");
+ this.processedParams.put("http.method", "POST");
+
+ String payload = generateParams(properties);
+ this.processedParams.put("http.payload", payload);
+ this.backendConnectProperties.putAll(processedParams);
+ generateFileStatus();
+ }
+
+ private String generateParams(Map<String, String> properties) throws
AnalysisException {
+ FetchRecordRequest recordRequest = new FetchRecordRequest();
+ recordRequest.setJobId(UUID.randomUUID().toString().replace("-", ""));
+ recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE));
+ recordRequest.setConfig(properties);
+ try {
+ return objectMapper.writeValueAsString(recordRequest);
+ } catch (IOException e) {
+ LOG.info("Failed to serialize fetch record request," +
e.getMessage());
+ throw new AnalysisException(e.getMessage());
+ }
+ }
+
+ private void validate(Map<String, String> properties) {
+
Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.JDBC_URL),
"jdbc_url is required");
+
Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TYPE),
"type is required");
+
Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TABLE),
"table is required");
+
Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.OFFSET),
"offset is required");
Review Comment:
**[`validate()` throws `IllegalArgumentException`, not `AnalysisException`]**
`Preconditions.checkArgument` throws `IllegalArgumentException`, not
`AnalysisException`. This is inconsistent with FE's exception model. The
exception will propagate as a generic runtime exception rather than a proper
analysis error with a user-friendly message.
Consider using `AnalysisException` directly:
```java
if (!properties.containsKey(DataSourceConfigKeys.JDBC_URL)) {
throw new AnalysisException("jdbc_url is required");
}
```
Alternatively, `CdcStream.toCatalogFunction()` catches `Throwable`
and wraps it in `AnalysisException`, but relying on that results in a less
clear error message for the user.
##########
fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java:
##########
@@ -448,8 +448,8 @@ public static List<Column> getColumns(JdbcClient jdbcClient,
* The remoteDB implementation differs for each data source;
* refer to the hierarchical mapping in the JDBC catalog.
*/
- private static String getRemoteDbName(DataSourceType sourceType,
Map<String, String> properties)
- throws JobException {
+ public static String getRemoteDbName(DataSourceType sourceType,
Map<String, String> properties)
Review Comment:
**[Exception type change from `JobException` to `RuntimeException`]**
`getRemoteDbName` previously threw `JobException` (a checked exception).
Changing it to `RuntimeException` is a behavioral change that could silently
break callers who previously caught `JobException` specifically. Existing
callers that catch `JobException` will no longer catch errors from this method.
If the change is intentional (to avoid checked exceptions for the TVF path),
consider documenting this or ensuring all callers are updated.
##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java:
##########
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.FetchRecordRequest;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcStreamTableValuedFunction extends
ExternalFileTableValuedFunction {
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+ private static final String URI =
"http://127.0.0.1:CDC_CLIENT_PORT/api/fetchRecordStream";
+ private final Map<String, String> originProps;
+
+ public CdcStreamTableValuedFunction(Map<String, String> properties) throws
AnalysisException {
+ this.originProps = properties;
+ validate(properties);
+ processProps(properties);
+ }
+
+ private void processProps(Map<String, String> properties) throws
AnalysisException {
+ Map<String, String> copyProps = new HashMap<>(properties);
+ copyProps.put("format", "json");
+ super.parseCommonProperties(copyProps);
+ this.processedParams.put("enable_cdc_client", "true");
+ this.processedParams.put("uri", URI);
+ this.processedParams.put("http.enable.range.request", "false");
+ this.processedParams.put("http.enable.chunk.response", "true");
+ this.processedParams.put("http.method", "POST");
+
+ String payload = generateParams(properties);
+ this.processedParams.put("http.payload", payload);
+ this.backendConnectProperties.putAll(processedParams);
+ generateFileStatus();
+ }
+
+ private String generateParams(Map<String, String> properties) throws
AnalysisException {
+ FetchRecordRequest recordRequest = new FetchRecordRequest();
+ recordRequest.setJobId(UUID.randomUUID().toString().replace("-", ""));
+ recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE));
+ recordRequest.setConfig(properties);
+ try {
+ return objectMapper.writeValueAsString(recordRequest);
+ } catch (IOException e) {
+ LOG.info("Failed to serialize fetch record request," +
e.getMessage());
+ throw new AnalysisException(e.getMessage());
+ }
+ }
+
+ private void validate(Map<String, String> properties) {
+
Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.JDBC_URL),
"jdbc_url is required");
+
Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TYPE),
"type is required");
+
Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TABLE),
"table is required");
+
Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.OFFSET),
"offset is required");
+ }
+
+ private void generateFileStatus() {
+ this.fileStatuses.clear();
+ this.fileStatuses.add(new TBrokerFileStatus(URI, false,
Integer.MAX_VALUE, false));
+ }
+
+ @Override
+ public List<Column> getTableColumns() throws AnalysisException {
+ DataSourceType dataSourceType =
+
DataSourceType.valueOf(processedParams.get(DataSourceConfigKeys.TYPE).toUpperCase());
+ JdbcClient jdbcClient =
StreamingJobUtils.getJdbcClient(dataSourceType, processedParams);
Review Comment:
**[JdbcClient resource leak]**
`StreamingJobUtils.getJdbcClient()` creates a `JdbcClient` which holds JDBC
connections. This method never closes it. The `JdbcClient` should be closed in
a try-with-resources or finally block:
```java
JdbcClient jdbcClient = StreamingJobUtils.getJdbcClient(dataSourceType,
processedParams);
try {
// use jdbcClient
} finally {
jdbcClient.closeClient();
}
```
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -51,26 +58,31 @@
import static
org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.SCHEMA_HEARTBEAT_EVENT_KEY_NAME;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import io.debezium.data.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
+import
org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
/** Pipeline coordinator. */
@Component
public class PipelineCoordinator {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineCoordinator.class);
private static final String SPLIT_ID = "splitId";
// jobId
- private final Map<Long, DorisBatchStreamLoad> batchStreamLoadMap = new
ConcurrentHashMap<>();
+ private final Map<String, DorisBatchStreamLoad> batchStreamLoadMap = new
ConcurrentHashMap<>();
+ // taskId, offset
Review Comment:
**[`taskOffsetCache` potential memory leak]**
The `taskOffsetCache` map is populated in `buildStreamRecords()` but only
cleaned up when `getOffsetWithTaskId()` is called (which uses `remove()`). For
the current TVF usage, `taskId` is null (not set in the TVF path per
`generateParams()` in `CdcStreamTableValuedFunction`), so
`StringUtils.isNotEmpty(fetchRecord.getTaskId())` is false and no entry is
added — so this is safe for now.
However, if `taskId` is set (e.g., by a future caller), and
`getOffsetWithTaskId()` is never called, entries accumulate indefinitely.
Consider adding TTL-based eviction or a cleanup mechanism.
##########
be/src/io/fs/http_file_reader.cpp:
##########
@@ -224,9 +271,29 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes_r
VLOG(2) << "Issuing HTTP GET request: offset=" << offset << " req_len=" <<
req_len
<< " with_range=" << _range_supported;
- // Prepare and initialize the HTTP client for GET request
+ // Prepare and initialize the HTTP client for request
RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/false));
- _client->set_method(HttpMethod::GET);
+
+ // Determine HTTP method from configuration (default: GET)
+ HttpMethod method = HttpMethod::GET;
+ auto method_iter = _extend_kv.find("http.method");
+ if (method_iter != _extend_kv.end()) {
+ method = to_http_method(method_iter->second.c_str());
+ if (method == HttpMethod::UNKNOWN) {
+ LOG(WARNING) << "Invalid http.method value: " <<
method_iter->second
+ << ", falling back to GET";
+ method = HttpMethod::GET;
+ }
+ }
+ _client->set_method(method);
+
+ // Set payload if configured (supports POST, PUT, DELETE, etc.)
+ auto payload_iter = _extend_kv.find("http.payload");
+ if (payload_iter != _extend_kv.end() && !payload_iter->second.empty()) {
+ _client->set_payload(payload_iter->second);
+ _client->set_content_type("application/json");
+ VLOG(2) << "HTTP request with payload, size=" <<
payload_iter->second.size();
Review Comment:
**[VLOG may expose credentials]**
The `http.payload` contains the full `FetchRecordRequest` JSON, which
includes the `config` map with `user`, `password`, `jdbc_url`, `driver_url`,
etc. If someone changes this to log the actual payload content, credentials
would be exposed.
Consider either:
1. Not including credentials in the HTTP payload (pass them separately)
2. Adding a note/TODO that the payload contains sensitive data
3. Masking sensitive fields before serialization
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java:
##########
@@ -43,8 +43,8 @@ public class ConfigUtil {
private static ObjectMapper objectMapper = new ObjectMapper();
private static final Logger LOG =
LoggerFactory.getLogger(ConfigUtil.class);
- public static String getServerId(long jobId) {
- return String.valueOf(Math.abs(String.valueOf(jobId).hashCode()));
+ public static String getServerId(String jobId) {
Review Comment:
**[`Math.abs(hashCode())` can return negative for `Integer.MIN_VALUE`]**
`String.hashCode()` can return `Integer.MIN_VALUE`, and
`Math.abs(Integer.MIN_VALUE)` returns `Integer.MIN_VALUE` (a negative number).
This could produce a negative server ID.
Use `Math.abs((long) jobId.hashCode())` or `(jobId.hashCode() & 0x7FFFFFFF)`
instead.
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java:
##########
@@ -511,7 +511,7 @@ public void commitOffset(
CommitOffsetRequest commitRequest =
CommitOffsetRequest.builder()
.offset(OBJECT_MAPPER.writeValueAsString(meta))
- .jobId(jobId)
+ .jobId(Long.parseLong(jobId))
.taskId(Long.parseLong(taskId))
Review Comment:
**[`Long.parseLong(jobId)` will fail for UUID-based job IDs]**
`commitOffset()` calls `Long.parseLong(jobId)`. For TVF calls, the jobId is
a UUID string (e.g., `UUID.randomUUID().toString().replace("-", "")`), so this
would throw `NumberFormatException`.
The TVF path likely doesn't reach `commitOffset` (only the streaming job
path does, and streaming jobs use numeric IDs), but the `isLong()` guard in
`PipelineCoordinator.buildStreamRecords()` is the only protection. This is
fragile — if the code path changes, it will break.
Consider adding a comment here noting this constraint, or restructuring so
that `commitOffset` doesn't need to parse the jobId.
##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java:
##########
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.FetchRecordRequest;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcStreamTableValuedFunction extends
ExternalFileTableValuedFunction {
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+ private static final String URI =
"http://127.0.0.1:CDC_CLIENT_PORT/api/fetchRecordStream";
+ private final Map<String, String> originProps;
+
+ public CdcStreamTableValuedFunction(Map<String, String> properties) throws
AnalysisException {
+ this.originProps = properties;
+ validate(properties);
+ processProps(properties);
+ }
+
+ private void processProps(Map<String, String> properties) throws
AnalysisException {
+ Map<String, String> copyProps = new HashMap<>(properties);
+ copyProps.put("format", "json");
+ super.parseCommonProperties(copyProps);
+ this.processedParams.put("enable_cdc_client", "true");
+ this.processedParams.put("uri", URI);
+ this.processedParams.put("http.enable.range.request", "false");
+ this.processedParams.put("http.enable.chunk.response", "true");
+ this.processedParams.put("http.method", "POST");
+
+ String payload = generateParams(properties);
+ this.processedParams.put("http.payload", payload);
+ this.backendConnectProperties.putAll(processedParams);
+ generateFileStatus();
+ }
+
+ private String generateParams(Map<String, String> properties) throws
AnalysisException {
+ FetchRecordRequest recordRequest = new FetchRecordRequest();
+ recordRequest.setJobId(UUID.randomUUID().toString().replace("-", ""));
+ recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE));
+ recordRequest.setConfig(properties);
+ try {
+ return objectMapper.writeValueAsString(recordRequest);
+ } catch (IOException e) {
+ LOG.info("Failed to serialize fetch record request," +
e.getMessage());
+ throw new AnalysisException(e.getMessage());
+ }
Review Comment:
**[`LOG.info` for serialization failure should be `LOG.warn` or
`LOG.error`]**
Failing to serialize the fetch record request is an error condition, not
informational. This should use `LOG.warn` or `LOG.error`.
##########
regression-test/suites/job_p0/streaming_job/cdc/tvf/test_cdc_stream_tvf_mysql.groovy:
##########
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cdc_stream_tvf_mysql",
"p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
+ def currentDb = (sql "select database()")[0][0]
+ def table1 = "user_info_cdc_stream_tvf"
+ def mysqlDb = "test_cdc_db"
+
+ String enabled = context.config.otherConfigs.get("enableJdbcTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String mysql_port = context.config.otherConfigs.get("mysql_57_port");
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String s3_endpoint = getS3Endpoint()
+ String bucket = getS3BucketName()
+ String driver_url =
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar"
+ def offset = ""
+
+ // create test
+ connect("root", "123456",
"jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+ sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}"""
+ sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}"""
+ sql """CREATE TABLE ${mysqlDb}.${table1} (
+ `name` varchar(200) NOT NULL,
+ `age` int DEFAULT NULL,
+ PRIMARY KEY (`name`)
+ ) ENGINE=InnoDB"""
+ sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('A1',
1);"""
+ sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('B1',
2);"""
+
+ def result = sql_return_maparray "show master status"
+ def file = result[0]["File"]
+ def position = result[0]["Position"]
+ offset = """{"file":"${file}","pos":"${position}"}"""
+ sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('C1',
3);"""
+ sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES ('D1',
4);"""
+ }
+
+ log.info("offset: " + offset)
+ qt_select_tvf """select * from cdc_stream(
Review Comment:
**[Missing `order_qt_` prefix or explicit `ORDER BY`]**
Per testing standards, `qt_select_tvf` should use `order_qt_select_tvf`
prefix or the SQL should have an explicit `ORDER BY` clause to ensure
deterministic output. CDC data ordering may not be guaranteed across different
environments.
Change to:
```groovy
order_qt_select_tvf """select * from cdc_stream(...) order by name"""
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]