Copilot commented on code in PR #9380:
URL: https://github.com/apache/seatunnel/pull/9380#discussion_r2127782176


##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java:
##########
@@ -27,36 +27,224 @@
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceTableConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sourcetype.DatabaseTypeEnum;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSourceState;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcCatalogUtils;
 
+import org.apache.commons.lang3.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import lombok.SneakyThrows;
 
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 public class JdbcSource
         implements SeaTunnelSource<SeaTunnelRow, JdbcSourceSplit, 
JdbcSourceState>,
                 SupportParallelism,
                 SupportColumnProjection {
     protected static final Logger LOG = 
LoggerFactory.getLogger(JdbcSource.class);
-
+    private static final String POINT = ".";
     private final JdbcSourceConfig jdbcSourceConfig;
     private final Map<TablePath, JdbcSourceTable> jdbcSourceTables;
 
     @SneakyThrows
     public JdbcSource(JdbcSourceConfig jdbcSourceConfig) {
         this.jdbcSourceConfig = jdbcSourceConfig;
+        JdbcConnectionConfig jdbcConnectionConfig = 
jdbcSourceConfig.getJdbcConnectionConfig();
+        JdbcDialect jdbcDialect =
+                JdbcDialectLoader.load(
+                        jdbcConnectionConfig.getUrl(), 
jdbcConnectionConfig.getCompatibleMode());
+        if (!isSupportedDatabase(jdbcDialect.dialectName())) {
+            this.jdbcSourceTables =
+                    JdbcCatalogUtils.getTables(
+                            jdbcSourceConfig.getJdbcConnectionConfig(),
+                            jdbcSourceConfig.getTableConfigList());
+            return;
+        }
+        JdbcConnectionProvider connectionProvider =
+                
jdbcDialect.getJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig());
+        Connection connection = connectionProvider.getOrEstablishConnection();
+        List<JdbcSourceTableConfig> jdbcSourceTableConfigs = new ArrayList<>();
+        ResultSet rs = null;
+        PreparedStatement ps = null;
+        List<JdbcSourceTableConfig> tablePaths = 
jdbcSourceConfig.getTableConfigList();
+        try {
+            for (JdbcSourceTableConfig tableConfig : tablePaths) {
+                List<String> schemaTables = new ArrayList<>();
+                String tablePath = tableConfig.getTablePath();
+                String query = tableConfig.getQuery();
+                LOG.info("Processing table path: {}, custom query: {}", 
tablePath, query);
+                String sql;
+                if (StringUtils.isBlank(query)) {
+                    String schemaName;
+                    if 
(jdbcDialect.dialectName().startsWith(DatabaseTypeEnum.ORACLE.getValue())) {
+                        schemaName = tablePath.split("\\.")[0];
+                        sql = "SELECT OWNER, TABLE_NAME FROM dba_tables where 
OWNER=?";
+                        ps = connection.prepareStatement(sql);
+                        ps.setString(1, schemaName);
+                        rs = ps.executeQuery();
+                        while (rs.next()) {
+                            // For Oracle: schema.table
+                            String foundTable =
+                                    rs.getString("OWNER") + POINT + 
rs.getString("TABLE_NAME");
+                            schemaTables.add(foundTable);
+                            LOG.info("Found table in Oracle: {}", foundTable);
+                        }
+                    } else if (jdbcDialect
+                            .dialectName()
+                            .equals(DatabaseTypeEnum.MYSQL.getValue())) {
+                        schemaName = tablePath.split("\\.")[0];
+                        sql =
+                                "SELECT TABLE_SCHEMA, TABLE_NAME FROM 
INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA =?";
+                        ps = connection.prepareStatement(sql);
+                        ps.setString(1, schemaName);
+                        rs = ps.executeQuery();
+                        while (rs.next()) {
+                            // For MySQL: database.table
+                            String foundTable =
+                                    rs.getString("TABLE_SCHEMA")
+                                            + POINT
+                                            + rs.getString("TABLE_NAME");
+                            schemaTables.add(foundTable);
+                            LOG.info("Found table in MySQL: {}", foundTable);
+                        }
+                    } else if (jdbcDialect
+                            .dialectName()
+                            .equals(DatabaseTypeEnum.SQLSERVER.getValue())) {
+                        String[] pathParts = tablePath.split("\\.");
+                        String databaseName = pathParts[0];
+                        schemaName = pathParts[1];
+                        sql =
+                                "SELECT TABLE_SCHEMA, TABLE_NAME FROM 
INFORMATION_SCHEMA.TABLES"
+                                        + " WHERE TABLE_SCHEMA =? AND 
TABLE_CATALOG = DB_NAME()"
+                                        + " AND TABLE_TYPE ='BASE TABLE'";
+                        ps = connection.prepareStatement(sql);
+                        ps.setString(1, schemaName);
+                        rs = ps.executeQuery();
+                        while (rs.next()) {
+                            // For SQLServer: database.schema.table
+                            String foundTable =
+                                    databaseName
+                                            + POINT
+                                            + rs.getString("TABLE_SCHEMA")
+                                            + POINT
+                                            + rs.getString("TABLE_NAME");
+                            schemaTables.add(foundTable);
+                            LOG.info("Found table in SQLServer: {}", 
foundTable);
+                        }
+                    } else if (jdbcDialect
+                            .dialectName()
+                            .equals(DatabaseTypeEnum.POSTGRESQL.getValue())) {
+                        String[] pathParts = tablePath.split("\\.");
+                        String databaseName = pathParts[0];
+                        schemaName = pathParts[1];
+                        LOG.info(
+                                "PostgreSQL: Processing database: {}, schema: 
{}",
+                                databaseName,
+                                schemaName);
+                        sql =
+                                "SELECT table_schema, table_name FROM 
information_schema.tables "
+                                        + "WHERE table_schema = ? AND 
table_type = 'BASE TABLE' "
+                                        + "AND table_catalog = 
current_database()";
+                        ps = connection.prepareStatement(sql);
+                        ps.setString(1, schemaName);
+                        rs = ps.executeQuery();
+                        while (rs.next()) {
+                            // For PostgreSQL: database.schema.table
+                            String foundTable =
+                                    databaseName
+                                            + POINT
+                                            + rs.getString("table_schema")
+                                            + POINT
+                                            + rs.getString("table_name");
+                            schemaTables.add(foundTable);
+                            LOG.info("Found table in PostgreSQL: {}", 
foundTable);
+                        }
+                    } else {
+                        throw new RuntimeException(
+                                "not support dialect " + 
jdbcDialect.dialectName());
+                    }
+                    filterCapturedTablesByRegrex(
+                            jdbcSourceTableConfigs, tableConfig, schemaTables, 
tablePath);
+                } else {
+                    jdbcSourceTableConfigs.add(tableConfig);
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Regular expression match failed:", e);
+        } finally {
+            if (rs != null) {
+                rs.close();
+            }
+            if (ps != null) {

Review Comment:
   The loop reassigns 'ps' and 'rs' without closing previous resources until 
finally. Consider closing each PreparedStatement and ResultSet inside the loop 
after use to prevent resource leaks.



##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java:
##########
@@ -41,6 +41,10 @@ public static JdbcDialect load(String url, String dialect, 
String compatibleMode
         return load(url, compatibleMode, dialect, "", null);
     }
 
+    public static JdbcDialect load(String url, String compatibleMode) {
+        return load(url, compatibleMode, "");

Review Comment:
   The overload calls load(url, compatibleMode, "") but the parameters appear 
swapped (dialect vs. compatibleMode). This may cause incorrect dialect 
detection—swap arguments or adjust the signature to match the intended 
parameter order.
   ```suggestion
           return load(url, "", compatibleMode);
   ```



##########
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java:
##########
@@ -211,6 +211,12 @@ public Long approximateRowCntStatement(Connection 
connection, JdbcSourceTable ta
 
         String query = table.getQuery();
 
+        // Add null value judgment
+        if (table.getTablePath() == null) {

Review Comment:
   When table.getTablePath() is null but query is also null, passing a null 
query to countForSubquery can still cause a NullPointerException. Add a check 
to validate 'query' is not null or blank before invoking.
   ```suggestion
           if (table.getTablePath() == null) {
               if (StringUtils.isBlank(query)) {
                   throw new IllegalArgumentException("Query cannot be null or 
blank when tablePath is null.");
               }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to