Copilot commented on code in PR #60116:
URL: https://github.com/apache/doris/pull/60116#discussion_r2711847911
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/CdcStream.java:
##########
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.expressions.functions.table;
+
+import org.apache.doris.catalog.FunctionSignature;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
+import org.apache.doris.nereids.types.coercion.AnyDataType;
+import org.apache.doris.tablefunction.CdcStreamTableValuedFunction;
+import org.apache.doris.tablefunction.TableValuedFunctionIf;
+
+import java.util.Map;
+
+/**
+ * CdcStream TVF.
+ */
+public class CdcStream extends TableValuedFunction {
+
+    public CdcStream(Properties tvfProperties) {
+        super("cdc_stream", tvfProperties);
+    }
+
+    @Override
+    public FunctionSignature customSignature() {
+        return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
+    }
+
+    @Override
+    protected TableValuedFunctionIf toCatalogFunction() {
+        try {
+            Map<String, String> arguments = getTVFProperties().getMap();
+            return new CdcStreamTableValuedFunction(arguments);
+        } catch (Throwable t) {
+            throw new AnalysisException("Can not build CdcStreamTableValuedFunction by "
+                    + this + ": " + t.getMessage(), t);
+        }
+    }
+
+    @Override
+    public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
+        return visitor.visitPostgresCdc(this, context);

Review Comment:
   The method 'visitPostgresCdc' is misleadingly named, as it handles both MySQL and PostgreSQL CDC streams, not just PostgreSQL. This naming inconsistency can confuse developers. The method should be renamed to 'visitCdcStream' to match the actual functionality.

```suggestion
        return visitor.visitCdcStream(this, context);
```
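For context on why the rename has to happen in two places at once, here is a hedged sketch of the visitor double dispatch involved; the interface and class shapes are simplified stand-ins, not the actual Doris `ExpressionVisitor`/`TableValuedFunctionVisitor` hierarchy:

```java
// Simplified illustration of the double dispatch behind the rename;
// names and shapes here are assumptions, not the real Doris interfaces.
interface TvfVisitorSketch<R, C> {
    R visitTableValuedFunction(Object tvf, C context);

    // Renamed entry point, falling back to the generic TVF handler by default.
    default R visitCdcStream(Object cdcStream, C context) {
        return visitTableValuedFunction(cdcStream, context);
    }
}

class CdcStreamNodeSketch {
    // accept() selects the visitor method for this node type. If accept()
    // called visitPostgresCdc while concrete visitors override visitCdcStream,
    // those overrides would be silently bypassed, so the accept() call and the
    // visitor method must be renamed together.
    <R, C> R accept(TvfVisitorSketch<R, C> visitor, C context) {
        return visitor.visitCdcStream(this, context);
    }
}
```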
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -81,19 +95,154 @@ public PipelineCoordinator() {
                         new ThreadPoolExecutor.AbortPolicy());
     }
 
+    /** return data for http_file_reader */
+    public StreamingResponseBody fetchRecordStream(FetchRecordRequest fetchReq) throws Exception {
+        SourceReader sourceReader;
+        SplitReadResult readResult;
+        try {
+            if (fetchReq.getTaskId() == null && fetchReq.getMeta() == null) {
+                LOG.info(
+                        "Generate initial meta for fetch record request, jobId={}, taskId={}",
+                        fetchReq.getJobId(),
+                        fetchReq.getTaskId());
+                // means the request did not originate from the job, only tvf
+                Map<String, Object> meta = generateMeta(fetchReq.getConfig());
+                fetchReq.setMeta(meta);
+            }
+
+            sourceReader = Env.getCurrentEnv().getReader(fetchReq);
+            readResult = sourceReader.readSplitRecords(fetchReq);
+        } catch (Exception ex) {
+            throw new CommonException(ex);
+        }
+
+        return outputStream -> {
+            try {
+                buildRecords(sourceReader, fetchReq, readResult, outputStream);
+            } catch (Exception ex) {
+                LOG.error(
+                        "Failed fetch record, jobId={}, taskId={}",
+                        fetchReq.getJobId(),
+                        fetchReq.getTaskId(),
+                        ex);
+                throw new StreamException(ex);
+            }
+        };
+    }
+
+    private void buildRecords(
+            SourceReader sourceReader,
+            FetchRecordRequest fetchRecord,
+            SplitReadResult readResult,
+            OutputStream rawOutputStream)
+            throws Exception {
+        SourceSplit split = readResult.getSplit();
+        Map<String, String> lastMeta = null;
+        int rowCount = 0;
+        BufferedOutputStream bos = new BufferedOutputStream(rawOutputStream);
+        try {
+            // Serialize records and add them to the response (collect from iterator)
+            Iterator<SourceRecord> iterator = readResult.getRecordIterator();
+            while (iterator != null && iterator.hasNext()) {
+                SourceRecord element = iterator.next();
+                List<String> serializedRecords =
+                        sourceReader.deserialize(fetchRecord.getConfig(), element);
+                for (String record : serializedRecords) {
+                    bos.write(record.getBytes(StandardCharsets.UTF_8));
+                    bos.write(LINE_DELIMITER);
+                }
+                rowCount += serializedRecords.size();
+            }
+            // force flush buffer
+            bos.flush();
+        } finally {
+            // The LSN in the commit is the current offset, which is the offset from the last
+            // successful write.
+            // Therefore, even if a subsequent write fails, it will not affect the commit.
+            sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());
+
+            // This must be called after commitSourceOffset; otherwise,
+            // PG's confirmed lsn will not proceed.
+            sourceReader.finishSplitRecords();
+        }
+
+        LOG.info(
+                "Fetch records completed, jobId={}, taskId={}, splitId={}, rowCount={}",
+                fetchRecord.getJobId(),
+                fetchRecord.getTaskId(),
+                split.splitId(),
+                rowCount);
+
+        if (readResult.getSplitState() != null) {
+            // Set meta information for hw
+            if (sourceReader.isSnapshotSplit(split)) {
+                lastMeta = sourceReader.extractSnapshotStateOffset(readResult.getSplitState());
+                lastMeta.put(SPLIT_ID, split.splitId());
+            }
+
+            // set meta for binlog event
+            if (sourceReader.isBinlogSplit(split)) {
+                lastMeta = sourceReader.extractBinlogStateOffset(readResult.getSplitState());
+                lastMeta.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
+            }
+        } else {
+            throw new RuntimeException("split state is null");
+        }
+
+        if (StringUtils.isNotEmpty(fetchRecord.getTaskId())) {
+            taskOffsetCache.put(fetchRecord.getTaskId(), lastMeta);
+        }
+
+        sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());

Review Comment:
   The 'commitSourceOffset' method is called twice: once at line 162 within the try-finally block, and again at line 196 after the finally block. This appears to be a bug, as committing the same offset twice could lead to incorrect offset tracking or unexpected behavior. Remove one of the duplicate calls.

```suggestion
```
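To make the intended single-commit flow concrete, here is a minimal sketch of the corrected tail of `buildRecords`, assuming the `SourceReader` API shown in the diff; the `writeRecords` helper is hypothetical shorthand for the serialize-and-write loop:

```java
// Hedged sketch: commit exactly once, inside the finally block. Types and
// the writeRecords helper stand in for the PR's real ones.
private void buildRecordsFixed(SourceReader sourceReader, FetchRecordRequest fetchRecord,
        SplitReadResult readResult, BufferedOutputStream bos) throws Exception {
    try {
        writeRecords(sourceReader, fetchRecord, readResult, bos); // serialize + write loop
        bos.flush();
    } finally {
        // Single commit: the committed LSN is the offset of the last successful
        // write, so a later failure cannot advance it incorrectly.
        sourceReader.commitSourceOffset(fetchRecord.getJobId(), readResult.getSplit());
        // Must run after the commit, or PG's confirmed LSN will not advance.
        sourceReader.finishSplitRecords();
    }
    // ...meta extraction and taskOffsetCache update as in the diff, with the
    // trailing commitSourceOffset call removed.
}
```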
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java:
##########
@@ -96,6 +98,21 @@ public static ZoneId getPostgresServerTimeZoneFromProps(java.util.Properties pro
         return ZoneId.systemDefault();
     }
 
+    public static String[] getTableList(String schema, Map<String, String> cdcConfig) {
+        String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES);
+        String table = cdcConfig.get(DataSourceConfigKeys.TABLE);
+        if (StringUtils.isNotEmpty(includingTables)) {
+            return Arrays.stream(includingTables.split(","))
+                    .map(t -> schema + "." + t.trim())
+                    .toArray(String[]::new);
+        } else if (StringUtils.isNotEmpty(table)) {
+            Preconditions.checkArgument(!table.contains(","), "table only support one table");

Review Comment:
   The error message "table only support one table" contains a grammatical error. It should be "table only supports one table" (with an 's' in 'supports').

```suggestion
            Preconditions.checkArgument(!table.contains(","), "table only supports one table");
```
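For illustration, here is a self-contained re-implementation of the parsing rules in `getTableList`. The literal config keys `"include_tables"` and `"table"`, the single-table return value, and the empty fallback are assumptions, since the diff truncates before the method ends and the `DataSourceConfigKeys` values are not shown:

```java
import java.util.Arrays;
import java.util.Map;

public class TableListSketch {
    // Sketch only: the real code reads DataSourceConfigKeys constants; the
    // literal keys below are assumed stand-ins.
    static String[] getTableList(String schema, Map<String, String> cdcConfig) {
        String includingTables = cdcConfig.get("include_tables");
        String table = cdcConfig.get("table");
        if (includingTables != null && !includingTables.isEmpty()) {
            // Comma-separated list: qualify each trimmed entry with the schema.
            return Arrays.stream(includingTables.split(","))
                    .map(t -> schema + "." + t.trim())
                    .toArray(String[]::new);
        } else if (table != null && !table.isEmpty()) {
            if (table.contains(",")) {
                throw new IllegalArgumentException("table only supports one table");
            }
            return new String[] {schema + "." + table.trim()}; // assumed return shape
        }
        return new String[0]; // assumed fallback
    }

    public static void main(String[] args) {
        // Prints [public.orders, public.users]
        System.out.println(Arrays.toString(
                getTableList("public", Map.of("include_tables", "orders, users"))));
        // Prints [public.orders]
        System.out.println(Arrays.toString(
                getTableList("public", Map.of("table", "orders"))));
    }
}
```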
##########
be/src/io/fs/http_file_reader.cpp:
##########
@@ -269,6 +335,21 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
     long http_status = _client->get_http_status();
     VLOG(2) << "HTTP response: status=" << http_status << " received_bytes=" << buf.size();
 
+    // Check for HTTP error status codes (4xx, 5xx)
+    if (http_status >= 400) {
+        std::string error_body;
+        if (buf.empty()) {
+            error_body = "(empty response body)";
+        } else {
+            // Limit error message to 1024 bytes to avoid excessive logging
+            size_t max_len = std::min(buf.size(), static_cast<size_t>(1024));
+            error_body = buf.substr(0, max_len);
+        }
+
+        return Status::InternalError("HTTP request failed with status {}: {}.", http_status,
+                                     error_body);

Review Comment:
   Missing space after the comma in the error message format string. The format should be "HTTP request failed with status {}: {}." with proper spacing.

##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/TableValuedFunctionVisitor.java:
##########
@@ -81,6 +82,10 @@ default R visitHttp(Http http, C context) {
         return visitTableValuedFunction(http, context);
     }
 
+    default R visitPostgresCdc(CdcStream cdcStream, C context) {
+        return visitTableValuedFunction(cdcStream, context);
+    }
+

Review Comment:
   The method name 'visitPostgresCdc' is misleading, as this TVF supports both MySQL and PostgreSQL CDC streaming, not just PostgreSQL. Consider renaming it to 'visitCdcStream' to accurately reflect its purpose.

```suggestion
    default R visitCdcStream(CdcStream cdcStream, C context) {
        return visitTableValuedFunction(cdcStream, context);
    }

    /**
     * @deprecated This method name is misleading because {@link CdcStream} supports multiple CDC sources.
     * Use {@link #visitCdcStream(CdcStream, Object)} instead.
     */
    @Deprecated
    default R visitPostgresCdc(CdcStream cdcStream, C context) {
        return visitCdcStream(cdcStream, context);
    }
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
