This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git


The following commit(s) were added to refs/heads/main by this push:
     new 76ae1d74 [feature] channel update get tables and add filter by name and size (#105)
76ae1d74 is described below

commit 76ae1d747f39d6e5464131564889b14d2f26fd8e
Author: XiaoJiang521 <[email protected]>
AuthorDate: Wed Aug 23 11:45:05 2023 +0800

    [feature] channel update get tables and add filter by name and size (#105)
---
 .../mysql/jdbc/MysqlJdbcDataSourceChannel.java     | 15 +++-
 .../oracle/jdbc/OracleDataSourceChannel.java       | 87 ++++++++++++++++++----
 .../jdbc/PostgresqlDataSourceChannel.java          | 56 ++++++++++++--
 .../cdc/mysql/MysqlCDCDataSourceChannel.java       | 20 ++++-
 .../controller/SeatunnelDatasourceController.java  |  7 +-
 5 files changed, 154 insertions(+), 31 deletions(-)

diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java
index fe7842f6..78a7f62e 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java
@@ -58,17 +58,28 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannel {
             @NonNull String pluginName,
             Map<String, String> requestParams,
             String database,
-            Map<String, String> option) {
+            Map<String, String> options) {
         List<String> tableNames = new ArrayList<>();
+        String filterName = options.get("filterName");
+        String size = options.get("size");
+        boolean isSize = StringUtils.isNotEmpty(size);
+        if (StringUtils.isNotEmpty(filterName) && !filterName.contains("%")) {
+            filterName = "%" + filterName + "%";
+        } else if (StringUtils.equals(filterName, "")) {
+            filterName = null;
+        }
         try (Connection connection = getConnection(requestParams);
                 ResultSet resultSet =
                         connection
                                 .getMetaData()
-                                .getTables(database, null, null, new String[] 
{"TABLE"})) {
+                                .getTables(database, null, filterName, new 
String[] {"TABLE"})) {
             while (resultSet.next()) {
                 String tableName = resultSet.getString("TABLE_NAME");
                 if (StringUtils.isNotBlank(tableName)) {
                     tableNames.add(tableName);
+                    if (isSize && tableNames.size() >= Integer.parseInt(size)) 
{
+                        break;
+                    }
                 }
             }
             return tableNames;
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
index 6844cd3a..775decb1 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -33,12 +34,15 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+@Slf4j
 public class OracleDataSourceChannel implements DataSourceChannel {
 
     @Override
@@ -56,18 +60,72 @@ public class OracleDataSourceChannel implements 
DataSourceChannel {
             @NonNull String pluginName,
             Map<String, String> requestParams,
             String database,
-            Map<String, String> option) {
+            Map<String, String> options) {
+        StringBuilder sqlWhere = new StringBuilder();
+        final String sql =
+                "SELECT * FROM ( SELECT OWNER, TABLE_NAME FROM ALL_TABLES\n"
+                        + "WHERE TABLE_NAME NOT LIKE 'MDRT_%'\n"
+                        + "  AND TABLE_NAME NOT LIKE 'MDRS_%'\n"
+                        + "  AND TABLE_NAME NOT LIKE 'MDXT_%'\n"
+                        + "  AND (TABLE_NAME NOT LIKE 'SYS_IOT_OVER_%' AND 
IOT_NAME IS NULL)"
+                        + "AND OWNER NOT IN ('APPQOSSYS', 'AUDSYS', 'CTXSYS', 
'DVSYS', 'DBSFWUSER', 'DBSNMP',\n"
+                        + "                    'GSMADMIN_INTERNAL', 'LBACSYS', 
'MDSYS', 'OJVMSYS', 'OLAPSYS',\n"
+                        + "                    'ORDDATA', 'ORDSYS', 'OUTLN', 
'SYS', 'SYSTEM', 'WMSYS',\n"
+                        + "                    'XDB', 'EXFSYS', 'SYSMAN')";
+        sqlWhere.append(sql);
+        String filterName = options.get("filterName");
+        if (StringUtils.isNotEmpty(filterName)) {
+            String[] split = filterName.split("\\.");
+            if (split.length == 2) {
+                sqlWhere.append(" AND (TABLE_NAME LIKE '")
+                        .append(
+                                split[1].contains("%")
+                                        ? split[1].toUpperCase(Locale.ROOT)
+                                        : "%" + 
split[1].toUpperCase(Locale.ROOT) + "%")
+                        .append("'")
+                        .append(" AND OWNER LIKE '")
+                        .append(
+                                split[0].contains("%")
+                                        ? split[0].toUpperCase(Locale.ROOT)
+                                        : "%" + 
split[0].toUpperCase(Locale.ROOT) + "%")
+                        .append("')");
+            } else {
+                String filterNameRep =
+                        filterName.contains("%")
+                                ? filterName.toUpperCase(Locale.ROOT)
+                                : "%" + filterName.toUpperCase(Locale.ROOT) + 
"%";
+                sqlWhere.append(" AND (TABLE_NAME LIKE '%")
+                        .append(filterNameRep)
+                        .append("%'")
+                        .append(" OR OWNER LIKE '%")
+                        .append(filterNameRep)
+                        .append("%')");
+            }
+        }
+        sqlWhere.append(" ORDER BY OWNER, TABLE_NAME ) ");
+        String size = options.get("size");
+        if (StringUtils.isNotEmpty(size)) {
+            sqlWhere.append("WHERE ROWNUM <= ").append(size);
+        }
+        log.info("execute sql :{}", sqlWhere.toString());
         List<String> tableNames = new ArrayList<>();
-        try (Connection connection = getConnection(requestParams);
-                ResultSet resultSet =
-                        connection
-                                .getMetaData()
-                                .getTables(database, null, null, new String[] 
{"TABLE"}); ) {
-            while (resultSet.next()) {
-                String tableName = resultSet.getString("TABLE_NAME");
-                if (StringUtils.isNotBlank(tableName)) {
-                    tableNames.add(tableName);
+        long start = System.currentTimeMillis();
+        try (Connection connection = getConnection(requestParams); ) {
+            long end = System.currentTimeMillis();
+            log.info("connection, cost {}ms for oracle", end - start);
+            start = System.currentTimeMillis();
+            try (Statement statement = connection.createStatement();
+                    ResultSet resultSet = 
statement.executeQuery(sqlWhere.toString())) {
+                end = System.currentTimeMillis();
+                log.info("statement execute sql, cost {}ms for oracle", end - 
start);
+                start = System.currentTimeMillis();
+                while (resultSet.next()) {
+                    String schemaName = resultSet.getString("OWNER");
+                    String tableName = resultSet.getString("TABLE_NAME");
+                    tableNames.add(schemaName + "." + tableName);
                 }
+                end = System.currentTimeMillis();
+                log.info("while result set, cost {}ms for oracle", end - 
start);
             }
             return tableNames;
         } catch (ClassNotFoundException | SQLException e) {
@@ -80,13 +138,12 @@ public class OracleDataSourceChannel implements 
DataSourceChannel {
             @NonNull String pluginName, @NonNull Map<String, String> 
requestParams) {
         List<String> dbNames = new ArrayList<>();
         try (Connection connection = getConnection(requestParams);
-                PreparedStatement statement = 
connection.prepareStatement("SHOW DATABASES;");
+                PreparedStatement statement =
+                        connection.prepareStatement("SELECT NAME FROM 
v$database");
                 ResultSet re = statement.executeQuery()) {
-            // filter system databases
             while (re.next()) {
-                String dbName = re.getString("database");
-                if (StringUtils.isNotBlank(dbName)
-                        && 
!OracleDataSourceConfig.ORACLE_SYSTEM_DATABASES.contains(dbName)) {
+                String dbName = re.getString("NAME");
+                if (StringUtils.isNotBlank(dbName)) {
                     dbNames.add(dbName);
                 }
             }
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-postgresql/src/main/java/org/apache/seatunnel/datasource/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-postgresql/src/main/java/org/apache/seatunnel/datasource/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java
index 9b2e9abf..b21bf993 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-postgresql/src/main/java/org/apache/seatunnel/datasource/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-postgresql/src/main/java/org/apache/seatunnel/datasource/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -40,6 +41,7 @@ import java.util.Map;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+@Slf4j
 public class PostgresqlDataSourceChannel implements DataSourceChannel {
 
     @Override
@@ -57,18 +59,52 @@ public class PostgresqlDataSourceChannel implements 
DataSourceChannel {
             @NonNull String pluginName,
             Map<String, String> requestParams,
             String database,
-            Map<String, String> option) {
+            Map<String, String> options) {
         List<String> tableNames = new ArrayList<>();
-        String query = "SELECT table_schema, table_name FROM 
information_schema.tables";
-        try (Connection connection = getConnection(requestParams, database)) {
+        StringBuilder queryWhere = new StringBuilder();
+        String query =
+                "SELECT table_schema, table_name FROM 
information_schema.tables\n"
+                        + "WHERE table_schema NOT IN ('information_schema', 
'pg_catalog', 'root', 'pg_toast', 'pg_temp_1', 'pg_toast_temp_1', 'postgres', 
'template0', 'template1')\n";
+        queryWhere.append(query);
+        String filterName = options.get("filterName");
+        if (StringUtils.isNotEmpty(filterName)) {
+            String[] split = filterName.split("\\.");
+            if (split.length == 2) {
+                queryWhere
+                        .append("AND (table_schema LIKE '")
+                        .append(split[0].contains("%") ? split[0] : "%" + 
split[0] + "%")
+                        .append("'")
+                        .append(" AND table_name LIKE '")
+                        .append(split[1].contains("%") ? split[1] : "%" + 
split[1] + "%")
+                        .append("')");
+            } else {
+                String filterNameRep =
+                        filterName.contains("%") ? filterName : "%" + 
filterName + "%";
+                queryWhere
+                        .append(" AND (table_schema LIKE '")
+                        .append(filterNameRep)
+                        .append("'")
+                        .append(" OR table_name LIKE '")
+                        .append(filterNameRep)
+                        .append("')");
+            }
+        }
+        String size = options.get("size");
+        if (StringUtils.isNotEmpty(size)) {
+            queryWhere.append(" LIMIT ").append(size);
+        }
+        log.info(queryWhere.toString());
+        requestParams.put(
+                PostgresqlOptionRule.URL.key(),
+                JdbcUtils.replaceDatabase(
+                        requestParams.get(PostgresqlOptionRule.URL.key()), 
database));
+        try (Connection connection = getConnection(requestParams)) {
             try (Statement statement = connection.createStatement();
-                    ResultSet resultSet = statement.executeQuery(query)) {
+                    ResultSet resultSet = 
statement.executeQuery(queryWhere.toString())) {
                 while (resultSet.next()) {
                     String schemaName = resultSet.getString("table_schema");
                     String tableName = resultSet.getString("table_name");
-                    if (StringUtils.isNotBlank(schemaName)
-                            && 
!PostgresqlDataSourceConfig.POSTGRESQL_SYSTEM_DATABASES.contains(
-                                    schemaName)) {
+                    if (StringUtils.isNotBlank(schemaName)) {
                         tableNames.add(schemaName + "." + tableName);
                     }
                 }
@@ -118,7 +154,11 @@ public class PostgresqlDataSourceChannel implements 
DataSourceChannel {
             @NonNull String database,
             @NonNull String table) {
         List<TableField> tableFields = new ArrayList<>();
-        try (Connection connection = getConnection(requestParams, database); ) 
{
+        requestParams.put(
+                PostgresqlOptionRule.URL.key(),
+                JdbcUtils.replaceDatabase(
+                        requestParams.get(PostgresqlOptionRule.URL.key()), 
database));
+        try (Connection connection = getConnection(requestParams)) {
             DatabaseMetaData metaData = connection.getMetaData();
             String primaryKey = getPrimaryKey(metaData, database, table);
             String[] split = table.split("\\.");
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mysql-cdc/src/main/java/org/apache/seatunnel/datasource/plugin/cdc/mysql/MysqlCDCDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mysql-cdc/src/main/java/org/apache/seatunnel/datasource/plugin/cdc/mysql/MysqlCDCDataSourceChannel.java
index 4e5164fc..1cd99d36 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mysql-cdc/src/main/java/org/apache/seatunnel/datasource/plugin/cdc/mysql/MysqlCDCDataSourceChannel.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mysql-cdc/src/main/java/org/apache/seatunnel/datasource/plugin/cdc/mysql/MysqlCDCDataSourceChannel.java
@@ -65,8 +65,8 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannel {
             String pluginName,
             Map<String, String> requestParams,
             String database,
-            Map<String, String> option) {
-        return this.getTableNames(requestParams, database);
+            Map<String, String> options) {
+        return this.getTableNames(requestParams, database, options);
     }
 
     @Override
@@ -179,17 +179,29 @@ public class MysqlCDCDataSourceChannel implements 
DataSourceChannel {
         }
     }
 
-    protected List<String> getTableNames(Map<String, String> requestParams, 
String dbName) {
+    protected List<String> getTableNames(
+            Map<String, String> requestParams, String dbName, Map<String, 
String> options) {
         List<String> tableNames = new ArrayList<>();
+        String filterName = options.get("filterName");
+        String size = options.get("size");
+        boolean isSize = StringUtils.isNotEmpty(size);
+        if (StringUtils.isNotEmpty(filterName) && !filterName.contains("%")) {
+            filterName = "%" + filterName + "%";
+        } else if (StringUtils.equals(filterName, "")) {
+            filterName = null;
+        }
         try (Connection connection = init(requestParams);
                 ResultSet resultSet =
                         connection
                                 .getMetaData()
-                                .getTables(dbName, null, null, new String[] 
{"TABLE"})) {
+                                .getTables(dbName, null, filterName, new 
String[] {"TABLE"})) {
             while (resultSet.next()) {
                 String tableName = resultSet.getString("TABLE_NAME");
                 if (StringUtils.isNotBlank(tableName)) {
                     tableNames.add(tableName);
+                    if (isSize && tableNames.size() >= Integer.parseInt(size)) 
{
+                        break;
+                    }
                 }
             }
             return tableNames;
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceController.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceController.java
index ac892996..85aa5849 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceController.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceController.java
@@ -354,8 +354,11 @@ public class SeatunnelDatasourceController extends BaseController {
     Result<List<String>> getTableNames(
             @ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
             @RequestParam("datasourceName") String datasourceName,
-            @RequestParam("databaseName") String databaseName) {
-        return 
Result.success(datasourceService.queryTableNames(datasourceName, databaseName));
+            @RequestParam("databaseName") String databaseName,
+            @RequestParam("filterName") String filterName,
+            @RequestParam("size") Integer size) {
+        return Result.success(
+                datasourceService.queryTableNames(datasourceName, 
databaseName, filterName, size));
     }
 
     @GetMapping("/schema")

Reply via email to