Copilot commented on code in PR #60116:
URL: https://github.com/apache/doris/pull/60116#discussion_r2711847911


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java:
##########
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.table;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.coercion.AnyDataType;
+import org.apache.doris.tablefunction.CdcStreamTableValuedFunction;
+import org.apache.doris.tablefunction.TableValuedFunctionIf;
+
+import java.util.Map;
+
+/**
+ * CdcStream TVF.
+ */
+public class CdcStream extends TableValuedFunction {
+
+    public CdcStream(Properties tvfProperties) {
+        super("cdc_stream", tvfProperties);
+    }
+
+    @Override
+    public FunctionSignature customSignature() {
+        return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
+    }
+
+    @Override
+    protected TableValuedFunctionIf toCatalogFunction() {
+        try {
+            Map<String, String> arguments = getTVFProperties().getMap();
+            return new CdcStreamTableValuedFunction(arguments);
+        } catch (Throwable t) {
+            throw new AnalysisException("Can not build 
CdcStreamTableValuedFunction by "
+                    + this + ": " + t.getMessage(), t);
+        }
+    }
+
+    @Override
+    public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+        return visitor.visitPostgresCdc(this, context);

Review Comment:
   The method 'visitPostgresCdc' is misleadingly named as it handles both MySQL 
and PostgreSQL CDC streams, not just PostgreSQL. This naming inconsistency can 
confuse developers. The method should be renamed to 'visitCdcStream' to match 
the actual functionality.
   ```suggestion
           return visitor.visitCdcStream(this, context);
   ```
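   For context, a minimal sketch of how the renamed hook would be consumed through the visitor's double dispatch. `CdcStreamCollector` is a hypothetical class written only for illustration (it is not part of this PR); it assumes the matching `visitCdcStream` default is added to `TableValuedFunctionVisitor` as suggested further below, and that Doris's `DefaultExpressionVisitor` base supplies defaults for the remaining hooks:
   ```java
   import java.util.List;

   import org.apache.doris.nereids.trees.expressions.functions.table.CdcStream;
   import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionVisitor;

   // Hypothetical visitor that collects cdc_stream TVFs from an expression tree.
   // It is only reached if accept() and the visitor hook agree on one method name.
   public class CdcStreamCollector extends DefaultExpressionVisitor<Void, List<CdcStream>> {
       @Override
       public Void visitCdcStream(CdcStream cdcStream, List<CdcStream> collected) {
           collected.add(cdcStream);
           return null;
       }
   }
   ```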



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -81,19 +95,154 @@ public PipelineCoordinator() {
                         new ThreadPoolExecutor.AbortPolicy());
     }
 
+    /** return data for http_file_reader */
+    public StreamingResponseBody fetchRecordStream(FetchRecordRequest fetchReq) throws Exception {
+        SourceReader sourceReader;
+        SplitReadResult readResult;
+        try {
+            if (fetchReq.getTaskId() == null && fetchReq.getMeta() == null) {
+                LOG.info(
+                        "Generate initial meta for fetch record request, 
jobId={}, taskId={}",
+                        fetchReq.getJobId(),
+                        fetchReq.getTaskId());
+                // means the request did not originate from the job, only tvf
+                Map<String, Object> meta = generateMeta(fetchReq.getConfig());
+                fetchReq.setMeta(meta);
+            }
+
+            sourceReader = Env.getCurrentEnv().getReader(fetchReq);
+            readResult = sourceReader.readSplitRecords(fetchReq);
+        } catch (Exception ex) {
+            throw new CommonException(ex);
+        }
+
+        return outputStream -> {
+            try {
+                buildRecords(sourceReader, fetchReq, readResult, outputStream);
+            } catch (Exception ex) {
+                LOG.error(
+                        "Failed fetch record, jobId={}, taskId={}",
+                        fetchReq.getJobId(),
+                        fetchReq.getTaskId(),
+                        ex);
+                throw new StreamException(ex);
+            }
+        };
+    }
+
+    private void buildRecords(
+            SourceReader sourceReader,
+            FetchRecordRequest fetchRecord,
+            SplitReadResult readResult,
+            OutputStream rawOutputStream)
+            throws Exception {
+        SourceSplit split = readResult.getSplit();
+        Map<String, String> lastMeta = null;
+        int rowCount = 0;
+        BufferedOutputStream bos = new BufferedOutputStream(rawOutputStream);
+        try {
+            // Serialize records and add them to the response (collect from iterator)
+            Iterator<SourceRecord> iterator = readResult.getRecordIterator();
+            while (iterator != null && iterator.hasNext()) {
+                SourceRecord element = iterator.next();
+                List<String> serializedRecords =
+                        sourceReader.deserialize(fetchRecord.getConfig(), element);
+                for (String record : serializedRecords) {
+                    bos.write(record.getBytes(StandardCharsets.UTF_8));
+                    bos.write(LINE_DELIMITER);
+                }
+                rowCount += serializedRecords.size();
+            }
+            // force flush buffer
+            bos.flush();
+        } finally {
+            // The LSN in the commit is the current offset, which is the offset from the last
+            // successful write.
+            // Therefore, even if a subsequent write fails, it will not affect the commit.
+            sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());
+
+            // This must be called after commitSourceOffset; otherwise,
+            // PG's confirmed lsn will not proceed.
+            sourceReader.finishSplitRecords();
+        }
+
+        LOG.info(
+                "Fetch records completed, jobId={}, taskId={}, splitId={}, 
rowCount={}",
+                fetchRecord.getJobId(),
+                fetchRecord.getTaskId(),
+                split.splitId(),
+                rowCount);
+
+        if (readResult.getSplitState() != null) {
+            // Set meta information for hw
+            if (sourceReader.isSnapshotSplit(split)) {
+                lastMeta = sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
+                lastMeta.put(SPLIT_ID, split.splitId());
+            }
+
+            // set meta for binlog event
+            if (sourceReader.isBinlogSplit(split)) {
+                lastMeta = sourceReader.extractBinlogStateOffset(readResult.getSplitState());
+                lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
+            }
+        } else {
+            throw new RuntimeException("split state is null");
+        }
+
+        if (StringUtils.isNotEmpty(fetchRecord.getTaskId())) {
+            taskOffsetCache.put(fetchRecord.getTaskId(), lastMeta);
+        }
+
+        sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());

Review Comment:
   The 'commitSourceOffset' method is called twice: once at line 162 within
the try-finally block, and again at line 196 after the finally block. This
appears to be a bug, as committing the same offset twice could lead to
incorrect offset tracking or unexpected behavior. Remove one of the duplicate
calls.
   ```suggestion
   
   ```
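   For clarity, here is a self-contained sketch of the intended control flow with exactly one commit, placed in the finally block. `OffsetSink` is a hypothetical stand-in for the `SourceReader` methods involved, used only to keep the example compilable:
   ```java
   import java.io.BufferedOutputStream;
   import java.io.IOException;
   import java.io.OutputStream;

   // Hypothetical stand-in for the SourceReader offset methods used in buildRecords.
   interface OffsetSink {
       void commitSourceOffset(String jobId, String splitId);
       void finishSplitRecords();
   }

   public final class CommitOnceSketch {
       static void buildRecords(OffsetSink reader, String jobId, String splitId,
                                OutputStream raw, byte[] payload) throws IOException {
           BufferedOutputStream bos = new BufferedOutputStream(raw);
           try {
               bos.write(payload);
               bos.flush(); // force flush before committing
           } finally {
               // The committed LSN is the offset of the last successful write, so
               // committing here is safe even if a later write into the stream failed.
               reader.commitSourceOffset(jobId, splitId);
               // Must run after commitSourceOffset so PG's confirmed LSN can advance.
               reader.finishSplitRecords();
           }
           // Meta extraction and task-offset caching follow here; crucially, this
           // path performs no second commitSourceOffset call.
       }
   }
   ```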



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java:
##########
@@ -96,6 +98,21 @@ public static ZoneId getPostgresServerTimeZoneFromProps(java.util.Properties pro
         return ZoneId.systemDefault();
     }
 
+    public static String[] getTableList(String schema, Map<String, String> cdcConfig) {
+        String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
+        String table = cdcConfig.get(DataSourceConfigKeys.TABLE);
+        if (StringUtils.isNotEmpty(includingTables)) {
+            return Arrays.stream(includingTables.split(","))
+                    .map(t -> schema + "." + t.trim())
+                    .toArray(String[]::new);
+        } else if (StringUtils.isNotEmpty(table)) {
+            Preconditions.checkArgument(!table.contains(","), "table only 
support one table");

Review Comment:
   The error message "table only support one table" contains a grammatical 
error. It should be "table only supports one table" (with an 's' in 'supports').
   ```suggestion
               Preconditions.checkArgument(!table.contains(","), "table only 
supports one table");
   ```
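   Beyond the message text, a short driver makes the two branches' contract explicit. The literal key strings below are assumptions for illustration (only the `DataSourceConfigKeys` constants appear in the diff); real callers should use the constants:
   ```java
   import java.util.HashMap;
   import java.util.Map;

   import org.apache.doris.cdcclient.utils.ConfigUtil;

   public final class TableListSketch {
       public static void main(String[] args) {
           // Assumed key literal; use DataSourceConfigKeys.INCLUDE_TABLES in real code.
           Map<String, String> cfg = new HashMap<>();
           cfg.put("include_tables", "orders, users");
           // include_tables takes precedence: entries are trimmed and schema-qualified.
           // Expected output: public.orders, public.users
           System.out.println(String.join(", ", ConfigUtil.getTableList("public", cfg)));

           // Assumed key literal; use DataSourceConfigKeys.TABLE in real code.
           Map<String, String> single = new HashMap<>();
           single.put("table", "orders,users");
           // A comma in the single-table key trips the precondition and throws
           // IllegalArgumentException ("table only supports one table" after the fix).
           ConfigUtil.getTableList("public", single);
       }
   }
   ```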



##########
be/src/io/fs/http_file_reader.cpp:
##########
@@ -269,6 +335,21 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
     long http_status = _client->get_http_status();
     VLOG(2) << "HTTP response: status=" << http_status << " received_bytes=" 
<< buf.size();
 
+    // Check for HTTP error status codes (4xx, 5xx)
+    if (http_status >= 400) {
+        std::string error_body;
+        if (buf.empty()) {
+            error_body = "(empty response body)";
+        } else {
+            // Limit error message to 1024 bytes to avoid excessive logging
+            size_t max_len = std::min(buf.size(), static_cast<size_t>(1024));
+            error_body = buf.substr(0, max_len);
+        }
+
+        return Status::InternalError("HTTP request failed with status {}: 
{}.", http_status,
+                                     error_body);

Review Comment:
   There is a missing space after the comma in the error message format
string; it should read "HTTP request failed with status {}: {}." with proper
spacing.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java:
##########
@@ -81,6 +82,10 @@ default R visitHttp(Http http, C context) {
         return visitTableValuedFunction(http, context);
     }
 
+    default R visitPostgresCdc(CdcStream cdcStream, C context) {
+        return visitTableValuedFunction(cdcStream, context);
+    }
+

Review Comment:
   The method name 'visitPostgresCdc' is misleading as this TVF supports both 
MySQL and PostgreSQL CDC streaming, not just PostgreSQL. Consider renaming to 
'visitCdcStream' to accurately reflect its purpose.
   ```suggestion
       default R visitCdcStream(CdcStream cdcStream, C context) {
           return visitTableValuedFunction(cdcStream, context);
       }
   
       /**
        * @deprecated This method name is misleading because {@link CdcStream} supports multiple CDC sources.
        *             Use {@link #visitCdcStream(CdcStream, Object)} instead.
        */
       @Deprecated
       default R visitPostgresCdc(CdcStream cdcStream, C context) {
           return visitCdcStream(cdcStream, context);
       }
   ```
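   To show what the shim buys, here is a self-contained toy using a hypothetical `Visitor` interface (deliberately not the Doris one): a legacy call site invoking the deprecated name still compiles and is routed into the new hook. One caveat worth noting: once `accept()` dispatches to `visitCdcStream` directly, a subclass that overrides only the deprecated `visitPostgresCdc` will no longer be reached.
   ```java
   // Toy interface mirroring the suggested deprecation shim.
   interface Visitor<R, C> {
       R visitDefault(Object node, C ctx);

       default R visitCdcStream(Object node, C ctx) {
           return visitDefault(node, ctx);
       }

       /** @deprecated Forwards to the new name so existing callers keep compiling. */
       @Deprecated
       default R visitPostgresCdc(Object node, C ctx) {
           return visitCdcStream(node, ctx);
       }
   }

   public final class DeprecationShimDemo {
       public static void main(String[] args) {
           Visitor<String, Void> v = (node, ctx) -> "visited:" + node;
           // Legacy call site still works; it lands in the visitCdcStream default.
           System.out.println(v.visitPostgresCdc("cdc_stream", null)); // visited:cdc_stream
       }
   }
   ```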


