Copilot commented on code in PR #61826:
URL: https://github.com/apache/doris/pull/61826#discussion_r3007799689


##########
regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.groovy:
##########
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test streaming INSERT job using cdc_stream TVF for PostgreSQL.
+ *
+ * Scenario:
+ *   1. Snapshot phase (offset=initial): pre-existing rows (A1, B1) are synced.
+ *   2. Binlog phase: INSERT (C1, D1), UPDATE (C1), DELETE (D1) are applied.
+ */
+suite("test_streaming_job_cdc_stream_postgres", 
"p0,external,pg,external_docker,external_docker_pg,nondatalake") {
+    def jobName = "test_streaming_job_cdc_stream_postgres_name"
+    def currentDb = (sql "select database()")[0][0]
+    def dorisTable = "test_streaming_job_cdc_stream_postgres_tbl"
+    def pgDB = "postgres"
+    def pgSchema = "cdc_test"
+    def pgUser = "postgres"
+    def pgPassword = "123456"
+    def pgTable = "test_streaming_job_cdc_stream_postgres_src"
+
+    sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+    sql """drop table if exists ${currentDb}.${dorisTable} force"""
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} (
+            `name` varchar(200) NULL,
+            `age`  int NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`name`)
+        DISTRIBUTED BY HASH(`name`) BUCKETS AUTO
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+    """
+
+    String enabled = context.config.otherConfigs.get("enableJdbcTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String pg_port = context.config.otherConfigs.get("pg_14_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String s3_endpoint = getS3Endpoint()
+        String bucket = getS3BucketName()
+        String driver_url = 
"https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar";
+
+        // prepare source table with pre-existing snapshot data
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${pgTable}"""
+            sql """CREATE TABLE ${pgDB}.${pgSchema}.${pgTable} (
+                      "name" varchar(200) PRIMARY KEY,
+                      "age"  int2
+                  )"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('A1', 1)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('B1', 2)"""
+        }
+
+        // create streaming job via cdc_stream TVF (offset=initial → snapshot 
then binlog)
+        sql """
+            CREATE JOB ${jobName}
+            ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (name, age)
+            SELECT name, age FROM cdc_stream(
+                "type"         = "postgres",
+                "jdbc_url"     = 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}",
+                "driver_url"   = "${driver_url}",
+                "driver_class" = "org.postgresql.Driver",
+                "user"         = "${pgUser}",
+                "password"     = "${pgPassword}",
+                "database"     = "${pgDB}",
+                "schema"             = "${pgSchema}",
+                "table"              = "${pgTable}",
+                "offset"             = "initial"
+            )
+        """
+
+        // wait for at least one snapshot task to succeed
+        try {
+            Awaitility.await().atMost(300, SECONDS).pollInterval(2, 
SECONDS).until({
+                def cnt = sql """select SucceedTaskCount from 
jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'"""
+                log.info("SucceedTaskCount: " + cnt)
+                cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 2
+            })
+        } catch (Exception ex) {
+            log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            log.info("tasks: " + (sql """select * from tasks("type"="insert") 
where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        // verify snapshot data
+        qt_snapshot_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY 
name """
+
+        // insert incremental rows in PostgreSQL
+        connect("${pgUser}", "${pgPassword}", 
"jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") {
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('C1', 3)"""
+            sql """INSERT INTO ${pgDB}.${pgSchema}.${pgTable} (name, age) 
VALUES ('D1', 4)"""
+        }
+
+        // wait for binlog tasks to pick up the new rows
+        try {
+            Awaitility.await().atMost(120, SECONDS).pollInterval(2, 
SECONDS).until({
+                def rows = sql """SELECT count(1) FROM 
${currentDb}.${dorisTable} WHERE name IN ('C1', 'D1')"""
+                log.info("incremental rows: " + rows)
+                (rows.get(0).get(0) as int) == 2
+            })
+        } catch (Exception ex) {
+            log.info("job: " + (sql """select * from jobs("type"="insert") 
where Name='${jobName}'"""))
+            log.info("tasks: " + (sql """select * from tasks("type"="insert") 
where JobName='${jobName}'"""))
+            throw ex
+        }
+
+        qt_final_data """ SELECT * FROM ${currentDb}.${dorisTable} ORDER BY 
name """
+
+        // sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        // sql """drop table if exists ${currentDb}.${dorisTable} force"""
+    }

Review Comment:
   The cleanup statements (DROP JOB / drop table) are currently commented out. 
Leaving a streaming job running after the suite finishes can leak resources 
(e.g., CDC slot, backend tasks) and interfere with subsequent tests. Please 
re-enable cleanup (ideally in a `finally` block) so the job/table are always 
dropped even on failure.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -144,28 +147,53 @@ private void buildStreamRecords(
             OutputStream rawOutputStream)
             throws Exception {
         SourceSplit split = readResult.getSplit();
-        Map<String, String> lastMeta = null;
+        boolean isSnapshotSplit = sourceReader.isSnapshotSplit(split);
         int rowCount = 0;
+        int heartbeatCount = 0;
         BufferedOutputStream bos = new BufferedOutputStream(rawOutputStream);
+        boolean hasReceivedData = false;
+        boolean lastMessageIsHeartbeat = false;
+        long startTime = System.currentTimeMillis();
         try {
-            // Poll records using the existing mechanism
             boolean shouldStop = false;
-            long startTime = System.currentTimeMillis();
+            LOG.info(
+                    "Start polling records for jobId={} taskId={}, 
isSnapshotSplit={}",
+                    fetchRecord.getJobId(),
+                    fetchRecord.getTaskId(),
+                    isSnapshotSplit);
             while (!shouldStop) {
                 Iterator<SourceRecord> recordIterator = 
sourceReader.pollRecords();
                 if (!recordIterator.hasNext()) {
                     Thread.sleep(100);
                     long elapsedTime = System.currentTimeMillis() - startTime;
-                    if (elapsedTime > Constants.POLL_SPLIT_RECORDS_TIMEOUTS) {
+                    boolean timeoutReached = elapsedTime > 
Constants.POLL_SPLIT_RECORDS_TIMEOUTS;
+                    if (shouldStop(
+                            isSnapshotSplit,
+                            hasReceivedData,
+                            lastMessageIsHeartbeat,
+                            elapsedTime,
+                            Constants.POLL_SPLIT_RECORDS_TIMEOUTS,
+                            timeoutReached)) {

Review Comment:
   In the `!recordIterator.hasNext()` branch, snapshot splits now delegate to 
`shouldStop(...)`. `shouldStop()` currently returns `true` unconditionally for 
snapshot splits, but snapshot reads are submitted asynchronously and 
`pollRecords()` can return empty before any records are produced. This can make 
`fetchRecordStream` exit immediately and miss snapshot data. Update the 
snapshot stopping condition so it only stops when snapshot reading is actually 
complete (or after receiving data / reaching timeout), rather than on the first 
empty poll.



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -1016,14 +1031,24 @@ public void beforeCommitted(TransactionState txnState) 
throws TransactionExcepti
             }
             LoadJob loadJob = loadJobs.get(0);
             LoadStatistic loadStatistic = loadJob.getLoadStatistic();
+
+            String offsetJson = offsetProvider.getCommitOffsetJson(
+                    runningStreamTask.getRunningOffset(),
+                    runningStreamTask.getTaskId(),
+                    runningStreamTask.getScanBackendIds());
+
+
+            if (StringUtils.isBlank(offsetJson)) {
+                throw new TransactionException("Can not found offset for 
attachment, load job id is " + runningStreamTask.getTaskId());

Review Comment:
   The new TransactionException message has grammar issues and is misleading: 
it says "Can not found offset" and reports "load job id" but concatenates the 
streaming taskId. Consider rewording to something like "Cannot find commit 
offset for txn attachment (taskId=...)" to make debugging easier and avoid 
confusion about which ID is being printed.



##########
fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java:
##########
@@ -559,6 +559,10 @@ public int numScanBackends() {
         return scanBackendIds.size();
     }
 
+    public Set<Long> getScanBackendIds() {
+        return scanBackendIds;
+    }

Review Comment:
   `getScanBackendIds()` returns the internal mutable `scanBackendIds` set 
directly, allowing external callers to accidentally mutate ScanNode state. 
Return an unmodifiable view or a defensive copy instead (e.g., 
`Collections.unmodifiableSet(...)` or `Sets.newHashSet(...)`).



##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java:
##########
@@ -51,24 +62,34 @@ private void processProps(Map<String, String> properties) 
throws AnalysisExcepti
         Map<String, String> copyProps = new HashMap<>(properties);
         copyProps.put("format", "json");
         super.parseCommonProperties(copyProps);
-        this.processedParams.put("enable_cdc_client", "true");
-        this.processedParams.put("uri", URI);
-        this.processedParams.put("http.enable.range.request", "false");
-        this.processedParams.put("http.enable.chunk.response", "true");
-        this.processedParams.put("http.method", "POST");
+        this.processedParams.put(ENABLE_CDC_CLIENT_KEY, "true");
+        this.processedParams.put(URI_KEY, URI);
+        this.processedParams.put(HTTP_ENABLE_RANGE_REQUEST_KEY, "false");
+        this.processedParams.put(HTTP_ENABLE_CHUNK_RESPONSE_KEY, "true");
+        this.processedParams.put(HTTP_METHOD_KEY, "POST");
 
         String payload = generateParams(properties);
-        this.processedParams.put("http.payload", payload);
+        this.processedParams.put(HTTP_PAYLOAD_KEY, payload);
         this.backendConnectProperties.putAll(processedParams);
         generateFileStatus();
     }
 
     private String generateParams(Map<String, String> properties) throws 
AnalysisException {
         FetchRecordRequest recordRequest = new FetchRecordRequest();
-        recordRequest.setJobId(UUID.randomUUID().toString().replace("-", ""));
+        String defaultJobId = UUID.randomUUID().toString().replace("-", "");
+        recordRequest.setJobId(properties.getOrDefault(JOB_ID_KEY, 
defaultJobId));
         recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE));
         recordRequest.setConfig(properties);
         try {
+            // for tvf with job
+            if (properties.containsKey(TASK_ID_KEY)) {
+                recordRequest.setTaskId(properties.remove(TASK_ID_KEY));
+                String meta = properties.remove(META_KEY);
+                Preconditions.checkArgument(StringUtils.isNotEmpty(meta), 
"meta is required when task.id is provided");
+                Map<String, Object> metaMap = objectMapper.readValue(meta, new 
TypeReference<Map<String, Object>>() {});
+                recordRequest.setMeta(metaMap);
+            }

Review Comment:
   `generateParams()` mutates the incoming `properties` map via 
`remove(TASK_ID_KEY)` / `remove(META_KEY)` after storing it into 
`recordRequest.setConfig(properties)`. This has two concrete risks: (1) it has 
side effects on the caller-owned TVF properties map, and (2) if the map is 
unmodifiable it will throw `UnsupportedOperationException` during planning. Use 
a defensive copy for payload generation (and set `recordRequest.config` to the 
intended final map) instead of mutating the input map in-place.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java:
##########
@@ -848,6 +849,17 @@ private MySqlSourceConfig generateMySqlConfig(Map<String, 
String> cdcConfig, Str
                     
Integer.parseInt(cdcConfig.get(DataSourceConfigKeys.SNAPSHOT_SPLIT_SIZE)));
         }
 
+        // todo: Currently, only one split key is supported; future will 
require multiple split
+        // keys.
+        if (cdcConfig.containsKey(DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY)) {
+            ObjectPath objectPath =
+                    new ObjectPath(
+                            cdcConfig.get(DataSourceConfigKeys.DATABASE),
+                            cdcConfig.get(DataSourceConfigKeys.TABLE));

Review Comment:
   When `snapshot_split_key` is provided, this code assumes 
`DataSourceConfigKeys.TABLE` is also set and builds an `ObjectPath(database, 
table)`. But for MySQL configs that use `include_tables` (without a single 
`table` property), `cdcConfig.get(TABLE)` can be null, leading to invalid 
ObjectPath/NPE. Please either validate that `table` is non-empty when 
`snapshot_split_key` is set, or extend the config format to support per-table 
split keys with `include_tables`.
   ```suggestion
               String database = cdcConfig.get(DataSourceConfigKeys.DATABASE);
               String table = cdcConfig.get(DataSourceConfigKeys.TABLE);
               Preconditions.checkArgument(
                       database != null && !database.isEmpty() && table != null && !table.isEmpty(),
                       "When '%s' is set, both '%s' (database) and '%s' (table) must be configured.",
                       DataSourceConfigKeys.SNAPSHOT_SPLIT_KEY,
                       DataSourceConfigKeys.DATABASE,
                       DataSourceConfigKeys.TABLE);
               ObjectPath objectPath = new ObjectPath(database, table);
   ```



##########
regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres.groovy:
##########
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Test streaming INSERT job using cdc_stream TVF for PostgreSQL.
+ *
+ * Scenario:
+ *   1. Snapshot phase (offset=initial): pre-existing rows (A1, B1) are synced.
+ *   2. Binlog phase: INSERT (C1, D1), UPDATE (C1), DELETE (D1) are applied.

Review Comment:
   The scenario comment claims UPDATE/DELETE events are applied in the binlog 
phase, but the test only performs INSERTs for C1/D1 and never issues the 
mentioned UPDATE/DELETE statements. Either add the missing UPDATE/DELETE 
operations and assertions, or adjust the comment so it matches the actual test 
behavior.
   ```suggestion
    *   2. Binlog phase: INSERT (C1, D1) events are applied.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to