Copilot commented on code in PR #60116:
URL: https://github.com/apache/doris/pull/60116#discussion_r2980526691


##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/CdcStreamTableValuedFunction.java:
##########
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.FetchRecordRequest;
+import org.apache.doris.job.common.DataSourceType;
+import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+public class CdcStreamTableValuedFunction extends 
ExternalFileTableValuedFunction {
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+    private static final String URI = 
"http://127.0.0.1:CDC_CLIENT_PORT/api/fetchRecordStream";
+
+    public CdcStreamTableValuedFunction(Map<String, String> properties) throws 
AnalysisException {
+        validate(properties);
+        processProps(properties);
+    }
+
+    private void processProps(Map<String, String> properties) throws 
AnalysisException {
+        Map<String, String> copyProps = new HashMap<>(properties);
+        copyProps.put("format", "json");
+        super.parseCommonProperties(copyProps);
+        this.processedParams.put("enable_cdc_client", "true");
+        this.processedParams.put("uri", URI);
+        this.processedParams.put("http.enable.range.request", "false");
+        this.processedParams.put("http.enable.chunk.response", "true");
+        this.processedParams.put("http.method", "POST");
+
+        String payload = generateParams(properties);
+        this.processedParams.put("http.payload", payload);
+        this.backendConnectProperties.putAll(processedParams);
+        generateFileStatus();
+    }
+
+    private String generateParams(Map<String, String> properties) throws 
AnalysisException {
+        FetchRecordRequest recordRequest = new FetchRecordRequest();
+        recordRequest.setJobId(UUID.randomUUID().toString().replace("-", ""));
+        recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE));
+        recordRequest.setConfig(properties);
+        try {
+            return objectMapper.writeValueAsString(recordRequest);
+        } catch (IOException e) {
+            LOG.info("Failed to serialize fetch record request," + 
e.getMessage());
+            throw new AnalysisException(e.getMessage());

Review Comment:
   `generateParams` logs at INFO and drops the exception cause, then throws a 
new `AnalysisException` with only `e.getMessage()`. This makes diagnosing 
serialization failures harder (no stack trace / context). Consider logging with 
the throwable and rethrowing an `AnalysisException` that includes a clear 
message plus the original cause.
   ```suggestion
               LOG.warn("Failed to serialize fetch record request", e);
               throw new AnalysisException("Failed to serialize fetch record 
request: " + e.getMessage(), e);
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java:
##########
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.table;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.coercion.AnyDataType;
+import org.apache.doris.tablefunction.CdcStreamTableValuedFunction;
+import org.apache.doris.tablefunction.TableValuedFunctionIf;
+
+import java.util.Map;
+
+/**
+ * CdcStream TVF.
+ */
+public class CdcStream extends TableValuedFunction {
+
+    public CdcStream(Properties tvfProperties) {
+        super("cdc_stream", tvfProperties);
+    }
+
+    @Override
+    public FunctionSignature customSignature() {
+        return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, 
getArgumentsTypes());
+    }
+
+    @Override
+    protected TableValuedFunctionIf toCatalogFunction() {
+        try {
+            Map<String, String> arguments = getTVFProperties().getMap();
+            return new CdcStreamTableValuedFunction(arguments);
+        } catch (Throwable t) {
+            throw new AnalysisException("Can not build 
CdcStreamTableValuedFunction by "

Review Comment:
   Spelling/grammar: prefer “Cannot” over “Can not” in user-facing exception 
text.
   ```suggestion
               throw new AnalysisException("Cannot build 
CdcStreamTableValuedFunction by "
   ```



##########
be/src/io/fs/http_file_reader.cpp:
##########
@@ -224,9 +276,29 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice 
result, size_t* bytes_r
     VLOG(2) << "Issuing HTTP GET request: offset=" << offset << " req_len=" << 
req_len
             << " with_range=" << _range_supported;
 
-    // Prepare and initialize the HTTP client for GET request
+    // Prepare and initialize the HTTP client for request
     RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/false));
-    _client->set_method(HttpMethod::GET);
+
+    // Determine HTTP method from configuration (default: GET)
+    HttpMethod method = HttpMethod::GET;
+    auto method_iter = _extend_kv.find("http.method");
+    if (method_iter != _extend_kv.end()) {
+        method = to_http_method(method_iter->second.c_str());
+        if (method == HttpMethod::UNKNOWN) {
+            LOG(WARNING) << "Invalid http.method value: " << 
method_iter->second
+                         << ", falling back to GET";
+            method = HttpMethod::GET;
+        }
+    }
+    _client->set_method(method);
+
+    // Set payload if configured (supports POST, PUT, DELETE, etc.)
+    auto payload_iter = _extend_kv.find("http.payload");
+    if (payload_iter != _extend_kv.end() && !payload_iter->second.empty()) {
+        _client->set_payload(payload_iter->second);
+        _client->set_content_type("application/json");
+        VLOG(2) << "HTTP request with payload, size=" << 
payload_iter->second.size();
+    }

Review Comment:
   `HttpFileReader` now honors user-supplied `http.method` and `http.payload`, 
which allows arbitrary non-GET requests (with bodies) from BE to any HTTP 
endpoint. This expands the attack surface beyond read-only SSRF (e.g. 
POST/DELETE side effects). Consider restricting these options to the CDC path 
only (e.g. require `enable_cdc_client=true` and/or only allow loopback URLs), 
and/or whitelist allowed methods (likely just POST) and reject others 
explicitly.



##########
fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java:
##########
@@ -461,7 +461,7 @@ private static String getRemoteDbName(DataSourceType 
sourceType, Map<String, Str
                 Preconditions.checkArgument(StringUtils.isNotEmpty(remoteDb), 
"schema is required");
                 break;
             default:
-                throw new JobException("Unsupported source type " + 
sourceType);
+                throw new RuntimeException("Unsupported source type " + 
sourceType);
         }

Review Comment:
   `getRemoteDbName` used to throw `JobException` (checked) but now 
throws/returns `RuntimeException`, and it’s called from job code paths like 
`generateCreateTableCmds(...)` that declare `throws JobException`. This can 
bypass existing error handling/reporting for streaming jobs. Consider keeping 
`JobException` here (or wrapping unsupported source type / missing 
schema|database in `JobException`) and let the TVF convert that to an 
`AnalysisException` at its boundary.



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java:
##########
@@ -107,6 +109,21 @@ public static Properties getDefaultDebeziumProps() {
         return properties;
     }
 
+    public static String[] getTableList(String schema, Map<String, String> 
cdcConfig) {
+        String includingTables = 
cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
+        String table = cdcConfig.get(DataSourceConfigKeys.TABLE);
+        if (StringUtils.isNotEmpty(includingTables)) {
+            return Arrays.stream(includingTables.split(","))
+                    .map(t -> schema + "." + t.trim())
+                    .toArray(String[]::new);
+        } else if (StringUtils.isNotEmpty(table)) {
+            Preconditions.checkArgument(!table.contains(","), "table only 
supports one table");
+            return new String[] {schema + "." + table.trim()};

Review Comment:
   `getTableList()` trims tokens but doesn’t filter out empty entries (e.g. 
`include_tables="t1, ,t2"` will produce `schema.""`). Consider filtering blank 
table names after `trim()` (and possibly validating against `.` to prevent 
already-qualified names) to fail fast with a clearer error.
   ```suggestion
               String[] tableNames = Arrays.stream(includingTables.split(","))
                       .map(String::trim)
                       .filter(t -> !t.isEmpty())
                       .toArray(String[]::new);
               Preconditions.checkArgument(tableNames.length > 0,
                       "include_tables must contain at least one non-blank 
table name");
               for (String t : tableNames) {
                   Preconditions.checkArgument(!t.contains("."),
                           "Table name should not be schema-qualified in 
include_tables: %s", t);
               }
               return Arrays.stream(tableNames)
                       .map(t -> schema + "." + t)
                       .toArray(String[]::new);
           } else if (StringUtils.isNotEmpty(table)) {
               Preconditions.checkArgument(!table.contains(","), "table only 
supports one table");
               String trimmedTable = table.trim();
               Preconditions.checkArgument(!trimmedTable.isEmpty(),
                       "table must not be blank");
               Preconditions.checkArgument(!trimmedTable.contains("."),
                       "Table name should not be schema-qualified in table: 
%s", trimmedTable);
               return new String[] {schema + "." + trimmedTable};
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to