Copilot commented on code in PR #9380:
URL: https://github.com/apache/seatunnel/pull/9380#discussion_r2127782176
##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java:
##########
@@ -27,36 +27,224 @@
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceTableConfig;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sourcetype.DatabaseTypeEnum;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils;
+import org.apache.commons.lang3.StringUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.SneakyThrows;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class JdbcSource
implements SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit,
JdbcSourceState>,
SupportParallelism,
SupportColumnProjection {
protected static final Logger LOG =
LoggerFactory.getLogger(JdbcSource.class);
-
+ private static final String POINT = ".";
private final JdbcSourceConfig jdbcSourceConfig;
private final Map<TablePath, JdbcSourceTable> jdbcSourceTables;
@SneakyThrows
public JdbcSource(JdbcSourceConfig jdbcSourceConfig) {
this.jdbcSourceConfig = jdbcSourceConfig;
+ JdbcConnectionConfig jdbcConnectionConfig =
jdbcSourceConfig.getJdbcConnectionConfig();
+ JdbcDialect jdbcDialect =
+ JdbcDialectLoader.load(
+ jdbcConnectionConfig.getUrl(),
jdbcConnectionConfig.getCompatibleMode());
+ if (!isSupportedDatabase(jdbcDialect.dialectName())) {
+ this.jdbcSourceTables =
+ JdbcCatalogUtils.getTables(
+ jdbcSourceConfig.getJdbcConnectionConfig(),
+ jdbcSourceConfig.getTableConfigList());
+ return;
+ }
+ JdbcConnectionProvider connectionProvider =
+
jdbcDialect.getJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig());
+ Connection connection = connectionProvider.getOrEstablishConnection();
+ List<JdbcSourceTableConfig> jdbcSourceTableConfigs = new ArrayList<>();
+ ResultSet rs = null;
+ PreparedStatement ps = null;
+ List<JdbcSourceTableConfig> tablePaths =
jdbcSourceConfig.getTableConfigList();
+ try {
+ for (JdbcSourceTableConfig tableConfig : tablePaths) {
+ List<String> schemaTables = new ArrayList<>();
+ String tablePath = tableConfig.getTablePath();
+ String query = tableConfig.getQuery();
+ LOG.info("Processing table path: {}, custom query: {}",
tablePath, query);
+ String sql;
+ if (StringUtils.isBlank(query)) {
+ String schemaName;
+ if
(jdbcDialect.dialectName().startsWith(DatabaseTypeEnum.ORACLE.getValue())) {
+ schemaName = tablePath.split("\\.")[0];
+ sql = "SELECT OWNER, TABLE_NAME FROM dba_tables where
OWNER=?";
+ ps = connection.prepareStatement(sql);
+ ps.setString(1, schemaName);
+ rs = ps.executeQuery();
+ while (rs.next()) {
+ // For Oracle: schema.table
+ String foundTable =
+ rs.getString("OWNER") + POINT +
rs.getString("TABLE_NAME");
+ schemaTables.add(foundTable);
+ LOG.info("Found table in Oracle: {}", foundTable);
+ }
+ } else if (jdbcDialect
+ .dialectName()
+ .equals(DatabaseTypeEnum.MYSQL.getValue())) {
+ schemaName = tablePath.split("\\.")[0];
+ sql =
+ "SELECT TABLE_SCHEMA, TABLE_NAME FROM
INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA =?";
+ ps = connection.prepareStatement(sql);
+ ps.setString(1, schemaName);
+ rs = ps.executeQuery();
+ while (rs.next()) {
+ // For MySQL: database.table
+ String foundTable =
+ rs.getString("TABLE_SCHEMA")
+ + POINT
+ + rs.getString("TABLE_NAME");
+ schemaTables.add(foundTable);
+ LOG.info("Found table in MySQL: {}", foundTable);
+ }
+ } else if (jdbcDialect
+ .dialectName()
+ .equals(DatabaseTypeEnum.SQLSERVER.getValue())) {
+ String[] pathParts = tablePath.split("\\.");
+ String databaseName = pathParts[0];
+ schemaName = pathParts[1];
+ sql =
+ "SELECT TABLE_SCHEMA, TABLE_NAME FROM
INFORMATION_SCHEMA.TABLES"
+ + " WHERE TABLE_SCHEMA =? AND
TABLE_CATALOG = DB_NAME()"
+ + " AND TABLE_TYPE ='BASE TABLE'";
+ ps = connection.prepareStatement(sql);
+ ps.setString(1, schemaName);
+ rs = ps.executeQuery();
+ while (rs.next()) {
+ // For SQLServer: database.schema.table
+ String foundTable =
+ databaseName
+ + POINT
+ + rs.getString("TABLE_SCHEMA")
+ + POINT
+ + rs.getString("TABLE_NAME");
+ schemaTables.add(foundTable);
+ LOG.info("Found table in SQLServer: {}",
foundTable);
+ }
+ } else if (jdbcDialect
+ .dialectName()
+ .equals(DatabaseTypeEnum.POSTGRESQL.getValue())) {
+ String[] pathParts = tablePath.split("\\.");
+ String databaseName = pathParts[0];
+ schemaName = pathParts[1];
+ LOG.info(
+ "PostgreSQL: Processing database: {}, schema:
{}",
+ databaseName,
+ schemaName);
+ sql =
+ "SELECT table_schema, table_name FROM
information_schema.tables "
+ + "WHERE table_schema = ? AND
table_type = 'BASE TABLE' "
+ + "AND table_catalog =
current_database()";
+ ps = connection.prepareStatement(sql);
+ ps.setString(1, schemaName);
+ rs = ps.executeQuery();
+ while (rs.next()) {
+ // For PostgreSQL: database.schema.table
+ String foundTable =
+ databaseName
+ + POINT
+ + rs.getString("table_schema")
+ + POINT
+ + rs.getString("table_name");
+ schemaTables.add(foundTable);
+ LOG.info("Found table in PostgreSQL: {}",
foundTable);
+ }
+ } else {
+ throw new RuntimeException(
+ "not support dialect " +
jdbcDialect.dialectName());
+ }
+ filterCapturedTablesByRegrex(
+ jdbcSourceTableConfigs, tableConfig, schemaTables,
tablePath);
+ } else {
+ jdbcSourceTableConfigs.add(tableConfig);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Regular expression match failed:", e);
+ } finally {
+ if (rs != null) {
+ rs.close();
+ }
+ if (ps != null) {
Review Comment:
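The loop reassigns 'ps' and 'rs' on every iteration, but the finally block
closes only the last PreparedStatement and ResultSet. Those opened for earlier
table paths are never closed and leak. Close each PreparedStatement and
ResultSet inside the loop after use (e.g. with try-with-resources) to prevent
resource leaks.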
##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java:
##########
@@ -41,6 +41,10 @@ public static JdbcDialect load(String url, String dialect,
String compatibleMode
return load(url, compatibleMode, dialect, "", null);
}
+ public static JdbcDialect load(String url, String compatibleMode) {
+ return load(url, compatibleMode, "");
Review Comment:
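The overload calls load(url, compatibleMode, "") but the arguments are
swapped. The existing three-argument overload is declared as
load(String url, String dialect, String compatibleMode), so this call passes
compatibleMode as the dialect and an empty string as the compatible mode.
This may cause incorrect dialect detection. Swap the arguments, or adjust the
signature to match the intended parameter order.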
```suggestion
return load(url, "", compatibleMode);
```
##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java:
##########
@@ -211,6 +211,12 @@ public Long approximateRowCntStatement(Connection
connection, JdbcSourceTable ta
String query = table.getQuery();
+ // Add null value judgment
+ if (table.getTablePath() == null) {
Review Comment:
When table.getTablePath() is null and query is also null, passing the null
query to countForSubquery can still cause a NullPointerException. Add a check
that 'query' is neither null nor blank before invoking countForSubquery.
```suggestion
if (table.getTablePath() == null) {
if (StringUtils.isBlank(query)) {
throw new IllegalArgumentException("Query cannot be null or
blank when tablePath is null.");
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]