github-actions[bot] commented on code in PR #60116:
URL: https://github.com/apache/doris/pull/60116#discussion_r2980531230


##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java:
##########
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.FetchRecordRequest;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcStreamTableValuedFunction extends ExternalFileTableValuedFunction {
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+    private static final String URI = "http://127.0.0.1:CDC_CLIENT_PORT/api/fetchRecordStream";
+
+    public CdcStreamTableValuedFunction(Map<String, String> properties) throws AnalysisException {
+        validate(properties);
+        processProps(properties);
+    }
+
+    private void processProps(Map<String, String> properties) throws AnalysisException {
+        Map<String, String> copyProps = new HashMap<>(properties);
+        copyProps.put("format", "json");
+        super.parseCommonProperties(copyProps);
+        this.processedParams.put("enable_cdc_client", "true");
+        this.processedParams.put("uri", URI);
+        this.processedParams.put("http.enable.range.request", "false");
+        this.processedParams.put("http.enable.chunk.response", "true");
+        this.processedParams.put("http.method", "POST");
+
+        String payload = generateParams(properties);
+        this.processedParams.put("http.payload", payload);
+        this.backendConnectProperties.putAll(processedParams);
+        generateFileStatus();
+    }
+
+    private String generateParams(Map<String, String> properties) throws AnalysisException {
+        FetchRecordRequest recordRequest = new FetchRecordRequest();
+        recordRequest.setJobId(UUID.randomUUID().toString().replace("-", ""));
+        recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE));
+        recordRequest.setConfig(properties);
+        try {
+            return objectMapper.writeValueAsString(recordRequest);
+        } catch (IOException e) {
+            LOG.info("Failed to serialize fetch record request," + e.getMessage());
+            throw new AnalysisException(e.getMessage());
+        }
+    }
+
+    private void validate(Map<String, String> properties) throws AnalysisException {
+        if (!properties.containsKey(DataSourceConfigKeys.JDBC_URL)) {

Review Comment:
   **[Low]** `validate()` only checks for `jdbc_url`, `type`, `table`, and 
`offset`, but `getTableColumns()` and the CDC client also require `user`, 
`password`, `driver_url`, and `driver_class`. If these are missing, the user 
will get a cryptic JDBC connection error instead of a clear validation message.
   
   Consider adding checks for these required properties, or at minimum `user` 
and `password`.
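   For illustration, an up-front required-property check might take the following shape. This is a sketch: the key names come from this comment, while the class name and `firstMissingKey` helper are hypothetical, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of an up-front validation pass: report the first missing required
// property with a clear name instead of letting JDBC fail later.
public class CdcTvfValidation {
    // Keys mentioned in this review; "driver_url"/"driver_class" could be
    // appended here as well.
    private static final String[] REQUIRED_KEYS = {
            "jdbc_url", "type", "table", "offset", "user", "password"
    };

    /** Returns the first missing or blank required key, or null if all are present. */
    public static String firstMissingKey(Map<String, String> props) {
        for (String key : REQUIRED_KEYS) {
            String value = props.get(key);
            if (value == null || value.trim().isEmpty()) {
                return key;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("jdbc_url", "jdbc:postgresql://localhost:5432/db");
        props.put("type", "postgres");
        props.put("table", "t");
        props.put("offset", "initial");
        // "user" and "password" are absent, so validation flags "user" first.
        System.out.println(firstMissingKey(props));
    }
}
```

   In `validate()` this would translate to one `AnalysisException` per missing key, e.g. `"required property 'user' is missing"`.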



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java:
##########
@@ -257,7 +251,7 @@ private PostgresSourceConfig generatePostgresConfig(
         return configFactory.create(subtaskId);
     }
 
-    private String getSlotName(Long jobId) {
+    private String getSlotName(String jobId) {
         return "doris_cdc_" + jobId;

Review Comment:
   **[Low]** `getSlotName()` now concatenates `"doris_cdc_" + jobId` where `jobId` can be a 32-character UUID (from TVF). This produces a 42-character slot name. PostgreSQL replication slot names have a max length of 63 characters (`NAMEDATALEN` - 1), so a 42-character name is within bounds with about 21 characters of headroom, but any future lengthening of the prefix or job id format would eat into it. Worth documenting the constraint here.
   
   Also note that UUID-based slot names are cleaned up at the end of 
`buildStreamRecords()` via `sourceReader.close(fetchRecord)`, which should drop 
the slot. If the close fails or the process crashes, orphan slots will remain 
in PostgreSQL.
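   A small guard that documents the bound could look like this sketch (the `PG_MAX_SLOT_NAME_LEN` constant and `slotName` helper are illustrative names, not the PR's code):

```java
// Sketch: make the PostgreSQL NAMEDATALEN bound on replication slot names
// explicit instead of relying on "doris_cdc_" + UUID happening to fit.
public class SlotNames {
    // PostgreSQL identifiers longer than NAMEDATALEN - 1 = 63 bytes are truncated;
    // a truncated slot name would silently collide or fail to match on cleanup.
    public static final int PG_MAX_SLOT_NAME_LEN = 63;

    public static String slotName(String jobId) {
        String name = "doris_cdc_" + jobId;
        if (name.length() > PG_MAX_SLOT_NAME_LEN) {
            throw new IllegalArgumentException(
                    "replication slot name exceeds PostgreSQL's 63-character limit: " + name);
        }
        return name;
    }

    public static void main(String[] args) {
        // 32-char UUID (dashes stripped): 10 prefix chars + 32 = 42, within bounds.
        String jobId = java.util.UUID.randomUUID().toString().replace("-", "");
        System.out.println(slotName(jobId).length()); // 42
    }
}
```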



##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java:
##########
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.FetchRecordRequest;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcStreamTableValuedFunction extends ExternalFileTableValuedFunction {
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+    private static final String URI = "http://127.0.0.1:CDC_CLIENT_PORT/api/fetchRecordStream";
+
+    public CdcStreamTableValuedFunction(Map<String, String> properties) throws AnalysisException {
+        validate(properties);
+        processProps(properties);
+    }
+
+    private void processProps(Map<String, String> properties) throws AnalysisException {
+        Map<String, String> copyProps = new HashMap<>(properties);
+        copyProps.put("format", "json");
+        super.parseCommonProperties(copyProps);
+        this.processedParams.put("enable_cdc_client", "true");
+        this.processedParams.put("uri", URI);
+        this.processedParams.put("http.enable.range.request", "false");
+        this.processedParams.put("http.enable.chunk.response", "true");
+        this.processedParams.put("http.method", "POST");
+
+        String payload = generateParams(properties);
+        this.processedParams.put("http.payload", payload);
+        this.backendConnectProperties.putAll(processedParams);
+        generateFileStatus();
+    }
+
+    private String generateParams(Map<String, String> properties) throws AnalysisException {
+        FetchRecordRequest recordRequest = new FetchRecordRequest();
+        recordRequest.setJobId(UUID.randomUUID().toString().replace("-", ""));
+        recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE));
+        recordRequest.setConfig(properties);
+        try {
+            return objectMapper.writeValueAsString(recordRequest);
+        } catch (IOException e) {
+            LOG.info("Failed to serialize fetch record request," + e.getMessage());
+            throw new AnalysisException(e.getMessage());

Review Comment:
   **[Medium]** `LOG.info` should be `LOG.warn` or `LOG.error` for a failure 
path. Using `info` level for an error that triggers an `AnalysisException` is 
misleading — it will be filtered out in production log analysis:
   ```java
   LOG.info("Failed to serialize fetch record request," + e.getMessage());
   ```
   Should be:
   ```java
   LOG.warn("Failed to serialize fetch record request: {}", e.getMessage(), e);
   ```
   Also note the missing space after the comma in the message, and the use of 
string concatenation instead of parameterized logging.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -51,26 +58,31 @@
 import static org.apache.flink.cdc.connectors.base.utils.SourceRecordUtils.SCHEMA_HEARTBEAT_EVENT_KEY_NAME;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import io.debezium.data.Envelope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
+import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
 
 /** Pipeline coordinator. */
 @Component
 public class PipelineCoordinator {
     private static final Logger LOG = LoggerFactory.getLogger(PipelineCoordinator.class);
     private static final String SPLIT_ID = "splitId";
     // jobId
-    private final Map<Long, DorisBatchStreamLoad> batchStreamLoadMap = new ConcurrentHashMap<>();
+    private final Map<String, DorisBatchStreamLoad> batchStreamLoadMap = new ConcurrentHashMap<>();
+    // taskId, offset
+    private final Map<String, Map<String, String>> taskOffsetCache = new ConcurrentHashMap<>();
     // taskId -> writeFailReason

Review Comment:
   **[Low]** `taskOffsetCache` is a `ConcurrentHashMap` that is populated at 
the end of `buildStreamRecords` but only cleaned up by `getOffsetWithTaskId()` 
(via `remove()`). If the caller never retrieves the offset (e.g., due to an 
error in the query pipeline), entries will accumulate indefinitely. Consider 
adding a TTL-based eviction or a periodic cleanup mechanism to prevent 
unbounded memory growth in long-running CDC client processes.
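   One TTL-based shape, sketched below: entries carry an insertion timestamp, retrieval keeps the existing remove-on-read semantics, and a periodic sweep drops entries nobody collected. The class and method names are illustrative, not the actual `PipelineCoordinator` fields.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a TTL-bounded offset cache for long-running CDC client processes.
public class TtlOffsetCache {
    private static final class Entry {
        final Map<String, String> offset;
        final long insertedAtMs;

        Entry(Map<String, String> offset, long insertedAtMs) {
            this.offset = offset;
            this.insertedAtMs = insertedAtMs;
        }
    }

    private final ConcurrentHashMap<String, Entry> cache = new ConcurrentHashMap<>();
    private final long ttlMs;

    public TtlOffsetCache(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    public void put(String taskId, Map<String, String> offset) {
        cache.put(taskId, new Entry(offset, System.currentTimeMillis()));
    }

    /** Retrieve-and-remove, matching getOffsetWithTaskId()'s remove() semantics. */
    public Map<String, String> take(String taskId) {
        Entry e = cache.remove(taskId);
        return e == null ? null : e.offset;
    }

    /** Drop entries older than the TTL; run from a scheduled executor. */
    public void evictExpired(long nowMs) {
        long cutoff = nowMs - ttlMs;
        cache.entrySet().removeIf(en -> en.getValue().insertedAtMs < cutoff);
    }

    public int size() {
        return cache.size();
    }
}
```

   A library cache with built-in expiry (e.g. Caffeine's `expireAfterWrite`) would achieve the same without hand-rolled sweeping, at the cost of a dependency.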



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java:
##########
@@ -511,7 +511,7 @@ public void commitOffset(
             CommitOffsetRequest commitRequest =
                     CommitOffsetRequest.builder()
                             .offset(OBJECT_MAPPER.writeValueAsString(meta))
-                            .jobId(jobId)
+                            .jobId(Long.parseLong(jobId))
                             .taskId(Long.parseLong(taskId))

Review Comment:
   **[Low]** `Long.parseLong(jobId)` will throw `NumberFormatException` if `jobId` is a UUID string (as generated by the TVF path in `CdcStreamTableValuedFunction.generateParams()`). While the TVF streaming path does not go through `DorisBatchStreamLoad.commitOffset()`, this conversion is fragile: if the code paths ever converge, it will fail at runtime with an unhelpful `NumberFormatException`. Consider adding a guard or documenting the assumption clearly.
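   A guard could be as small as the sketch below (the `JobIds` class and `parseNumericJobId` name are hypothetical; the point is the explicit error message naming the TVF/UUID path):

```java
// Sketch: fail fast with a message that names the assumption, instead of a
// bare NumberFormatException, when a UUID-style TVF job id reaches commitOffset.
public class JobIds {
    public static long parseNumericJobId(String jobId) {
        try {
            return Long.parseLong(jobId);
        } catch (NumberFormatException e) {
            throw new IllegalStateException(
                    "commitOffset expects a numeric jobId from the job scheduler; "
                            + "got a non-numeric id (TVF/UUID path?): " + jobId, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseNumericJobId("10086")); // numeric ids pass through
    }
}
```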



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -90,6 +102,159 @@ public PipelineCoordinator() {
                         new ThreadPoolExecutor.AbortPolicy());
     }
 
+    /** return data for http_file_reader */
+    public StreamingResponseBody fetchRecordStream(FetchRecordRequest fetchReq) throws Exception {
+        SourceReader sourceReader;
+        SplitReadResult readResult;
+        try {
+            if (fetchReq.getTaskId() == null && fetchReq.getMeta() == null) {
+                LOG.info(
+                        "Generate initial meta for fetch record request, jobId={}, taskId={}",
+                        fetchReq.getJobId(),
+                        fetchReq.getTaskId());
+                // means the request did not originate from the job, only tvf
+                Map<String, Object> meta = generateMeta(fetchReq.getConfig());
+                fetchReq.setMeta(meta);
+            }
+
+            sourceReader = Env.getCurrentEnv().getReader(fetchReq);
+            readResult = sourceReader.prepareAndSubmitSplit(fetchReq);
+        } catch (Exception ex) {
+            throw new CommonException(ex);
+        }
+
+        return outputStream -> {
+            try {
+                buildStreamRecords(sourceReader, fetchReq, readResult, outputStream);
+            } catch (Exception ex) {
+                LOG.error(
+                        "Failed fetch record, jobId={}, taskId={}",
+                        fetchReq.getJobId(),
+                        fetchReq.getTaskId(),
+                        ex);
+                throw new StreamException(ex);
+            }
+        };
+    }
+
+    private void buildStreamRecords(
+            SourceReader sourceReader,
+            FetchRecordRequest fetchRecord,
+            SplitReadResult readResult,
+            OutputStream rawOutputStream)
+            throws Exception {
+        SourceSplit split = readResult.getSplit();
+        Map<String, String> lastMeta = null;
+        int rowCount = 0;
+        BufferedOutputStream bos = new BufferedOutputStream(rawOutputStream);
+        try {
+            // Poll records using the existing mechanism
+            boolean shouldStop = false;
+            long startTime = System.currentTimeMillis();
+            while (!shouldStop) {
+                Iterator<SourceRecord> recordIterator = sourceReader.pollRecords();
+                if (!recordIterator.hasNext()) {
+                    Thread.sleep(100);
+                    long elapsedTime = System.currentTimeMillis() - startTime;

Review Comment:
   **[Nit]** The `Thread.sleep(100)` polling loop with timeout is acceptable for the bounded CDC streaming use case, but consider sleeping with an exponential backoff (e.g., 100ms → 200ms → 400ms up to a cap) to reduce wakeup overhead when the source is slow. The timeout (`Constants.POLL_SPLIT_RECORDS_TIMEOUTS`) already bounds the overall wait.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

