This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/seatunnel-web.git
The following commit(s) were added to refs/heads/main by this push:
new 76ae1d74 [feature] channel update get tables and add filter by name and size (#105)
76ae1d74 is described below
commit 76ae1d747f39d6e5464131564889b14d2f26fd8e
Author: XiaoJiang521 <[email protected]>
AuthorDate: Wed Aug 23 11:45:05 2023 +0800
[feature] channel update get tables and add filter by name and size (#105)
---
.../mysql/jdbc/MysqlJdbcDataSourceChannel.java | 15 +++-
.../oracle/jdbc/OracleDataSourceChannel.java | 87 ++++++++++++++++++----
.../jdbc/PostgresqlDataSourceChannel.java | 56 ++++++++++++--
.../cdc/mysql/MysqlCDCDataSourceChannel.java | 20 ++++-
.../controller/SeatunnelDatasourceController.java | 7 +-
5 files changed, 154 insertions(+), 31 deletions(-)
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java
index fe7842f6..78a7f62e 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java
@@ -58,17 +58,28 @@ public class MysqlJdbcDataSourceChannel implements
DataSourceChannel {
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
- Map<String, String> option) {
+ Map<String, String> options) {
List<String> tableNames = new ArrayList<>();
+ String filterName = options.get("filterName");
+ String size = options.get("size");
+ boolean isSize = StringUtils.isNotEmpty(size);
+ if (StringUtils.isNotEmpty(filterName) && !filterName.contains("%")) {
+ filterName = "%" + filterName + "%";
+ } else if (StringUtils.equals(filterName, "")) {
+ filterName = null;
+ }
try (Connection connection = getConnection(requestParams);
ResultSet resultSet =
connection
.getMetaData()
- .getTables(database, null, null, new String[]
{"TABLE"})) {
+ .getTables(database, null, filterName, new
String[] {"TABLE"})) {
while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) {
tableNames.add(tableName);
+ if (isSize && tableNames.size() >= Integer.parseInt(size))
{
+ break;
+ }
}
}
return tableNames;
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
index 6844cd3a..775decb1 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
@@ -26,6 +26,7 @@ import
org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils;
import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -33,12 +34,15 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull;
+@Slf4j
public class OracleDataSourceChannel implements DataSourceChannel {
@Override
@@ -56,18 +60,72 @@ public class OracleDataSourceChannel implements
DataSourceChannel {
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
- Map<String, String> option) {
+ Map<String, String> options) {
+ StringBuilder sqlWhere = new StringBuilder();
+ final String sql =
+ "SELECT * FROM ( SELECT OWNER, TABLE_NAME FROM ALL_TABLES\n"
+ + "WHERE TABLE_NAME NOT LIKE 'MDRT_%'\n"
+ + " AND TABLE_NAME NOT LIKE 'MDRS_%'\n"
+ + " AND TABLE_NAME NOT LIKE 'MDXT_%'\n"
+ + " AND (TABLE_NAME NOT LIKE 'SYS_IOT_OVER_%' AND
IOT_NAME IS NULL)"
+ + "AND OWNER NOT IN ('APPQOSSYS', 'AUDSYS', 'CTXSYS',
'DVSYS', 'DBSFWUSER', 'DBSNMP',\n"
+ + " 'GSMADMIN_INTERNAL', 'LBACSYS',
'MDSYS', 'OJVMSYS', 'OLAPSYS',\n"
+ + " 'ORDDATA', 'ORDSYS', 'OUTLN',
'SYS', 'SYSTEM', 'WMSYS',\n"
+ + " 'XDB', 'EXFSYS', 'SYSMAN')";
+ sqlWhere.append(sql);
+ String filterName = options.get("filterName");
+ if (StringUtils.isNotEmpty(filterName)) {
+ String[] split = filterName.split("\\.");
+ if (split.length == 2) {
+ sqlWhere.append(" AND (TABLE_NAME LIKE '")
+ .append(
+ split[1].contains("%")
+ ? split[1].toUpperCase(Locale.ROOT)
+ : "%" +
split[1].toUpperCase(Locale.ROOT) + "%")
+ .append("'")
+ .append(" AND OWNER LIKE '")
+ .append(
+ split[0].contains("%")
+ ? split[0].toUpperCase(Locale.ROOT)
+ : "%" +
split[0].toUpperCase(Locale.ROOT) + "%")
+ .append("')");
+ } else {
+ String filterNameRep =
+ filterName.contains("%")
+ ? filterName.toUpperCase(Locale.ROOT)
+ : "%" + filterName.toUpperCase(Locale.ROOT) +
"%";
+ sqlWhere.append(" AND (TABLE_NAME LIKE '%")
+ .append(filterNameRep)
+ .append("%'")
+ .append(" OR OWNER LIKE '%")
+ .append(filterNameRep)
+ .append("%')");
+ }
+ }
+ sqlWhere.append(" ORDER BY OWNER, TABLE_NAME ) ");
+ String size = options.get("size");
+ if (StringUtils.isNotEmpty(size)) {
+ sqlWhere.append("WHERE ROWNUM <= ").append(size);
+ }
+ log.info("execute sql :{}", sqlWhere.toString());
List<String> tableNames = new ArrayList<>();
- try (Connection connection = getConnection(requestParams);
- ResultSet resultSet =
- connection
- .getMetaData()
- .getTables(database, null, null, new String[]
{"TABLE"}); ) {
- while (resultSet.next()) {
- String tableName = resultSet.getString("TABLE_NAME");
- if (StringUtils.isNotBlank(tableName)) {
- tableNames.add(tableName);
+ long start = System.currentTimeMillis();
+ try (Connection connection = getConnection(requestParams); ) {
+ long end = System.currentTimeMillis();
+ log.info("connection, cost {}ms for oracle", end - start);
+ start = System.currentTimeMillis();
+ try (Statement statement = connection.createStatement();
+ ResultSet resultSet =
statement.executeQuery(sqlWhere.toString())) {
+ end = System.currentTimeMillis();
+ log.info("statement execute sql, cost {}ms for oracle", end -
start);
+ start = System.currentTimeMillis();
+ while (resultSet.next()) {
+ String schemaName = resultSet.getString("OWNER");
+ String tableName = resultSet.getString("TABLE_NAME");
+ tableNames.add(schemaName + "." + tableName);
}
+ end = System.currentTimeMillis();
+ log.info("while result set, cost {}ms for oracle", end -
start);
}
return tableNames;
} catch (ClassNotFoundException | SQLException e) {
@@ -80,13 +138,12 @@ public class OracleDataSourceChannel implements
DataSourceChannel {
@NonNull String pluginName, @NonNull Map<String, String>
requestParams) {
List<String> dbNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams);
- PreparedStatement statement =
connection.prepareStatement("SHOW DATABASES;");
+ PreparedStatement statement =
+ connection.prepareStatement("SELECT NAME FROM
v$database");
ResultSet re = statement.executeQuery()) {
- // filter system databases
while (re.next()) {
- String dbName = re.getString("database");
- if (StringUtils.isNotBlank(dbName)
- &&
!OracleDataSourceConfig.ORACLE_SYSTEM_DATABASES.contains(dbName)) {
+ String dbName = re.getString("NAME");
+ if (StringUtils.isNotBlank(dbName)) {
dbNames.add(dbName);
}
}
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-postgresql/src/main/java/org/apache/seatunnel/datasource/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-postgresql/src/main/java/org/apache/seatunnel/datasource/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java
index 9b2e9abf..b21bf993 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-postgresql/src/main/java/org/apache/seatunnel/datasource/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-postgresql/src/main/java/org/apache/seatunnel/datasource/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java
@@ -26,6 +26,7 @@ import
org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils;
import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -40,6 +41,7 @@ import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull;
+@Slf4j
public class PostgresqlDataSourceChannel implements DataSourceChannel {
@Override
@@ -57,18 +59,52 @@ public class PostgresqlDataSourceChannel implements
DataSourceChannel {
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
- Map<String, String> option) {
+ Map<String, String> options) {
List<String> tableNames = new ArrayList<>();
- String query = "SELECT table_schema, table_name FROM
information_schema.tables";
- try (Connection connection = getConnection(requestParams, database)) {
+ StringBuilder queryWhere = new StringBuilder();
+ String query =
+ "SELECT table_schema, table_name FROM
information_schema.tables\n"
+ + "WHERE table_schema NOT IN ('information_schema',
'pg_catalog', 'root', 'pg_toast', 'pg_temp_1', 'pg_toast_temp_1', 'postgres',
'template0', 'template1')\n";
+ queryWhere.append(query);
+ String filterName = options.get("filterName");
+ if (StringUtils.isNotEmpty(filterName)) {
+ String[] split = filterName.split("\\.");
+ if (split.length == 2) {
+ queryWhere
+ .append("AND (table_schema LIKE '")
+ .append(split[0].contains("%") ? split[0] : "%" +
split[0] + "%")
+ .append("'")
+ .append(" AND table_name LIKE '")
+ .append(split[1].contains("%") ? split[1] : "%" +
split[1] + "%")
+ .append("')");
+ } else {
+ String filterNameRep =
+ filterName.contains("%") ? filterName : "%" +
filterName + "%";
+ queryWhere
+ .append(" AND (table_schema LIKE '")
+ .append(filterNameRep)
+ .append("'")
+ .append(" OR table_name LIKE '")
+ .append(filterNameRep)
+ .append("')");
+ }
+ }
+ String size = options.get("size");
+ if (StringUtils.isNotEmpty(size)) {
+ queryWhere.append(" LIMIT ").append(size);
+ }
+ log.info(queryWhere.toString());
+ requestParams.put(
+ PostgresqlOptionRule.URL.key(),
+ JdbcUtils.replaceDatabase(
+ requestParams.get(PostgresqlOptionRule.URL.key()),
database));
+ try (Connection connection = getConnection(requestParams)) {
try (Statement statement = connection.createStatement();
- ResultSet resultSet = statement.executeQuery(query)) {
+ ResultSet resultSet =
statement.executeQuery(queryWhere.toString())) {
while (resultSet.next()) {
String schemaName = resultSet.getString("table_schema");
String tableName = resultSet.getString("table_name");
- if (StringUtils.isNotBlank(schemaName)
- &&
!PostgresqlDataSourceConfig.POSTGRESQL_SYSTEM_DATABASES.contains(
- schemaName)) {
+ if (StringUtils.isNotBlank(schemaName)) {
tableNames.add(schemaName + "." + tableName);
}
}
@@ -118,7 +154,11 @@ public class PostgresqlDataSourceChannel implements
DataSourceChannel {
@NonNull String database,
@NonNull String table) {
List<TableField> tableFields = new ArrayList<>();
- try (Connection connection = getConnection(requestParams, database); )
{
+ requestParams.put(
+ PostgresqlOptionRule.URL.key(),
+ JdbcUtils.replaceDatabase(
+ requestParams.get(PostgresqlOptionRule.URL.key()),
database));
+ try (Connection connection = getConnection(requestParams)) {
DatabaseMetaData metaData = connection.getMetaData();
String primaryKey = getPrimaryKey(metaData, database, table);
String[] split = table.split("\\.");
diff --git a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mysql-cdc/src/main/java/org/apache/seatunnel/datasource/plugin/cdc/mysql/MysqlCDCDataSourceChannel.java b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mysql-cdc/src/main/java/org/apache/seatunnel/datasource/plugin/cdc/mysql/MysqlCDCDataSourceChannel.java
index 4e5164fc..1cd99d36 100644
--- a/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mysql-cdc/src/main/java/org/apache/seatunnel/datasource/plugin/cdc/mysql/MysqlCDCDataSourceChannel.java
+++ b/seatunnel-datasource/seatunnel-datasource-plugins/datasource-mysql-cdc/src/main/java/org/apache/seatunnel/datasource/plugin/cdc/mysql/MysqlCDCDataSourceChannel.java
@@ -65,8 +65,8 @@ public class MysqlCDCDataSourceChannel implements
DataSourceChannel {
String pluginName,
Map<String, String> requestParams,
String database,
- Map<String, String> option) {
- return this.getTableNames(requestParams, database);
+ Map<String, String> options) {
+ return this.getTableNames(requestParams, database, options);
}
@Override
@@ -179,17 +179,29 @@ public class MysqlCDCDataSourceChannel implements
DataSourceChannel {
}
}
- protected List<String> getTableNames(Map<String, String> requestParams,
String dbName) {
+ protected List<String> getTableNames(
+ Map<String, String> requestParams, String dbName, Map<String,
String> options) {
List<String> tableNames = new ArrayList<>();
+ String filterName = options.get("filterName");
+ String size = options.get("size");
+ boolean isSize = StringUtils.isNotEmpty(size);
+ if (StringUtils.isNotEmpty(filterName) && !filterName.contains("%")) {
+ filterName = "%" + filterName + "%";
+ } else if (StringUtils.equals(filterName, "")) {
+ filterName = null;
+ }
try (Connection connection = init(requestParams);
ResultSet resultSet =
connection
.getMetaData()
- .getTables(dbName, null, null, new String[]
{"TABLE"})) {
+ .getTables(dbName, null, filterName, new
String[] {"TABLE"})) {
while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) {
tableNames.add(tableName);
+ if (isSize && tableNames.size() >= Integer.parseInt(size))
{
+ break;
+ }
}
}
return tableNames;
diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceController.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceController.java
index ac892996..85aa5849 100644
--- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceController.java
+++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/controller/SeatunnelDatasourceController.java
@@ -354,8 +354,11 @@ public class SeatunnelDatasourceController extends
BaseController {
Result<List<String>> getTableNames(
@ApiIgnore @RequestAttribute(value = SESSION_USER) User loginUser,
@RequestParam("datasourceName") String datasourceName,
- @RequestParam("databaseName") String databaseName) {
- return
Result.success(datasourceService.queryTableNames(datasourceName, databaseName));
+ @RequestParam("databaseName") String databaseName,
+ @RequestParam("filterName") String filterName,
+ @RequestParam("size") Integer size) {
+ return Result.success(
+ datasourceService.queryTableNames(datasourceName,
databaseName, filterName, size));
}
@GetMapping("/schema")