This is an automated email from the ASF dual-hosted git repository. liugddx pushed a commit to branch revert-4628-revert-4540-refactor-catalog in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
commit 063c6e4cdf57dbdee6f5d0ba7a1e823cbe814ccf Author: Guangdong Liu <[email protected]> AuthorDate: Thu Apr 20 15:07:29 2023 +0800 Revert "Revert "[Improve][Catalog] refactor catalog (#4540)" (#4628)" This reverts commit 2d1933195d39271134bff04bd68e5723ba627745. --- .../jdbc/catalog/AbstractJdbcCatalog.java | 234 ++++++++++++++++++++- .../seatunnel/jdbc/catalog/mysql/MySqlCatalog.java | 219 +------------------ .../catalog/mysql/MysqlCreateTableSqlBuilder.java | 2 +- .../jdbc/catalog/sqlserver/SqlServerCatalog.java | 228 +------------------- .../jdbc/catalog/sqlserver/SqlServerType.java | 2 +- .../jdbc/internal/dialect/JdbcDialect.java | 38 ++++ .../jdbc/internal/dialect/mysql/MysqlDialect.java | 8 + .../dialect/sqlserver/SqlServerDialect.java | 30 +++ .../connector-starrocks/pom.xml | 5 + .../starrocks/catalog/StarRocksCatalog.java | 177 +++------------- .../starrocks/catalog/StarRocksDialect.java | 27 +++ 11 files changed, 365 insertions(+), 605 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java index bd516e325..4fb68775f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java @@ -21,14 +21,20 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog; import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException; import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -40,14 +46,19 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; @@ -55,7 +66,6 @@ import static com.google.common.base.Preconditions.checkNotNull; public abstract class AbstractJdbcCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class); - protected final String catalogName; protected final String defaultDatabase; protected final String username; @@ -63,6 +73,8 @@ public abstract class AbstractJdbcCatalog implements Catalog { protected final String baseUrl; protected final String suffix; protected final String defaultUrl; + protected final JdbcDialect jdbcDialect; + protected static final Set<String> SYS_DATABASES = new HashSet<>(); public AbstractJdbcCatalog( String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) { @@ -80,6 +92,7 @@ public abstract class AbstractJdbcCatalog implements Catalog { this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/"; this.defaultUrl = urlInfo.getOrigin(); this.suffix = urlInfo.getSuffix(); + this.jdbcDialect = JdbcDialectLoader.load(this.baseUrl); } @Override @@ -107,6 +120,7 @@ public abstract class AbstractJdbcCatalog implements Catalog { public void open() throws CatalogException { try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { // test connection, fail early if we cannot connect to database + conn.getCatalog(); } catch (SQLException e) { throw new CatalogException( String.format("Failed connecting to %s via JDBC.", defaultUrl), e); @@ -120,6 +134,56 @@ public abstract class AbstractJdbcCatalog implements Catalog { LOG.info("Catalog {} closing", catalogName); } + @Override + public List<String> listDatabases() throws CatalogException { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { + + PreparedStatement ps = conn.prepareStatement(jdbcDialect.listDatabases()); + + List<String> databases = new ArrayList<>(); + ResultSet rs = ps.executeQuery(); + + while (rs.next()) { + String databaseName = rs.getString(1); + if (!getSysDatabases().contains(databaseName)) { + databases.add(rs.getString(1)); + } + } + + return databases; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", this.catalogName), e); + } + } + + @Override + public List<String> listTables(String databaseName) + throws CatalogException, DatabaseNotExistException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(this.catalogName, databaseName); + } + + String dbUrl = jdbcDialect.getUrlFromDatabaseName(baseUrl, databaseName, suffix); + try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); + PreparedStatement ps = + conn.prepareStatement(jdbcDialect.listTableSql(databaseName))) { + + ResultSet rs = ps.executeQuery(); + + List<String> tables = new ArrayList<>(); + + while (rs.next()) { + tables.add(jdbcDialect.getTableName(rs)); + } + + return tables; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", catalogName), e); + } + } + protected Optional<PrimaryKey> getPrimaryKey( DatabaseMetaData metaData, String database, String table) throws SQLException { return getPrimaryKey(metaData, database, table, table); @@ -226,7 +290,8 @@ public abstract class AbstractJdbcCatalog implements Catalog { public boolean tableExists(TablePath tablePath) throws CatalogException { try { return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName()); + && listTables(tablePath.getDatabaseName()) + .contains(jdbcDialect.getTableName(tablePath)); } catch (DatabaseNotExistException e) { return false; } @@ -245,8 +310,86 @@ public abstract class AbstractJdbcCatalog implements Catalog { } } - protected abstract boolean createTableInternal(TablePath tablePath, CatalogTable table) - throws CatalogException; + public CatalogTable getTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(catalogName, tablePath); + } + + String dbUrl = + jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix); + try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) { + DatabaseMetaData metaData = conn.getMetaData(); + Optional<PrimaryKey> primaryKey = + getPrimaryKey( + metaData, + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName()); + List<ConstraintKey> constraintKeys = + getConstraintKeys( + metaData, + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName()); + + try (PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT * FROM %s WHERE 1 = 0;", + tablePath.getFullNameWithQuoted("\"")))) { + ResultSetMetaData tableMetaData = ps.getMetaData(); + TableSchema.Builder builder = TableSchema.builder(); + // add column + for (int i = 1; i <= tableMetaData.getColumnCount(); i++) { + String columnName = tableMetaData.getColumnName(i); + SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i); + int columnDisplaySize = tableMetaData.getColumnDisplaySize(i); + String comment = tableMetaData.getColumnLabel(i); + boolean isNullable = + tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable; + Object defaultValue = + getColumnDefaultValue( + metaData, + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName(), + columnName) + .orElse(null); + + PhysicalColumn physicalColumn = + PhysicalColumn.of( + columnName, + type, + columnDisplaySize, + isNullable, + defaultValue, + comment); + builder.column(physicalColumn); + } + // add primary key + primaryKey.ifPresent(builder::primaryKey); + // add constraint key + constraintKeys.forEach(builder::constraintKey); + TableIdentifier tableIdentifier = + TableIdentifier.of( + catalogName, + tablePath.getDatabaseName(), + tablePath.getSchemaName(), + tablePath.getTableName()); + return CatalogTable.of( + tableIdentifier, + builder.build(), + buildConnectorOptions(tablePath), + Collections.emptyList(), + ""); + } + + } catch (Exception e) { + throw new CatalogException( + String.format("Failed getting table %s", tablePath.getFullName()), e); + } + } @Override public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) @@ -257,7 +400,20 @@ public abstract class AbstractJdbcCatalog implements Catalog { } } - protected abstract boolean dropTableInternal(TablePath tablePath) throws CatalogException; + protected boolean dropTableInternal(TablePath tablePath) throws CatalogException { + String dbUrl = + jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix); + try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); + PreparedStatement ps = + conn.prepareStatement( + jdbcDialect.getDropTableSql(tablePath.getFullName()))) { + // Will there exist concurrent drop for one table? + return ps.execute(); + } catch (SQLException e) { + throw new CatalogException( + String.format("Failed dropping table %s", tablePath.getFullName()), e); + } + } @Override public void createDatabase(TablePath tablePath, boolean ignoreIfExists) @@ -273,8 +429,6 @@ public abstract class AbstractJdbcCatalog implements Catalog { } } - protected abstract boolean createDatabaseInternal(String databaseName); - @Override public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException { @@ -286,5 +440,69 @@ public abstract class AbstractJdbcCatalog implements Catalog { } } - protected abstract boolean dropDatabaseInternal(String databaseName) throws CatalogException; + protected SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex) + throws SQLException { + return null; + } + + protected Set<String> getSysDatabases() { + return SYS_DATABASES; + } + + protected Map<String, String> buildConnectorOptions(TablePath tablePath) { + Map<String, String> options = new HashMap<>(8); + options.put("connector", "jdbc"); + options.put( + "url", + jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix)); + options.put("table-name", tablePath.getFullName()); + options.put("username", username); + options.put("password", pwd); + return options; + } + + protected boolean createDatabaseInternal(String databaseName) { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); + PreparedStatement ps = + conn.prepareStatement( + String.format(jdbcDialect.createDatabaseSql(databaseName)))) { + return ps.execute(); + } catch (Exception e) { + throw new CatalogException( + String.format( + "Failed creating database %s in catalog %s", + databaseName, this.catalogName), + e); + } + } + + protected boolean dropDatabaseInternal(String databaseName) throws CatalogException { + try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); + PreparedStatement ps = + conn.prepareStatement(jdbcDialect.dropDatabaseSql(databaseName))) { + return ps.execute(); + } catch (Exception e) { + throw new CatalogException( + String.format( + "Failed dropping database %s in catalog %s", + databaseName, this.catalogName), + e); + } + } + + // todo: If the origin source is mysql, we can directly use create table like to create the + // target table? + protected boolean createTableInternal(TablePath tablePath, CatalogTable table) + throws CatalogException { + String dbUrl = + jdbcDialect.getUrlFromDatabaseName(baseUrl, tablePath.getDatabaseName(), suffix); + String createTableSql = jdbcDialect.createTableSql(tablePath, table); + try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); + PreparedStatement ps = conn.prepareStatement(createTableSql)) { + return ps.execute(); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed creating table %s", tablePath.getFullName()), e); + } + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java index d8534d5df..f6c9cc679 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java @@ -18,16 +18,6 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql; -import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.ConstraintKey; -import org.apache.seatunnel.api.table.catalog.PhysicalColumn; -import org.apache.seatunnel.api.table.catalog.PrimaryKey; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; -import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.TableSchema; -import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; -import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; @@ -35,26 +25,13 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalo import com.mysql.cj.MysqlType; import com.mysql.cj.jdbc.result.ResultSetImpl; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Set; public class MySqlCatalog extends AbstractJdbcCatalog { - protected static final Set<String> SYS_DATABASES = new HashSet<>(4); - static { SYS_DATABASES.add("information_schema"); SYS_DATABASES.add("mysql"); @@ -67,189 +44,12 @@ public class MySqlCatalog extends AbstractJdbcCatalog { super(catalogName, username, pwd, urlInfo); } - @Override - public List<String> listDatabases() throws CatalogException { - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); - PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;")) { - - List<String> databases = new ArrayList<>(); - ResultSet rs = ps.executeQuery(); - - while (rs.next()) { - String databaseName = rs.getString(1); - if (!SYS_DATABASES.contains(databaseName)) { - databases.add(rs.getString(1)); - } - } - - return databases; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed listing database in catalog %s", this.catalogName), e); - } - } - - @Override - public List<String> listTables(String databaseName) - throws CatalogException, DatabaseNotExistException { - if (!databaseExists(databaseName)) { - throw new DatabaseNotExistException(this.catalogName, databaseName); - } - - String dbUrl = getUrlFromDatabaseName(databaseName); - try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); - PreparedStatement ps = conn.prepareStatement("SHOW TABLES;")) { - - ResultSet rs = ps.executeQuery(); - - List<String> tables = new ArrayList<>(); - - while (rs.next()) { - tables.add(rs.getString(1)); - } - - return tables; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed listing database in catalog %s", catalogName), e); - } - } - - @Override - public CatalogTable getTable(TablePath tablePath) - throws CatalogException, TableNotExistException { - if (!tableExists(tablePath)) { - throw new TableNotExistException(catalogName, tablePath); - } - - String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); - try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) { - DatabaseMetaData metaData = conn.getMetaData(); - Optional<PrimaryKey> primaryKey = - getPrimaryKey(metaData, tablePath.getDatabaseName(), tablePath.getTableName()); - List<ConstraintKey> constraintKeys = - getConstraintKeys( - metaData, tablePath.getDatabaseName(), tablePath.getTableName()); - - try (PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT * FROM %s WHERE 1 = 0;", - tablePath.getFullNameWithQuoted()))) { - ResultSetMetaData tableMetaData = ps.getMetaData(); - TableSchema.Builder builder = TableSchema.builder(); - // add column - for (int i = 1; i <= tableMetaData.getColumnCount(); i++) { - String columnName = tableMetaData.getColumnName(i); - SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i); - int columnDisplaySize = tableMetaData.getColumnDisplaySize(i); - String comment = tableMetaData.getColumnLabel(i); - boolean isNullable = - tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable; - Object defaultValue = - getColumnDefaultValue(metaData, tablePath.getTableName(), columnName) - .orElse(null); - - PhysicalColumn physicalColumn = - PhysicalColumn.of( - columnName, - type, - columnDisplaySize, - isNullable, - defaultValue, - comment); - builder.column(physicalColumn); - } - // add primary key - primaryKey.ifPresent(builder::primaryKey); - // add constraint key - constraintKeys.forEach(builder::constraintKey); - TableIdentifier tableIdentifier = - TableIdentifier.of( - catalogName, tablePath.getDatabaseName(), tablePath.getTableName()); - return CatalogTable.of( - tableIdentifier, - builder.build(), - buildConnectorOptions(tablePath), - Collections.emptyList(), - ""); - } - - } catch (Exception e) { - throw new CatalogException( - String.format("Failed getting table %s", tablePath.getFullName()), e); - } - } - - // todo: If the origin source is mysql, we can directly use create table like to create the - // target table? - @Override - protected boolean createTableInternal(TablePath tablePath, CatalogTable table) - throws CatalogException { - String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); - String createTableSql = MysqlCreateTableSqlBuilder.builder(tablePath, table).build(); - try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); - PreparedStatement ps = conn.prepareStatement(createTableSql)) { - return ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format("Failed creating table %s", tablePath.getFullName()), e); - } - } - - @Override - protected boolean dropTableInternal(TablePath tablePath) throws CatalogException { - String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); - try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); - PreparedStatement ps = - conn.prepareStatement( - String.format( - "DROP TABLE %s IF EXIST;", tablePath.getFullName()))) { - // Will there exist concurrent drop for one table? - return ps.execute(); - } catch (SQLException e) { - throw new CatalogException( - String.format("Failed dropping table %s", tablePath.getFullName()), e); - } - } - - @Override - protected boolean createDatabaseInternal(String databaseName) throws CatalogException { - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); - PreparedStatement ps = - conn.prepareStatement( - String.format("CREATE DATABASE `%s`;", databaseName))) { - return ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format( - "Failed creating database %s in catalog %s", - databaseName, this.catalogName), - e); - } - } - - @Override - protected boolean dropDatabaseInternal(String databaseName) throws CatalogException { - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); - PreparedStatement ps = - conn.prepareStatement(String.format("DROP DATABASE `%s`;", databaseName))) { - return ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format( - "Failed dropping database %s in catalog %s", - databaseName, this.catalogName), - e); - } - } - /** * @see com.mysql.cj.MysqlType * @see ResultSetImpl#getObjectStoredProc(int, int) */ - @SuppressWarnings("unchecked") - private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex) + @Override + public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex) throws SQLException { MysqlType mysqlType = MysqlType.getByName(metadata.getColumnTypeName(colIndex)); Map<String, Object> dataTypeProperties = new HashMap<>(); @@ -257,19 +57,4 @@ public class MySqlCatalog extends AbstractJdbcCatalog { dataTypeProperties.put(MysqlDataTypeConvertor.SCALE, metadata.getScale(colIndex)); return new MysqlDataTypeConvertor().toSeaTunnelType(mysqlType, dataTypeProperties); } - - @SuppressWarnings("MagicNumber") - private Map<String, String> buildConnectorOptions(TablePath tablePath) { - Map<String, String> options = new HashMap<>(8); - options.put("connector", "jdbc"); - options.put("url", baseUrl + tablePath.getDatabaseName()); - options.put("table-name", tablePath.getFullName()); - options.put("username", username); - options.put("password", pwd); - return options; - } - - private String getUrlFromDatabaseName(String databaseName) { - return baseUrl + databaseName + suffix; - } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java index 9a015ca73..c2e300516 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java @@ -49,7 +49,7 @@ public class MysqlCreateTableSqlBuilder { private List<ConstraintKey> constraintKeys; - private MysqlDataTypeConvertor mysqlDataTypeConvertor; + private final MysqlDataTypeConvertor mysqlDataTypeConvertor; private MysqlCreateTableSqlBuilder(String tableName) { checkNotNull(tableName, "tableName must not be null"); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java index 25c02e6b1..f15a0e746 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalog.java @@ -19,41 +19,21 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sqlserver; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.ConstraintKey; -import org.apache.seatunnel.api.table.catalog.PhysicalColumn; -import org.apache.seatunnel.api.table.catalog.PrimaryKey; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; -import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; -import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.utils.JdbcUrlUtil; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; import org.apache.commons.lang3.tuple.Pair; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Set; public class SqlServerCatalog extends AbstractJdbcCatalog { - private static final Set<String> SYS_DATABASES = new HashSet<>(4); - static { SYS_DATABASES.add("master"); SYS_DATABASES.add("tempdb"); @@ -66,150 +46,6 @@ public class SqlServerCatalog extends AbstractJdbcCatalog { super(catalogName, username, pwd, urlInfo); } - @Override - public List<String> listDatabases() throws CatalogException { - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); - PreparedStatement ps = conn.prepareStatement("SELECT NAME FROM SYS.DATABASES")) { - - List<String> databases = new ArrayList<>(); - ResultSet rs = ps.executeQuery(); - - while (rs.next()) { - String databaseName = rs.getString(1); - if (!SYS_DATABASES.contains(databaseName)) { - databases.add(databaseName); - } - } - - return databases; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed listing database in catalog %s", this.catalogName), e); - } - } - - @Override - public List<String> listTables(String databaseName) - throws CatalogException, DatabaseNotExistException { - if (!databaseExists(databaseName)) { - throw new DatabaseNotExistException(this.catalogName, databaseName); - } - - String dbUrl = getUrlFromDatabaseName(databaseName); - try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); - PreparedStatement ps = - conn.prepareStatement( - "SELECT TABLE_SCHEMA, TABLE_NAME FROM " - + databaseName - + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'")) { - - ResultSet rs = ps.executeQuery(); - - List<String> tables = new ArrayList<>(); - - while (rs.next()) { - tables.add(rs.getString(1) + "." + rs.getString(2)); - } - - return tables; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed listing database in catalog %s", catalogName), e); - } - } - - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()) - .contains(tablePath.getSchemaAndTableName()); - } catch (DatabaseNotExistException e) { - return false; - } - } - - @Override - public CatalogTable getTable(TablePath tablePath) - throws CatalogException, TableNotExistException { - if (!tableExists(tablePath)) { - throw new TableNotExistException(catalogName, tablePath); - } - - String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); - try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) { - DatabaseMetaData metaData = conn.getMetaData(); - Optional<PrimaryKey> primaryKey = - getPrimaryKey( - metaData, - tablePath.getDatabaseName(), - tablePath.getSchemaName(), - tablePath.getTableName()); - List<ConstraintKey> constraintKeys = - getConstraintKeys( - metaData, - tablePath.getDatabaseName(), - tablePath.getSchemaName(), - tablePath.getTableName()); - - try (PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT * FROM %s WHERE 1 = 0;", - tablePath.getFullNameWithQuoted("\"")))) { - ResultSetMetaData tableMetaData = ps.getMetaData(); - TableSchema.Builder builder = TableSchema.builder(); - // add column - for (int i = 1; i <= tableMetaData.getColumnCount(); i++) { - String columnName = tableMetaData.getColumnName(i); - SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i); - int columnDisplaySize = tableMetaData.getColumnDisplaySize(i); - String comment = tableMetaData.getColumnLabel(i); - boolean isNullable = - tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable; - Object defaultValue = - getColumnDefaultValue( - metaData, - tablePath.getDatabaseName(), - tablePath.getSchemaName(), - tablePath.getTableName(), - columnName) - .orElse(null); - - PhysicalColumn physicalColumn = - PhysicalColumn.of( - columnName, - type, - columnDisplaySize, - isNullable, - defaultValue, - comment); - builder.column(physicalColumn); - } - // add primary key - primaryKey.ifPresent(builder::primaryKey); - // add constraint key - constraintKeys.forEach(builder::constraintKey); - TableIdentifier tableIdentifier = - TableIdentifier.of( - catalogName, - tablePath.getDatabaseName(), - tablePath.getSchemaName(), - tablePath.getTableName()); - return CatalogTable.of( - tableIdentifier, - builder.build(), - buildConnectorOptions(tablePath), - Collections.emptyList(), - ""); - } - - } catch (Exception e) { - throw new CatalogException( - String.format("Failed getting table %s", tablePath.getFullName()), e); - } - } - @Override protected boolean createTableInternal(TablePath tablePath, CatalogTable table) throws CatalogException { @@ -217,54 +53,7 @@ public class SqlServerCatalog extends AbstractJdbcCatalog { } @Override - protected boolean dropTableInternal(TablePath tablePath) throws CatalogException { - String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); - try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd); - PreparedStatement ps = - conn.prepareStatement( - String.format("DROP TABLE IF EXIST %s", tablePath.getFullName()))) { - // Will there exist concurrent drop for one table? - return ps.execute(); - } catch (SQLException e) { - throw new CatalogException( - String.format("Failed dropping table %s", tablePath.getFullName()), e); - } - } - - @Override - protected boolean createDatabaseInternal(String databaseName) throws CatalogException { - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); - PreparedStatement ps = - conn.prepareStatement( - String.format("CREATE DATABASE `%s`", databaseName))) { - return ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format( - "Failed creating database %s in catalog %s", - databaseName, this.catalogName), - e); - } - } - - @Override - protected boolean dropDatabaseInternal(String databaseName) throws CatalogException { - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd); - PreparedStatement ps = - conn.prepareStatement( - String.format("DROP DATABASE IF EXISTS `%s`;", databaseName))) { - return ps.execute(); - } catch (Exception e) { - throw new CatalogException( - String.format( - "Failed dropping database %s in catalog %s", - databaseName, this.catalogName), - e); - } - } - - @SuppressWarnings("unchecked") - private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex) + public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex) throws SQLException { Pair<SqlServerType, Map<String, Object>> pair = SqlServerType.parse(metadata.getColumnTypeName(colIndex)); @@ -274,19 +63,4 @@ public class SqlServerCatalog extends AbstractJdbcCatalog { dataTypeProperties.put(SqlServerDataTypeConvertor.SCALE, metadata.getScale(colIndex)); return new SqlServerDataTypeConvertor().toSeaTunnelType(pair.getLeft(), dataTypeProperties); } - - @SuppressWarnings("MagicNumber") - private Map<String, String> buildConnectorOptions(TablePath tablePath) { - Map<String, String> options = new HashMap<>(8); - options.put("connector", "jdbc"); - options.put("url", getUrlFromDatabaseName(tablePath.getDatabaseName())); - options.put("table-name", tablePath.getFullName()); - options.put("username", username); - options.put("password", pwd); - return options; - } - - private String getUrlFromDatabaseName(String databaseName) { - return baseUrl + ";databaseName=" + databaseName + ";" + suffix; - } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java index e848498c9..0b0fa91f2 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerType.java @@ -140,7 +140,7 @@ public enum SqlServerType implements SQLType { public static Pair<SqlServerType, Map<String, Object>> parse(String fullTypeName) { Map<String, Object> params = new HashMap<>(); String typeName = fullTypeName; - if (fullTypeName.indexOf("(") != -1) { + if (fullTypeName.contains("(")) { typeName = fullTypeName.substring(0, fullTypeName.indexOf("(")).trim(); String paramsStr = fullTypeName.substring( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java index 64ed388df..36e47eda1 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; @@ -193,4 +195,40 @@ public interface JdbcDialect extends Serializable { PreparedStatement ps = conn.prepareStatement(jdbcSourceConfig.getQuery()); return ps.getMetaData(); } + + default String listDatabases() { + return "SHOW DATABASES;"; + } + + default String getUrlFromDatabaseName(String baseUrl, String databaseName, String suffix) { + return baseUrl + databaseName + suffix; + } + + default String createDatabaseSql(String databaseName) { + return String.format("CREATE DATABASE IF NOT EXISTS %s;", quoteIdentifier(databaseName)); + } + + default String dropDatabaseSql(String databaseName) { + return String.format("DROP DATABASE IF EXISTS %s;", quoteIdentifier(databaseName)); + } + + default String getTableName(ResultSet rs) throws SQLException { + return rs.getString(1); + } + + default String getTableName(TablePath tablePath) { + return tablePath.getTableName(); + } + + default String listTableSql(String databaseName) { + return "SHOW TABLES;"; + } + + default String getDropTableSql(String tableName) { + return String.format("DROP TABLE %s IF EXIST;", tableName); + } + + default String createTableSql(TablePath tablePath, CatalogTable catalogTable) { + return ""; + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java index 128b8ae4b..63584d8ab 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MysqlDialect.java @@ -17,6 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; @@ -78,4 +81,9 @@ public class MysqlDialect implements JdbcDialect { statement.setFetchSize(Integer.MIN_VALUE); return statement; } + + @Override + public String createTableSql(TablePath tablePath, CatalogTable catalogTable) { + return MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable).build(); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java index 697d2d2dc..90b376624 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java @@ -17,10 +17,13 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -100,4 +103,31 @@ public class SqlServerDialect implements JdbcDialect { return Optional.of(upsertSQL); } + + @Override + public String listDatabases() { + return "SELECT NAME FROM SYS.DATABASES"; + } + + @Override + public String listTableSql(String databaseName) { + return "SELECT TABLE_SCHEMA, TABLE_NAME FROM " + + databaseName + + ".INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'"; + } + + @Override + public String getTableName(ResultSet rs) throws SQLException { + return rs.getString(1) + "." + rs.getString(2); + } + + @Override + public String getTableName(TablePath tablePath) { + return tablePath.getSchemaName() + "." + tablePath.getTableName(); + } + + @Override + public String getUrlFromDatabaseName(String baseUrl, String databaseName, String suffix) { + return baseUrl + ";databaseName=" + databaseName + ";" + suffix; + } } diff --git a/seatunnel-connectors-v2/connector-starrocks/pom.xml b/seatunnel-connectors-v2/connector-starrocks/pom.xml index 08e49bc0f..44b899885 100644 --- a/seatunnel-connectors-v2/connector-starrocks/pom.xml +++ b/seatunnel-connectors-v2/connector-starrocks/pom.xml @@ -49,6 +49,11 @@ <artifactId>connector-common</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-jdbc</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java index 7bf308b1c..79d9bf901 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java @@ -17,13 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog; -import org.apache.seatunnel.api.table.catalog.Catalog; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.catalog.PrimaryKey; -import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; -import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.catalog.exception.CatalogException; import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException; import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; @@ -36,6 +32,7 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException; import org.apache.commons.lang3.StringUtils; @@ -47,22 +44,18 @@ import com.mysql.cj.MysqlType; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import static com.google.common.base.Preconditions.checkArgument; -public class StarRocksCatalog implements Catalog { +public class StarRocksCatalog extends AbstractJdbcCatalog { protected final String catalogName; protected String defaultDatabase = "information_schema"; @@ -71,8 +64,6 @@ public class StarRocksCatalog implements Catalog { protected final String baseUrl; protected String defaultUrl; private final JdbcUrlUtil.UrlInfo urlInfo; - - private static final Set<String> SYS_DATABASES = new HashSet<>(); private static final Logger LOG = LoggerFactory.getLogger(StarRocksCatalog.class); static { @@ -82,6 +73,8 @@ public class StarRocksCatalog implements Catalog { public StarRocksCatalog(String catalogName, String username, String pwd, String defaultUrl) { + super(catalogName, username, pwd, JdbcUrlUtil.getUrlInfo(defaultUrl)); + checkArgument(StringUtils.isNotBlank(username)); checkArgument(StringUtils.isNotBlank(pwd)); checkArgument(StringUtils.isNotBlank(defaultUrl)); @@ -97,104 +90,9 @@ public class StarRocksCatalog implements Catalog { } @Override - public List<String> listDatabases() throws CatalogException { - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { - - PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;"); - - List<String> databases = new ArrayList<>(); - ResultSet rs = ps.executeQuery(); - - while (rs.next()) { - String databaseName = rs.getString(1); - if (!SYS_DATABASES.contains(databaseName)) { - databases.add(rs.getString(1)); - } - } - - return databases; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed listing database in catalog %s", this.catalogName), e); - } - } - - @Override - public List<String> listTables(String databaseName) - throws CatalogException, DatabaseNotExistException { - if (!databaseExists(databaseName)) { - throw new DatabaseNotExistException(this.catalogName, databaseName); - } - - try (Connection conn = - DriverManager.getConnection( - urlInfo.getUrlWithDatabase(databaseName), username, pwd)) { - PreparedStatement ps = conn.prepareStatement("SHOW TABLES;"); - - ResultSet rs = ps.executeQuery(); - - List<String> tables = new ArrayList<>(); - - while (rs.next()) { - tables.add(rs.getString(1)); - } - - return tables; - } catch (Exception e) { - throw new CatalogException( - String.format("Failed listing database in catalog %s", catalogName), e); - } - } - - @Override - public CatalogTable getTable(TablePath tablePath) - throws CatalogException, TableNotExistException { - if (!tableExists(tablePath)) { - throw new TableNotExistException(catalogName, tablePath); - } - - String dbUrl = urlInfo.getUrlWithDatabase(tablePath.getDatabaseName()); - try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) { - Optional<PrimaryKey> primaryKey = - getPrimaryKey(tablePath.getDatabaseName(), tablePath.getTableName()); - - PreparedStatement ps = - conn.prepareStatement( - String.format( - "SELECT * FROM %s WHERE 1 = 0;", - tablePath.getFullNameWithQuoted())); - - ResultSetMetaData tableMetaData = ps.getMetaData(); - - TableSchema.Builder builder = TableSchema.builder(); - for (int i = 1; i <= tableMetaData.getColumnCount(); i++) { - SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i); - // TODO add default value and test it - builder.column( - PhysicalColumn.of( - tableMetaData.getColumnName(i), - type, - tableMetaData.getColumnDisplaySize(i), - tableMetaData.isNullable(i) == ResultSetMetaData.columnNullable, - null, - tableMetaData.getColumnLabel(i))); - } - - primaryKey.ifPresent(builder::primaryKey); - - TableIdentifier tableIdentifier = - TableIdentifier.of( - catalogName, tablePath.getDatabaseName(), tablePath.getTableName()); - return CatalogTable.of( - tableIdentifier, - builder.build(), - buildConnectorOptions(tablePath), - Collections.emptyList(), - ""); - } catch (Exception e) { - throw new CatalogException( - String.format("Failed getting table %s", tablePath.getFullName()), e); - } + protected boolean createTableInternal(TablePath tablePath, CatalogTable table) + throws CatalogException { + throw new UnsupportedOperationException("Unsupported create table"); } @Override @@ -209,6 +107,11 @@ public class StarRocksCatalog implements Catalog { throw new UnsupportedOperationException(); } + @Override + protected boolean dropTableInternal(TablePath tablePath) throws CatalogException { + throw new UnsupportedOperationException(); + } + @Override public void createDatabase(TablePath tablePath, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException { @@ -229,6 +132,11 @@ public class StarRocksCatalog implements Catalog { } } + @Override + protected boolean createDatabaseInternal(String databaseName) { + throw new UnsupportedOperationException(); + } + @Override public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException { @@ -246,8 +154,14 @@ public class StarRocksCatalog implements Catalog { } } + @Override + protected boolean dropDatabaseInternal(String databaseName) throws CatalogException { + throw new UnsupportedOperationException(); + } + /** @see com.mysql.cj.MysqlType */ - private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex) + @Override + public SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex) throws SQLException { MysqlType starrocksType = MysqlType.getByName(metadata.getColumnTypeName(colIndex)); switch (starrocksType) { @@ -315,7 +229,8 @@ public class StarRocksCatalog implements Catalog { } @SuppressWarnings("MagicNumber") - private Map<String, String> buildConnectorOptions(TablePath tablePath) { + @Override + public Map<String, String> buildConnectorOptions(TablePath tablePath) { Map<String, String> options = new HashMap<>(8); options.put("connector", "starrocks"); options.put("url", baseUrl + tablePath.getDatabaseName()); @@ -369,29 +284,6 @@ public class StarRocksCatalog implements Catalog { return res; } - @Override - public String getDefaultDatabase() { - return defaultDatabase; - } - - @Override - public void open() throws CatalogException { - try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) { - // test connection, fail early if we cannot connect to database - conn.getCatalog(); - } catch (SQLException e) { - throw new CatalogException( - String.format("Failed connecting to %s via JDBC.", defaultUrl), e); - } - - LOG.info("Catalog {} established connection to {}", catalogName, defaultUrl); - } - - @Override - public void close() throws CatalogException { - LOG.info("Catalog {} closing", catalogName); - } - protected Optional<PrimaryKey> getPrimaryKey(String schema, String table) throws SQLException { List<String> pkFields = new ArrayList<>(); @@ -414,21 +306,4 @@ public class StarRocksCatalog implements Catalog { } return Optional.empty(); } - - @Override - public boolean databaseExists(String databaseName) throws CatalogException { - checkArgument(StringUtils.isNotBlank(databaseName)); - - return listDatabases().contains(databaseName); - } - - @Override - public boolean tableExists(TablePath tablePath) throws CatalogException { - try { - return databaseExists(tablePath.getDatabaseName()) - && listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName()); - } catch (DatabaseNotExistException e) { - return false; - } - } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java new file mode 100644 index 000000000..9286e628f --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksDialect.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.starrocks.catalog; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect; + +public class StarRocksDialect extends MysqlDialect { + @Override + public String dialectName() { + return "StarRocks"; + } +}
