This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8ee129d20f [Improve][Connector-V2] Reuse connection in
StarRocksCatalog (#7342)
8ee129d20f is described below
commit 8ee129d20f3f200ba04d02623469e3f3008b8c96
Author: Jia Fan <[email protected]>
AuthorDate: Thu Aug 22 10:03:19 2024 +0800
[Improve][Connector-V2] Reuse connection in StarRocksCatalog (#7342)
---
.../starrocks/catalog/StarRocksCatalog.java | 89 ++++++++++++----------
1 file changed, 50 insertions(+), 39 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 0aee20aa94..9b1875374b 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -78,6 +78,8 @@ public class StarRocksCatalog implements Catalog {
protected String defaultUrl;
private final JdbcUrlUtil.UrlInfo urlInfo;
private final String template;
+ private Connection conn;
+
private static final Logger LOG =
LoggerFactory.getLogger(StarRocksCatalog.class);
public StarRocksCatalog(
@@ -99,8 +101,7 @@ public class StarRocksCatalog implements Catalog {
@Override
public List<String> listDatabases() throws CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd);
- PreparedStatement ps = conn.prepareStatement("SHOW
DATABASES;");
+ try (PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
ResultSet rs = ps.executeQuery()) {
List<String> databases = new ArrayList<>();
@@ -122,20 +123,19 @@ public class StarRocksCatalog implements Catalog {
throw new DatabaseNotExistException(this.catalogName,
databaseName);
}
- try (Connection conn =
- DriverManager.getConnection(
- urlInfo.getUrlWithDatabase(databaseName),
username, pwd);
- PreparedStatement ps = conn.prepareStatement("SHOW TABLES;");
- ResultSet rs = ps.executeQuery()) {
-
- List<String> tables = new ArrayList<>();
-
- while (rs.next()) {
- tables.add(rs.getString(1));
+ try (PreparedStatement ps =
+ conn.prepareStatement(
+ "SELECT TABLE_NAME FROM information_schema.tables "
+ + "WHERE TABLE_SCHEMA = ? ORDER BY
TABLE_NAME")) {
+ ps.setString(1, databaseName);
+ try (ResultSet rs = ps.executeQuery()) {
+ List<String> tables = new ArrayList<>();
+ while (rs.next()) {
+ tables.add(rs.getString(1));
+ }
+ return tables;
}
-
- return tables;
- } catch (Exception e) {
+ } catch (SQLException e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s",
catalogName), e);
}
@@ -148,8 +148,7 @@ public class StarRocksCatalog implements Catalog {
throw new TableNotExistException(catalogName, tablePath);
}
- String dbUrl = urlInfo.getUrlWithDatabase(tablePath.getDatabaseName());
- try (Connection conn = DriverManager.getConnection(dbUrl, username,
pwd)) {
+ try {
Optional<PrimaryKey> primaryKey =
getPrimaryKey(tablePath.getDatabaseName(),
tablePath.getTableName());
@@ -213,7 +212,7 @@ public class StarRocksCatalog implements Catalog {
@Override
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ try {
conn.createStatement()
.execute(StarRocksSaveModeUtil.getDropTableSql(tablePath,
ignoreIfNotExists));
} catch (Exception e) {
@@ -224,7 +223,7 @@ public class StarRocksCatalog implements Catalog {
public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ try {
if (ignoreIfNotExists) {
conn.createStatement()
.execute(StarRocksSaveModeUtil.getTruncateTableSql(tablePath));
@@ -237,8 +236,8 @@ public class StarRocksCatalog implements Catalog {
}
public void executeSql(TablePath tablePath, String sql) {
- try (Connection connection = DriverManager.getConnection(defaultUrl,
username, pwd)) {
- connection.createStatement().execute(sql);
+ try {
+ conn.createStatement().execute(sql);
} catch (Exception e) {
throw new CatalogException(String.format("Failed EXECUTE SQL in
catalog %s", sql), e);
}
@@ -246,8 +245,7 @@ public class StarRocksCatalog implements Catalog {
public boolean isExistsData(TablePath tablePath) {
String sql = String.format("select * from %s limit 1",
tablePath.getFullName());
- try (Connection connection = DriverManager.getConnection(defaultUrl,
username, pwd);
- Statement statement = connection.createStatement();
+ try (Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
if (resultSet == null) {
return false;
@@ -262,7 +260,7 @@ public class StarRocksCatalog implements Catalog {
@Override
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ try {
conn.createStatement()
.execute(
StarRocksSaveModeUtil.getCreateDatabaseSql(
@@ -276,7 +274,7 @@ public class StarRocksCatalog implements Catalog {
@Override
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ try {
conn.createStatement()
.execute(
StarRocksSaveModeUtil.getDropDatabaseSql(
@@ -368,7 +366,7 @@ public class StarRocksCatalog implements Catalog {
public void createTable(String sql)
throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ try {
log.info("create table sql is :{}", sql);
conn.createStatement().execute(sql);
} catch (Exception e) {
@@ -418,7 +416,8 @@ public class StarRocksCatalog implements Catalog {
@Override
public void open() throws CatalogException {
- try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd)) {
+ try {
+ conn = DriverManager.getConnection(defaultUrl, username, pwd);
// test connection, fail early if we cannot connect to database
conn.getCatalog();
} catch (SQLException e) {
@@ -432,6 +431,11 @@ public class StarRocksCatalog implements Catalog {
@Override
public void close() throws CatalogException {
LOG.info("Catalog {} closing", catalogName);
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ throw new CatalogException("close doris catalog failed", e);
+ }
}
@Override
@@ -442,13 +446,12 @@ public class StarRocksCatalog implements Catalog {
protected Optional<PrimaryKey> getPrimaryKey(String schema, String table)
throws SQLException {
List<String> pkFields = new ArrayList<>();
- try (Connection conn = DriverManager.getConnection(defaultUrl,
username, pwd);
- ResultSet rs =
- conn.createStatement()
- .executeQuery(
- String.format(
- "SELECT COLUMN_NAME FROM
information_schema.columns where TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND
COLUMN_KEY = 'PRI' ORDER BY ORDINAL_POSITION",
- schema, table))) {
+ try (ResultSet rs =
+ conn.createStatement()
+ .executeQuery(
+ String.format(
+ "SELECT COLUMN_NAME FROM
information_schema.columns where TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND
COLUMN_KEY = 'PRI' ORDER BY ORDINAL_POSITION",
+ schema, table))) {
while (rs.next()) {
String columnName = rs.getString("COLUMN_NAME");
pkFields.add(columnName);
@@ -471,11 +474,19 @@ public class StarRocksCatalog implements Catalog {
@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
- try {
- return databaseExists(tablePath.getDatabaseName())
- &&
listTables(tablePath.getDatabaseName()).contains(tablePath.getTableName());
- } catch (DatabaseNotExistException e) {
- return false;
+ try (PreparedStatement ps =
+ conn.prepareStatement(
+ "SELECT TABLE_NAME FROM information_schema.tables "
+ + "WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? "
+ + "ORDER BY TABLE_NAME")) {
+ ps.setString(1, tablePath.getDatabaseName());
+ ps.setString(2, tablePath.getTableName());
+ try (ResultSet rs = ps.executeQuery()) {
+ return rs.next();
+ }
+ } catch (SQLException e) {
+ throw new CatalogException(
+ String.format("check table [%s] exists failed",
tablePath.getFullName()), e);
}
}