This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git


The following commit(s) were added to refs/heads/main by this push:
     new 42e1761e [Bug][Seatunnel-web][DB2] DB2 Datasource fails due to improper use of database and schema names. (#220)
42e1761e is described below

commit 42e1761e594acc21bda98790fff00023e87aea9c
Author: Mohammad Arshad <[email protected]>
AuthorDate: Sun Sep 29 07:29:21 2024 +0530

    [Bug][Seatunnel-web][DB2] DB2 Datasource fails due to improper use of database and schema names. (#220)
---
 .../plugin/db2/jdbc/Db2DataSourceConfig.java       | 11 ++++-
 .../plugin/db2/jdbc/Db2JdbcDataSourceChannel.java  | 56 +++++++++++++++-------
 .../datasource/plugin/db2/jdbc/Db2OptionRule.java  |  4 +-
 .../seatunnel/app/bean/engine/EngineDataType.java  |  6 ++-
 .../impl/Db2DataSourceConfigSwitcher.java          |  6 ++-
 5 files changed, 59 insertions(+), 24 deletions(-)

diff --git 
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2DataSourceConfig.java
 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2DataSourceConfig.java
index 90673587..fee9fb07 100644
--- 
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2DataSourceConfig.java
+++ 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2DataSourceConfig.java
@@ -38,7 +38,16 @@ public class Db2DataSourceConfig {
                     .build();
 
     public static final Set<String> DB2_SYSTEM_DATABASES =
-            Sets.newHashSet("information_schema", "mysql", 
"performance_schema", "sys");
+            Sets.newHashSet(
+                    "SYSTOOLS",
+                    "SYSCAT",
+                    "SYSIBM",
+                    "SYSIBMADM",
+                    "SYSSTAT",
+                    "SYSPROC",
+                    "SYSFUN",
+                    "SYSPUBLIC",
+                    "SYSIBMTS");
 
     public static final OptionRule OPTION_RULE =
             OptionRule.builder()
diff --git 
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java
 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java
index 844a4756..724d428e 100644
--- 
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java
+++ 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2JdbcDataSourceChannel.java
@@ -25,14 +25,15 @@ import 
org.apache.seatunnel.datasource.plugin.api.model.TableField;
 import org.apache.commons.lang3.StringUtils;
 
 import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -40,8 +41,8 @@ import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+@Slf4j
 public class Db2JdbcDataSourceChannel implements DataSourceChannel {
-
     @Override
     public OptionRule getDataSourceOptions(@NonNull String pluginName) {
         return Db2DataSourceConfig.OPTION_RULE;
@@ -65,13 +66,13 @@ public class Db2JdbcDataSourceChannel implements 
DataSourceChannel {
         if (StringUtils.isNotEmpty(filterName) && !filterName.contains("%")) {
             filterName = "%" + filterName + "%";
         } else if (StringUtils.equals(filterName, "")) {
-            filterName = null;
+            filterName = "%";
         }
         try (Connection connection = getConnection(requestParams);
                 ResultSet resultSet =
                         connection
                                 .getMetaData()
-                                .getTables(null, null, "%", new String[] 
{"TABLE"})) {
+                                .getTables(null, database, filterName, new 
String[] {"TABLE"})) {
             while (resultSet.next()) {
                 String tableName = resultSet.getString("TABLE_NAME");
                 if (StringUtils.isNotBlank(tableName)) {
@@ -90,11 +91,30 @@ public class Db2JdbcDataSourceChannel implements 
DataSourceChannel {
     @Override
     public List<String> getDatabases(
             @NonNull String pluginName, @NonNull Map<String, String> 
requestParams) {
-        // Hardcoded list of example database names
-        List<String> dbNames = Arrays.asList("default");
+        List<String> dbNames = new ArrayList<>();
+        try (Connection connection = getConnection(requestParams);
+                Statement statement = connection.createStatement()) {
+            ResultSet resultSet = statement.executeQuery("SELECT SCHEMANAME 
FROM SYSCAT.SCHEMATA");
+            while (resultSet.next()) {
+                String dbName = resultSet.getString("SCHEMANAME");
+                if (StringUtils.isBlank(dbName)) {
+                    continue;
+                }
+                dbName = dbName.trim();
+                if (isNotSystemDatabase(dbName)) {
+                    dbNames.add(dbName);
+                }
+            }
+        } catch (SQLException | ClassNotFoundException e) {
+            throw new DataSourcePluginException("Failed to get database 
names", e);
+        }
         return dbNames;
     }
 
+    private boolean isNotSystemDatabase(String dbName) {
+        return 
!Db2DataSourceConfig.DB2_SYSTEM_DATABASES.contains(dbName.toUpperCase());
+    }
+
     @Override
     public boolean checkDataSourceConnectivity(
             @NonNull String pluginName, @NonNull Map<String, String> 
requestParams) {
@@ -119,7 +139,7 @@ public class Db2JdbcDataSourceChannel implements 
DataSourceChannel {
             String primaryKey = getPrimaryKey(metaData, database, table);
 
             // Retrieve column information
-            try (ResultSet resultSet = metaData.getColumns(null, null, table, 
null)) {
+            try (ResultSet resultSet = metaData.getColumns(null, database, 
table, null)) {
 
                 while (resultSet.next()) {
                     TableField tableField = new TableField();
@@ -140,14 +160,9 @@ public class Db2JdbcDataSourceChannel implements 
DataSourceChannel {
                     tableFields.add(tableField);
                 }
             }
-        } catch (SQLException e) {
-            // Log the exception and rethrow as DataSourcePluginException
-            System.out.println("Error while retrieving table fields: " + e);
+        } catch (SQLException | ClassNotFoundException e) {
+            log.error("Error while retrieving table fields", e);
             throw new DataSourcePluginException("Failed to get table fields", 
e);
-        } catch (ClassNotFoundException e) {
-            // Log the exception and rethrow as DataSourcePluginException
-            System.out.println("JDBC driver class not found" + e);
-            throw new DataSourcePluginException("JDBC driver class not found", 
e);
         }
         return tableFields;
     }
@@ -170,7 +185,7 @@ public class Db2JdbcDataSourceChannel implements 
DataSourceChannel {
     private String getPrimaryKey(DatabaseMetaData metaData, String dbName, 
String tableName)
             throws SQLException {
         ResultSet primaryKeysInfo = metaData.getPrimaryKeys(null, dbName, 
tableName);
-        while (primaryKeysInfo.next()) {
+        if (primaryKeysInfo.next()) {
             return primaryKeysInfo.getString("COLUMN_NAME");
         }
         return null;
@@ -178,8 +193,15 @@ public class Db2JdbcDataSourceChannel implements 
DataSourceChannel {
 
     private Connection getConnection(Map<String, String> requestParams)
             throws SQLException, ClassNotFoundException {
-        // Ensure the DB2 JDBC driver is loaded
-        Class.forName("com.ibm.db2.jcc.DB2Driver");
+        String driverClass =
+                requestParams.getOrDefault(
+                        Db2OptionRule.DRIVER.key(),
+                        Db2OptionRule.DriverType.DB2.getDriverClassName());
+        try {
+            Class.forName(driverClass);
+        } catch (ClassNotFoundException e) {
+            throw new DataSourcePluginException("DB2 jdbc driver " + 
driverClass + " not found", e);
+        }
         checkNotNull(requestParams.get(Db2OptionRule.URL.key()), "Jdbc url 
cannot be null");
         String url = requestParams.get(Db2OptionRule.URL.key());
         if (requestParams.containsKey(Db2OptionRule.USER.key())) {
diff --git 
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2OptionRule.java
 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2OptionRule.java
index e5e2b9b1..4947dffd 100644
--- 
a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2OptionRule.java
+++ 
b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-db2/src/main/java/org/apache/seatunnel/datasource/plugin/db2/jdbc/Db2OptionRule.java
@@ -26,9 +26,7 @@ public class Db2OptionRule {
             Options.key("url")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription(
-                            "jdbc url, eg:"
-                                    + " 
jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8");
+                    .withDescription("jdbc url, eg:" + " 
jdbc:db2://localhost:50000/databaseName");
 
     public static final Option<String> USER =
             
Options.key("user").stringType().noDefaultValue().withDescription("jdbc user");
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java
index f7fae731..8d56224f 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/bean/engine/EngineDataType.java
@@ -69,7 +69,11 @@ public class EngineDataType {
         T_INT_ARRAY("array<int>", ArrayType.INT_ARRAY_TYPE),
         T_LONG_ARRAY("array<bigint>", ArrayType.LONG_ARRAY_TYPE),
         T_FLOAT_ARRAY("array<float>", ArrayType.FLOAT_ARRAY_TYPE),
-        T_DOUBLE_ARRAY("array<double>", ArrayType.DOUBLE_ARRAY_TYPE);
+        T_DOUBLE_ARRAY("array<double>", ArrayType.DOUBLE_ARRAY_TYPE),
+        T_VARCHAR("varchar", BasicType.STRING_TYPE),
+        T_CHAR("char", BasicType.STRING_TYPE),
+        T_INTEGER("integer", BasicType.INT_TYPE),
+        T_DECIMAL_DEFAULT("decimal", BasicType.DOUBLE_TYPE);
 
         @Getter private final String name;
         @Getter private final SeaTunnelDataType<?> RawType;
diff --git 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/Db2DataSourceConfigSwitcher.java
 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/Db2DataSourceConfigSwitcher.java
index 327f9829..022148d6 100644
--- 
a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/Db2DataSourceConfigSwitcher.java
+++ 
b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/thirdparty/datasource/impl/Db2DataSourceConfigSwitcher.java
@@ -199,7 +199,7 @@ public class Db2DataSourceConfigSwitcher extends 
BaseJdbcDataSourceConfigSwitche
                 sb.append(", ");
             }
         }
-        sb.append(" FROM ").append(quoteIdentifier(table));
+        sb.append(" FROM ").append(quoteIdentifier(database) + "." + 
quoteIdentifier(table));
 
         return sb.toString();
     }
@@ -211,7 +211,9 @@ public class Db2DataSourceConfigSwitcher extends 
BaseJdbcDataSourceConfigSwitche
     protected String generateSinkSql(
             List<String> tableFields, String database, String schema, String 
table) {
         StringBuilder sb = new StringBuilder();
-        sb.append("INSERT INTO ").append(quoteIdentifier(table)).append(" (");
+        sb.append("INSERT INTO ")
+                .append(quoteIdentifier(database) + "." + 
quoteIdentifier(table))
+                .append(" (");
 
         // Append column names
         for (int i = 0; i < tableFields.size(); i++) {

Reply via email to