Copilot commented on code in PR #9331: URL: https://github.com/apache/seatunnel/pull/9331#discussion_r2094790607
########## seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/source/DatabendSourceReader.java: ########## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.databend.source; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; +import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.databend.exception.DatabendConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.databend.exception.DatabendConnectorException; +import org.apache.seatunnel.connectors.seatunnel.databend.util.DatabendUtil; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.DriverManager; + +@Slf4j +public class DatabendSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> { + + private final SingleSplitReaderContext context; + private final String sql; + private final String jdbcUrl; + private final String username; + private final String password; + private final Integer fetchSize; + private final SeaTunnelRowType rowTypeInfo; + + private Connection connection; + private Statement statement; + private ResultSet resultSet; + private volatile boolean hasNext; + + public DatabendSourceReader( + SingleSplitReaderContext context, + String sql, + String jdbcUrl, + String username, + String password, + Integer fetchSize, + SeaTunnelRowType rowTypeInfo) { + this.context = context; + this.sql = sql; + this.jdbcUrl = jdbcUrl; + this.username = username; + this.password = password; + this.fetchSize = fetchSize; + this.rowTypeInfo = rowTypeInfo; + } + + @Override + public void open() throws IOException { + try { + connection = DriverManager.getConnection(jdbcUrl, username, password); Review Comment: Bypassing `DatabendUtil.createConnection` ignores optional `JDBC_CONFIG` settings. Use the utility method to honor additional JDBC properties. ```suggestion connection = DatabendUtil.createConnection(jdbcUrl, username, password); ``` ########## seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/exception/DatabendConnectorErrorCode.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.databend.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; + +public enum DatabendConnectorErrorCode implements SeaTunnelErrorCode { + CONNECT_FAILED("DATABEND-01", "Failed to connect to Databend"), + SQL_OPERATION_FAILED("DATABEND-02", "Failed to execute SQL in Databend"), + PARSE_RESPONSE_FAILED("DATABEND-03", "Failed to parse data from Databend"), + GENERATE_SQL_FAILED("DATABEND-04", "Failed to generate SQL for Databend"), + DRIVER_NOT_FOUND("DATABEND-05","Failed to get driver"), + UNSUPPORTED_DATA_TYPE("DATABEND-06","unsupported data tye"), + ILLEGAL_STATE("DATABEND-07","illegeal state"); Review Comment: Typo in the description: "illegeal" should be "illegal". ```suggestion ILLEGAL_STATE("DATABEND-07","illegal state"); ``` ########## seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkWriter.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.databend.sink; + +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.databend.exception.DatabendConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.databend.exception.DatabendConnectorException; + +import lombok.extern.slf4j.Slf4j; +import org.apache.seatunnel.connectors.seatunnel.databend.util.DatabendUtil; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +@Slf4j +public class DatabendSinkWriter implements SinkWriter<SeaTunnelRow, Void, Void> { + + private final Connection connection; + private final CatalogTable catalogTable; + private final String insertSql; + private final int batchSize; + private final int executeTimeoutSec; + + private PreparedStatement preparedStatement; + private int batchCount = 0; + + public DatabendSinkWriter( + Connection connection, + CatalogTable catalogTable, + String customSql, + String database, + String table, + int batchSize, + int executeTimeoutSec) { + this.connection = connection; + this.catalogTable = catalogTable; + this.batchSize = batchSize; + this.executeTimeoutSec = executeTimeoutSec; + + if (customSql != null && !customSql.isEmpty()) { + this.insertSql = customSql; + } else { + this.insertSql = generateInsertSql(database, table, catalogTable.getSeaTunnelRowType()); + } + + try { + this.preparedStatement = connection.prepareStatement(insertSql); + this.preparedStatement.setQueryTimeout(executeTimeoutSec); + } catch (SQLException e) { + throw new DatabendConnectorException( + DatabendConnectorErrorCode.SQL_OPERATION_FAILED, + "Failed to prepare statement: " + e.getMessage(), + e); + } + } + + @Override + public void write(SeaTunnelRow row) { + try { + processRow(row); + batchCount++; + if (batchCount >= batchSize) { + executeBatch(); + } + } catch (Exception e) { + throw new DatabendConnectorException( + DatabendConnectorErrorCode.SQL_OPERATION_FAILED, + "Failed to write data to Databend: " + e.getMessage(), + e); + } + } + + private void processRow(SeaTunnelRow row) throws SQLException { + for (int i = 0; i < row.getFields().length; i++) { + Object field = row.getFields()[i]; + preparedStatement.setObject(i + 1, field); + } + preparedStatement.addBatch(); + } + + @Override + public Optional<Void> prepareCommit() throws IOException { + executeBatch(); + return Optional.empty(); + } + + @Override + public void abortPrepare() { + try { + if (connection != null && !connection.getAutoCommit()) { + log.info("Aborting prepared transaction"); + connection.rollback(); + } + batchCount = 0; + } catch (SQLException e) { + throw new DatabendConnectorException( + DatabendConnectorErrorCode.SQL_OPERATION_FAILED, + "Failed to abort transaction: " + e.getMessage(), + e); + } + } + + private void executeBatch() { + if (batchCount > 0) { + try { + log.info("Executing batch of {} records", batchCount); + preparedStatement.executeBatch(); + batchCount = 0; + } catch (SQLException e) { + throw new DatabendConnectorException( + DatabendConnectorErrorCode.SQL_OPERATION_FAILED, + "Failed to execute batch: " + e.getMessage(), + e); + } + } + } + + private String generateInsertSql(String database, String table, SeaTunnelRowType rowType) { + return DatabendUtil.generateInsertSql(database, table, catalogTable); + } + + @Override + public void close() throws IOException { + try { + if (preparedStatement != null) { + preparedStatement.close(); + } + if (connection != null && !connection.getAutoCommit()) { + connection.commit(); Review Comment: The close() method only closes the connection when auto-commit is disabled, leading to a connection leak when auto-commit is true. Always close the connection regardless of its auto-commit setting. ```suggestion if (connection != null) { if (!connection.getAutoCommit()) { connection.commit(); } ``` ########## seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/exception/DatabendConnectorErrorCode.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.databend.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; + +public enum DatabendConnectorErrorCode implements SeaTunnelErrorCode { + CONNECT_FAILED("DATABEND-01", "Failed to connect to Databend"), + SQL_OPERATION_FAILED("DATABEND-02", "Failed to execute SQL in Databend"), + PARSE_RESPONSE_FAILED("DATABEND-03", "Failed to parse data from Databend"), + GENERATE_SQL_FAILED("DATABEND-04", "Failed to generate SQL for Databend"), + DRIVER_NOT_FOUND("DATABEND-05","Failed to get driver"), + UNSUPPORTED_DATA_TYPE("DATABEND-06","unsupported data tye"), Review Comment: Typo in the description: "tye" should be "type". ```suggestion UNSUPPORTED_DATA_TYPE("DATABEND-06","unsupported data type"), ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
