This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.4.0
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/dev-1.4.0 by this push:
new b5053afb9 Generate spark sql based on the jdbc datasource and
optimized code (#4570)
b5053afb9 is described below
commit b5053afb9af75804e8e0bade820567a74a2b5bdb
Author: ChengJie1053 <[email protected]>
AuthorDate: Tue May 30 16:46:26 2023 +0800
Generate spark sql based on the jdbc datasource and optimized code (#4570)
---
.../query/common/domain/GenerateSqlInfo.java | 57 ++++++++
.../common/service/AbstractDbMetaService.java | 15 +++
.../query/common/service/MetadataDbService.java | 8 ++
.../query/common/service/SparkDdlSQlTemplate.java | 35 +++++
.../query/server/restful/MetadataQueryRestful.java | 58 +++++++++
.../query/server/service/MetadataQueryService.java | 20 +++
.../service/impl/MetadataQueryServiceImpl.java | 81 ++++++++++++
...lConnection.java => AbstractSqlConnection.java} | 98 +++-----------
.../query/service/ClickhouseMetaService.java | 5 +
.../metadata/query/service/Db2MetaService.java | 5 +
.../metadata/query/service/DmMetaService.java | 5 +
.../query/service/GreenplumMetaService.java | 5 +
.../query/service/KingbaseMetaService.java | 5 +
.../metadata/query/service/MysqlMetaService.java | 5 +
.../metadata/query/service/OracleMetaService.java | 5 +
.../query/service/PostgresqlMetaService.java | 5 +
.../query/service/SqlserverMetaService.java | 5 +
.../query/service/clickhouse/SqlConnection.java | 111 +++-------------
.../metadata/query/service/db2/SqlConnection.java | 145 +++------------------
.../metadata/query/service/dm/SqlConnection.java | 84 ++----------
.../query/service/greenplum/SqlConnection.java | 133 ++-----------------
.../query/service/kingbase/SqlConnection.java | 99 +++-----------
.../query/service/mysql/SqlConnection.java | 112 ++--------------
.../query/service/oracle/SqlConnection.java | 104 +++------------
.../query/service/postgres/SqlConnection.java | 123 +----------------
.../query/service/sqlserver/SqlConnection.java | 107 ++-------------
26 files changed, 462 insertions(+), 973 deletions(-)
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/domain/GenerateSqlInfo.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/domain/GenerateSqlInfo.java
new file mode 100755
index 000000000..d5a7e0250
--- /dev/null
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/domain/GenerateSqlInfo.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.metadata.query.common.domain;
+
+import java.io.Serializable;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+/** Holds the generated SQL statements (ddl, dml and dql strings) returned for a data source table */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_EMPTY)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class GenerateSqlInfo implements Serializable {
+
+ private String ddl; // DDL text, e.g. the CREATE TEMPORARY TABLE statement built from SparkDdlSQlTemplate
+ private String dml; // DML text, e.g. the INSERT INTO ... SELECT statement
+ private String dql; // DQL text, e.g. the SELECT statement
+
+ public String getDdl() {
+ return ddl;
+ }
+
+ public void setDdl(String ddl) {
+ this.ddl = ddl;
+ }
+
+ public String getDml() {
+ return dml;
+ }
+
+ public void setDml(String dml) {
+ this.dml = dml;
+ }
+
+ public String getDql() {
+ return dql;
+ }
+
+ public void setDql(String dql) {
+ this.dql = dql;
+ }
+}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/AbstractDbMetaService.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/AbstractDbMetaService.java
index cb3ed521c..f7677c885 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/AbstractDbMetaService.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/AbstractDbMetaService.java
@@ -38,6 +38,11 @@ public abstract class AbstractDbMetaService<C extends
Closeable> extends Abstrac
return this.getConnAndRun(operator, params, this::queryDatabases);
}
+ @Override
+ public String getSqlConnectUrl(String operator, Map<String, Object> params) {
+ return this.getConnAndRun(operator, params, this::querySqlConnectUrl); // same getConnAndRun pattern as getDatabases; the url comes from querySqlConnectUrl
+ }
+
@Override
public List<String> getTables(String operator, Map<String, Object> params,
String database) {
return this.getConnAndRun(operator, params, conn -> this.queryTables(conn,
database));
@@ -88,6 +93,16 @@ public abstract class AbstractDbMetaService<C extends
Closeable> extends Abstrac
throw new WarnException(-1, "This method is no supported");
}
+ /**
+ * Get sql connect url. This default implementation always throws a WarnException;
+ * a service that supports the operation overrides this method.
+ * @param connection metadata connection
+ * @return the sql connect url (this default implementation never returns normally)
+ */
+ public String querySqlConnectUrl(C connection) {
+ throw new WarnException(-1, "This method is no supported");
+ }
+
/**
* Get table list by connection and database
*
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/MetadataDbService.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/MetadataDbService.java
index 4ef371c24..e8c5d3db6 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/MetadataDbService.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/MetadataDbService.java
@@ -84,4 +84,12 @@ public interface MetadataDbService extends
BaseMetadataService {
*/
List<MetaColumnInfo> getColumns(
String operator, Map<String, Object> params, String database, String
table);
+
+ /**
+ * Get the sql connect url of the data source described by the connect params
+ * @param operator operator name (TODO confirm exact meaning; MetadataQueryServiceImpl passes the data source creator)
+ * @param params connect params
+ * @return sql connect url; MetadataQueryServiceImpl uses it as a format string for host, port and database
+ */
+ public String getSqlConnectUrl(String operator, Map<String, Object> params);
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/SparkDdlSQlTemplate.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/SparkDdlSQlTemplate.java
new file mode 100644
index 000000000..62b83fe05
--- /dev/null
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/common/src/main/java/org/apache/linkis/metadata/query/common/service/SparkDdlSQlTemplate.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.metadata.query.common.service;
+
+public class SparkDdlSQlTemplate {
+ // Format args in order: spark table name, jdbc url, dbtable, user, password
+ public static final String JDBC_DDL_SQL_TEMPLATE =
+ "CREATE TEMPORARY TABLE %s "
+ + "USING org.apache.spark.sql.jdbc "
+ + "OPTIONS ("
+ + " url '%s',"
+ + " dbtable '%s',"
+ + " user '%s',"
+ + " password '%s'"
+ + ")";
+ // Format arg: target table. The ${resultTable} token is not a format specifier (presumably replaced later; TODO confirm)
+ public static final String DML_SQL_TEMPLATE = "INSERT INTO %s SELECT * FROM
${resultTable}";
+ // Format args in order: column list, table name
+ public static final String DQL_SQL_TEMPLATE = "SELECT %s FROM %s";
+}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/restful/MetadataQueryRestful.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/restful/MetadataQueryRestful.java
index b8d2d58ab..eb2d61956 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/restful/MetadataQueryRestful.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/restful/MetadataQueryRestful.java
@@ -19,6 +19,7 @@ package org.apache.linkis.metadata.query.server.restful;
import org.apache.linkis.common.exception.ErrorException;
import org.apache.linkis.datasourcemanager.common.util.json.Json;
+import org.apache.linkis.metadata.query.common.domain.GenerateSqlInfo;
import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo;
import org.apache.linkis.metadata.query.common.domain.MetaPartitionInfo;
import
org.apache.linkis.metadata.query.common.exception.MetaMethodInvokeException;
@@ -409,6 +410,63 @@ public class MetadataQueryRestful {
}
}
+ @ApiOperation(value = "getSparkDdlSql", notes = "get spark ddl sql",
response = Message.class)
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "dataSourceName", required = true, dataType =
"String"),
+ @ApiImplicitParam(name = "envId", required = false, dataType = "String"),
+ @ApiImplicitParam(name = "system", required = true, dataType = "String"),
+ @ApiImplicitParam(name = "database", required = true, dataType = "String"),
+ @ApiImplicitParam(name = "table", required = true, dataType = "String")
+ })
+ @RequestMapping(value = "/getSparkSql", method = RequestMethod.GET) // NOTE(review): the ApiOperation value says getSparkDdlSql while the mapped path is /getSparkSql
+ public Message getSparkSql(
+ @RequestParam("dataSourceName") String dataSourceName,
+ @RequestParam(value = "envId", required = false) String envId,
+ @RequestParam("database") String database,
+ @RequestParam("table") String table,
+ @RequestParam("system") String system,
+ HttpServletRequest request) {
+ try { // reject a blank system and any name that does not match MetadataUtils.nameRegexPattern
+ if (StringUtils.isBlank(system)) {
+ return Message.error("'system' is missing[缺少系统名]");
+ }
+ if (!MetadataUtils.nameRegexPattern.matcher(system).matches()) {
+ return Message.error("'system' is invalid[系统名错误]");
+ }
+ if (!MetadataUtils.nameRegexPattern.matcher(database).matches()) {
+ return Message.error("'database' is invalid[数据库名错误]");
+ }
+ if (!MetadataUtils.nameRegexPattern.matcher(table).matches()) {
+ return Message.error("'table' is invalid[表名错误]");
+ }
+ if (!MetadataUtils.nameRegexPattern.matcher(dataSourceName).matches()) {
+ return Message.error("'dataSourceName' is invalid[数据源错误]");
+ }
+ // Resolve the operating user for this request
+ String userName =
+ ModuleUserUtils.getOperationUser(
+ request, "getSparkDdlSql, dataSourceName:" + dataSourceName);
+ // Delegate the generation to the service and return the result under the sparkSql key
+ GenerateSqlInfo sparkSql =
+ metadataQueryService.getSparkSqlByDsNameAndEnvId(
+ dataSourceName, database, table, system, userName, envId);
+ return Message.ok().data("sparkSql", sparkSql);
+ } catch (Exception e) { // any failure becomes an error Message that carries the request parameters
+ return errorToResponseMessage(
+ "Fail to spark sql[获取getSparkSql信息失败], name:["
+ + dataSourceName
+ + "]"
+ + ", system:["
+ + system
+ + "], database:["
+ + database
+ + "], table:["
+ + table
+ + "]",
+ e);
+ }
+ }
+
private Message errorToResponseMessage(String uiMessage, Exception e) {
if (e instanceof MetaMethodInvokeException) {
MetaMethodInvokeException invokeException = (MetaMethodInvokeException)
e;
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/MetadataQueryService.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/MetadataQueryService.java
index f156058ec..7e5449274 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/MetadataQueryService.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/MetadataQueryService.java
@@ -18,6 +18,7 @@
package org.apache.linkis.metadata.query.server.service;
import org.apache.linkis.common.exception.ErrorException;
+import org.apache.linkis.metadata.query.common.domain.GenerateSqlInfo;
import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo;
import org.apache.linkis.metadata.query.common.domain.MetaPartitionInfo;
@@ -242,4 +243,23 @@ public interface MetadataQueryService {
String userName,
String envId)
throws ErrorException;
+
+ /** Generate Spark SQL (ddl, dml and dql) for a table of the data source found by name and env id.
+ * @param dataSourceName data source name
+ * @param database database name
+ * @param table table name
+ * @param system system name
+ * @param userName user name of the requester
+ * @param envId environment id (may be absent; the REST endpoint declares it as not required)
+ * @return the generated sql info
+ * @throws ErrorException on failure (see the implementation for the exact cases)
+ */
+ GenerateSqlInfo getSparkSqlByDsNameAndEnvId(
+ String dataSourceName,
+ String database,
+ String table,
+ String system,
+ String userName,
+ String envId)
+ throws ErrorException;
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java
index ab38c4228..55b8061d1 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/server/src/main/java/org/apache/linkis/metadata/query/server/service/impl/MetadataQueryServiceImpl.java
@@ -24,15 +24,19 @@ import
org.apache.linkis.datasourcemanager.common.domain.DataSource;
import org.apache.linkis.datasourcemanager.common.protocol.DsInfoQueryRequest;
import org.apache.linkis.datasourcemanager.common.protocol.DsInfoResponse;
import org.apache.linkis.metadata.query.common.MdmConfiguration;
+import org.apache.linkis.metadata.query.common.cache.CacheConfiguration;
+import org.apache.linkis.metadata.query.common.domain.GenerateSqlInfo;
import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo;
import org.apache.linkis.metadata.query.common.domain.MetaPartitionInfo;
import
org.apache.linkis.metadata.query.common.exception.MetaMethodInvokeException;
import org.apache.linkis.metadata.query.common.exception.MetaRuntimeException;
import org.apache.linkis.metadata.query.common.service.MetadataConnection;
+import org.apache.linkis.metadata.query.common.service.SparkDdlSQlTemplate;
import org.apache.linkis.metadata.query.server.loader.MetaClassLoaderManager;
import org.apache.linkis.metadata.query.server.service.MetadataQueryService;
import org.apache.linkis.rpc.Sender;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
@@ -48,6 +52,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -362,6 +367,82 @@ public class MetadataQueryServiceImpl implements
MetadataQueryService {
return new ArrayList<>();
}
+ @Override
+ public GenerateSqlInfo getSparkSqlByDsNameAndEnvId(
+ String dataSourceName,
+ String database,
+ String table,
+ String system,
+ String userName,
+ String envId)
+ throws ErrorException {
+ DsInfoResponse dsInfoResponse =
+ queryDataSourceInfoByNameAndEnvId(dataSourceName, system, userName,
envId);
+ // Only data source types contained in CacheConfiguration.MYSQL_RELATIONSHIP_LIST get generated SQL
+ if (StringUtils.isNotBlank(dsInfoResponse.getDsType())
+ && CacheConfiguration.MYSQL_RELATIONSHIP_LIST
+ .getValue()
+ .contains(dsInfoResponse.getDsType())) { // NOTE(review): this is a substring match if getValue() is a String; confirm
+ List<MetaColumnInfo> columns =
+ invokeMetaMethod(
+ dsInfoResponse.getDsType(),
+ "getColumns",
+ new Object[] {
+ dsInfoResponse.getCreator(), dsInfoResponse.getParams(),
database, table
+ },
+ List.class);
+ // Fetch the connect url of the same data source; getSparkSqlByJdbc uses it as a format string
+ String sqlConnectUrl =
+ invokeMetaMethod(
+ dsInfoResponse.getDsType(),
+ "getSqlConnectUrl",
+ new Object[] {dsInfoResponse.getCreator(),
dsInfoResponse.getParams()},
+ String.class);
+ // Build the ddl, dml and dql from the column metadata and the connection params
+ return getSparkSqlByJdbc(database, table, dsInfoResponse.getParams(),
columns, sqlConnectUrl);
+ }
+ // Any other (or blank) data source type yields an empty GenerateSqlInfo
+ return new GenerateSqlInfo();
+ }
+
+ public GenerateSqlInfo getSparkSqlByJdbc(
+ String database,
+ String table,
+ Map<String, Object> params,
+ List<MetaColumnInfo> columns,
+ String sqlConnectUrl) { // builds the ddl, dml and dql strings for one JDBC table
+ GenerateSqlInfo generateSqlInfo = new GenerateSqlInfo();
+ String sparkTableName = table.contains(".") ?
table.substring(table.indexOf(".") + 1) : table; // drop the prefix up to the first dot, if any
+ // sqlConnectUrl is used as a format string; it receives host, port and database in that order
+ String url =
+ String.format(
+ sqlConnectUrl,
+ params.getOrDefault("host", ""),
+ params.getOrDefault("port", ""),
+ database);
+ String ddl =
+ String.format(
+ SparkDdlSQlTemplate.JDBC_DDL_SQL_TEMPLATE,
+ sparkTableName,
+ url,
+ table,
+ params.getOrDefault("username", ""),
+ params.getOrDefault("password", ""));
+ generateSqlInfo.setDdl(ddl); // NOTE(review): username and password are inserted verbatim and unescaped into the ddl text
+ // The dml template takes only the spark table name as a format argument
+ String dml = String.format(SparkDdlSQlTemplate.DML_SQL_TEMPLATE,
sparkTableName);
+ generateSqlInfo.setDml(dml);
+ // Select the columns by name when column metadata is available; otherwise keep the wildcard
+ String columnStr = "*";
+ if (CollectionUtils.isNotEmpty(columns)) {
+ columnStr = columns.stream().map(column ->
column.getName()).collect(Collectors.joining(","));
+ }
+ // The dql reads from the spark table name, not from the original table argument
+ String dql = String.format(SparkDdlSQlTemplate.DQL_SQL_TEMPLATE,
columnStr, sparkTableName);
+ generateSqlInfo.setDql(dql);
+ return generateSqlInfo;
+ }
+
/**
* Request to get data source information (type and connection parameters)
*
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/db2/SqlConnection.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/AbstractSqlConnection.java
old mode 100644
new mode 100755
similarity index 57%
copy from
linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/db2/SqlConnection.java
copy to
linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/AbstractSqlConnection.java
index 14bd8b5ad..66439aa39
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/db2/SqlConnection.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/AbstractSqlConnection.java
@@ -15,39 +15,29 @@
* limitations under the License.
*/
-package org.apache.linkis.metadata.query.service.db2;
+package org.apache.linkis.metadata.query.service;
-import org.apache.linkis.common.conf.CommonVars;
import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo;
-import org.apache.logging.log4j.util.Strings;
-
import java.io.Closeable;
import java.io.IOException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SqlConnection implements Closeable {
-
- private static final Logger LOG =
LoggerFactory.getLogger(SqlConnection.class);
-
- private static final CommonVars<String> SQL_DRIVER_CLASS =
- CommonVars.apply("wds.linkis.server.mdm.service.db2.driver",
"com.ibm.db2.jcc.DB2Driver");
+public abstract class AbstractSqlConnection implements Closeable {
- private static final CommonVars<String> SQL_CONNECT_URL =
- CommonVars.apply("wds.linkis.server.mdm.service.db2.url",
"jdbc:db2://%s:%s/%s");
+ private static final Logger LOG =
LoggerFactory.getLogger(AbstractSqlConnection.class);
- private Connection conn;
+ public Connection conn;
- private ConnectMessage connectMessage;
+ public ConnectMessage connectMessage;
- public SqlConnection(
+ public AbstractSqlConnection(
String host,
Integer port,
String username,
@@ -55,9 +45,6 @@ public class SqlConnection implements Closeable {
String database,
Map<String, Object> extraParams)
throws ClassNotFoundException, SQLException {
- if (Strings.isBlank(database)) {
- database = "SAMPLE";
- }
connectMessage = new ConnectMessage(host, port, username, password,
extraParams);
conn = getDBConnection(connectMessage, database);
// Try to create statement
@@ -65,42 +52,8 @@ public class SqlConnection implements Closeable {
statement.close();
}
- public List<String> getAllDatabases() throws SQLException {
- // db2 "select schemaname from syscat.schemata"
- List<String> dataBaseName = new ArrayList<>();
- Statement stmt = null;
- ResultSet rs = null;
- try {
- stmt = conn.createStatement();
- rs = stmt.executeQuery("list database directory");
- while (rs.next()) {
- dataBaseName.add(rs.getString(1));
- }
- } finally {
- closeResource(null, stmt, rs);
- }
- return dataBaseName;
- }
-
- public List<String> getAllTables(String tabschema) throws SQLException {
- List<String> tableNames = new ArrayList<>();
- Statement stmt = null;
- ResultSet rs = null;
- try {
- stmt = conn.createStatement();
- rs =
- stmt.executeQuery(
- "select tabname as table_name from syscat.tables where tabschema
= '"
- + tabschema
- + "' and type = 'T' order by tabschema, tabname");
- while (rs.next()) {
- tableNames.add(rs.getString(1));
- }
- return tableNames;
- } finally {
- closeResource(null, stmt, rs);
- }
- }
+ public abstract Connection getDBConnection(ConnectMessage connectMessage,
String database)
+ throws ClassNotFoundException, SQLException;
public List<MetaColumnInfo> getColumns(String schemaname, String table)
throws SQLException, ClassNotFoundException {
@@ -108,7 +61,7 @@ public class SqlConnection implements Closeable {
String columnSql = "SELECT * FROM " + schemaname + "." + table + " WHERE 1
= 2";
PreparedStatement ps = null;
ResultSet rs = null;
- ResultSetMetaData meta = null;
+ ResultSetMetaData meta;
try {
List<String> primaryKeys = getPrimaryKeys(table);
ps = conn.prepareStatement(columnSql);
@@ -132,13 +85,13 @@ public class SqlConnection implements Closeable {
}
/**
- * Get primary keys
+ * Get the primary keys of a table
*
* @param table table name
* @return
* @throws SQLException
*/
- private List<String> getPrimaryKeys(String table) throws SQLException {
+ public List<String> getPrimaryKeys(String table) throws SQLException {
ResultSet rs = null;
List<String> primaryKeys = new ArrayList<>();
try {
@@ -162,7 +115,7 @@ public class SqlConnection implements Closeable {
* @param statement statement
* @param resultSet result set
*/
- private void closeResource(Connection connection, Statement statement,
ResultSet resultSet) {
+ public void closeResource(Connection connection, Statement statement,
ResultSet resultSet) {
try {
if (null != resultSet && !resultSet.isClosed()) {
resultSet.close();
@@ -189,33 +142,18 @@ public class SqlConnection implements Closeable {
* @return
* @throws ClassNotFoundException
*/
- private Connection getDBConnection(ConnectMessage connectMessage, String
database)
- throws ClassNotFoundException, SQLException {
- String extraParamString =
- connectMessage.extraParams.entrySet().stream()
- .map(e -> String.join("=", e.getKey(),
String.valueOf(e.getValue())))
- .collect(Collectors.joining("&"));
- Class.forName(SQL_DRIVER_CLASS.getValue());
- String url =
- String.format(
- SQL_CONNECT_URL.getValue(), connectMessage.host,
connectMessage.port, database);
- if (!connectMessage.extraParams.isEmpty()) {
- url += "?" + extraParamString;
- }
- return DriverManager.getConnection(url, connectMessage.username,
connectMessage.password);
- }
/** Connect message */
- private static class ConnectMessage {
- private String host;
+ public static class ConnectMessage {
+ public String host;
- private Integer port;
+ public Integer port;
- private String username;
+ public String username;
- private String password;
+ public String password;
- private Map<String, Object> extraParams;
+ public Map<String, Object> extraParams;
public ConnectMessage(
String host,
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/ClickhouseMetaService.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/ClickhouseMetaService.java
index 06fbe9657..9b246eabe 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/ClickhouseMetaService.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/ClickhouseMetaService.java
@@ -95,4 +95,9 @@ public class ClickhouseMetaService extends
AbstractDbMetaService<SqlConnection>
throw new RuntimeException("Fail to get Sql columns(获取字段列表失败)", e);
}
}
+
+ @Override
+ public String querySqlConnectUrl(SqlConnection connection) {
+ return connection.getSqlConnectUrl(); // delegates to the connection, which supplies the url
+ }
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/Db2MetaService.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/Db2MetaService.java
index 3a405dcbf..b34e2b6f0 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/Db2MetaService.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/Db2MetaService.java
@@ -96,4 +96,9 @@ public class Db2MetaService extends
AbstractDbMetaService<SqlConnection> {
throw new RuntimeException("Fail to get Sql columns(获取字段列表失败)", e);
}
}
+
+ @Override
+ public String querySqlConnectUrl(SqlConnection connection) {
+ return connection.getSqlConnectUrl(); // delegates to the connection, which supplies the url
+ }
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/DmMetaService.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/DmMetaService.java
index b825cd1a3..25a81f018 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/DmMetaService.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/DmMetaService.java
@@ -93,4 +93,9 @@ public class DmMetaService extends
AbstractDbMetaService<SqlConnection> {
throw new RuntimeException("Fail to get Sql columns(获取字段列表失败)", e);
}
}
+
+ @Override
+ public String querySqlConnectUrl(SqlConnection connection) {
+ return connection.getSqlConnectUrl(); // delegates to the connection, which supplies the url
+ }
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/GreenplumMetaService.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/GreenplumMetaService.java
index c5e4ebd53..f13d052b5 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/GreenplumMetaService.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/GreenplumMetaService.java
@@ -101,4 +101,9 @@ public class GreenplumMetaService extends
AbstractDbMetaService<SqlConnection> {
throw new RuntimeException("Fail to get Sql columns(获取字段列表失败)", e);
}
}
+
+ @Override
+ public String querySqlConnectUrl(SqlConnection connection) {
+ return connection.getSqlConnectUrl(); // delegates to the connection, which supplies the url
+ }
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/KingbaseMetaService.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/KingbaseMetaService.java
index 15dde4787..23e3aada4 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/KingbaseMetaService.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/KingbaseMetaService.java
@@ -94,4 +94,9 @@ public class KingbaseMetaService extends
AbstractDbMetaService<SqlConnection> {
throw new RuntimeException("Fail to get Sql columns(获取字段列表失败)", e);
}
}
+
+ @Override
+ public String querySqlConnectUrl(SqlConnection connection) {
+ return connection.getSqlConnectUrl(); // delegates to the connection, which supplies the url
+ }
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/MysqlMetaService.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/MysqlMetaService.java
index 1ccbb1668..e178101f5 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/MysqlMetaService.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/MysqlMetaService.java
@@ -94,4 +94,9 @@ public class MysqlMetaService extends
AbstractDbMetaService<SqlConnection> {
throw new RuntimeException("Fail to get Sql columns(获取字段列表失败)", e);
}
}
+
+ @Override
+ public String querySqlConnectUrl(SqlConnection connection) {
+ return connection.getSqlConnectUrl(); // delegates to the connection, which supplies the url
+ }
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/OracleMetaService.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/OracleMetaService.java
index dbdf22243..dc46d2192 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/OracleMetaService.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/OracleMetaService.java
@@ -109,4 +109,9 @@ public class OracleMetaService extends
AbstractDbMetaService<SqlConnection> {
throw new RuntimeException("Fail to get Sql columns(获取字段列表失败)", e);
}
}
+
+ @Override
+ public String querySqlConnectUrl(SqlConnection connection) {
+ return connection.getSqlConnectUrl();
+ }
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/PostgresqlMetaService.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/PostgresqlMetaService.java
index 13145eeef..95415d1f3 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/PostgresqlMetaService.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/PostgresqlMetaService.java
@@ -103,4 +103,9 @@ public class PostgresqlMetaService extends
AbstractDbMetaService<SqlConnection>
throw new RuntimeException("Fail to get Sql columns(获取字段列表失败)", e);
}
}
+
+ @Override
+ public String querySqlConnectUrl(SqlConnection connection) {
+ return connection.getSqlConnectUrl();
+ }
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/SqlserverMetaService.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/SqlserverMetaService.java
index 9a2001cda..5729fd710 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/SqlserverMetaService.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/SqlserverMetaService.java
@@ -89,4 +89,9 @@ public class SqlserverMetaService extends
AbstractDbMetaService<SqlConnection> {
throw new RuntimeException("Fail to get Sql columns(获取字段列表失败)", e);
}
}
+
+ @Override
+ public String querySqlConnectUrl(SqlConnection connection) {
+ return connection.getSqlConnectUrl();
+ }
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/clickhouse/SqlConnection.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/clickhouse/SqlConnection.java
index 81cbe029b..4984b1806 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/clickhouse/SqlConnection.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/clickhouse/SqlConnection.java
@@ -19,9 +19,10 @@ package org.apache.linkis.metadata.query.service.clickhouse;
import org.apache.linkis.common.conf.CommonVars;
import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo;
+import org.apache.linkis.metadata.query.service.AbstractSqlConnection;
+
+import org.apache.commons.collections.MapUtils;
-import java.io.Closeable;
-import java.io.IOException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
@@ -31,7 +32,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SqlConnection implements Closeable {
+public class SqlConnection extends AbstractSqlConnection {
private static final Logger LOG =
LoggerFactory.getLogger(SqlConnection.class);
@@ -48,10 +49,6 @@ public class SqlConnection implements Closeable {
private static final CommonVars<Integer> SQL_SOCKET_TIMEOUT =
CommonVars.apply("wds.linkis.server.mdm.service.sql.socket.timeout",
6000);
- private Connection conn;
-
- private ConnectMessage connectMessage;
-
public SqlConnection(
String host,
Integer port,
@@ -60,11 +57,9 @@ public class SqlConnection implements Closeable {
String database,
Map<String, Object> extraParams)
throws ClassNotFoundException, SQLException {
- connectMessage = new ConnectMessage(host, port, username, password,
extraParams);
- conn = getDBConnection(connectMessage, database);
- // Try to create statement
- Statement statement = conn.createStatement();
- statement.close();
+ super(host, port, username, password, database, extraParams);
+ connectMessage.extraParams.put("connectTimeout",
SQL_CONNECT_TIMEOUT.getValue());
+ connectMessage.extraParams.put("socketTimeout",
SQL_SOCKET_TIMEOUT.getValue());
}
public List<String> getAllDatabases() throws SQLException {
@@ -128,105 +123,29 @@ public class SqlConnection implements Closeable {
return columns;
}
- /**
- * Get primary keys
- *
- * @param table table name
- * @return
- * @throws SQLException
- */
- private List<String> getPrimaryKeys(String table) throws SQLException {
- ResultSet rs = null;
- List<String> primaryKeys = new ArrayList<>();
- try {
- DatabaseMetaData dbMeta = conn.getMetaData();
- rs = dbMeta.getPrimaryKeys(null, null, table);
- while (rs.next()) {
- primaryKeys.add(rs.getString("column_name"));
- }
- return primaryKeys;
- } finally {
- if (null != rs) {
- rs.close();
- }
- }
- }
-
- /**
- * close database resource
- *
- * @param connection connection
- * @param statement statement
- * @param resultSet result set
- */
- private void closeResource(Connection connection, Statement statement,
ResultSet resultSet) {
- try {
- if (null != resultSet && !resultSet.isClosed()) {
- resultSet.close();
- }
- if (null != statement && !statement.isClosed()) {
- statement.close();
- }
- if (null != connection && !connection.isClosed()) {
- connection.close();
- }
- } catch (SQLException e) {
- LOG.warn("Fail to release resource [" + e.getMessage() + "]", e);
- }
- }
-
- @Override
- public void close() throws IOException {
- closeResource(conn, null, null);
- }
-
/**
* @param connectMessage
* @param database
* @return
* @throws ClassNotFoundException
*/
- private Connection getDBConnection(ConnectMessage connectMessage, String
database)
+ public Connection getDBConnection(ConnectMessage connectMessage, String
database)
throws ClassNotFoundException, SQLException {
- String extraParamString =
- connectMessage.extraParams.entrySet().stream()
- .map(e -> String.join("=", e.getKey(),
String.valueOf(e.getValue())))
- .collect(Collectors.joining("&"));
Class.forName(SQL_DRIVER_CLASS.getValue());
String url =
String.format(
SQL_CONNECT_URL.getValue(), connectMessage.host,
connectMessage.port, database);
- if (!connectMessage.extraParams.isEmpty()) {
+ if (MapUtils.isNotEmpty(connectMessage.extraParams)) {
+ String extraParamString =
+ connectMessage.extraParams.entrySet().stream()
+ .map(e -> String.join("=", e.getKey(),
String.valueOf(e.getValue())))
+ .collect(Collectors.joining("&"));
url += "?" + extraParamString;
}
return DriverManager.getConnection(url, connectMessage.username,
connectMessage.password);
}
- /** Connect message */
- private static class ConnectMessage {
- private String host;
-
- private Integer port;
-
- private String username;
-
- private String password;
-
- private Map<String, Object> extraParams;
-
- public ConnectMessage(
- String host,
- Integer port,
- String username,
- String password,
- Map<String, Object> extraParams) {
- this.host = host;
- this.port = port;
- this.username = username;
- this.password = password;
- this.extraParams = extraParams;
- this.extraParams.put("connectTimeout", SQL_CONNECT_TIMEOUT.getValue());
- this.extraParams.put("socketTimeout", SQL_SOCKET_TIMEOUT.getValue());
- }
+ public String getSqlConnectUrl() {
+ return SQL_CONNECT_URL.getValue();
}
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/db2/SqlConnection.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/db2/SqlConnection.java
index 14bd8b5ad..3f61ac51f 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/db2/SqlConnection.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/db2/SqlConnection.java
@@ -18,12 +18,11 @@
package org.apache.linkis.metadata.query.service.db2;
import org.apache.linkis.common.conf.CommonVars;
-import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo;
+import org.apache.linkis.metadata.query.service.AbstractSqlConnection;
+import org.apache.commons.collections.MapUtils;
import org.apache.logging.log4j.util.Strings;
-import java.io.Closeable;
-import java.io.IOException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
@@ -33,7 +32,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SqlConnection implements Closeable {
+public class SqlConnection extends AbstractSqlConnection {
private static final Logger LOG =
LoggerFactory.getLogger(SqlConnection.class);
@@ -43,10 +42,6 @@ public class SqlConnection implements Closeable {
private static final CommonVars<String> SQL_CONNECT_URL =
CommonVars.apply("wds.linkis.server.mdm.service.db2.url",
"jdbc:db2://%s:%s/%s");
- private Connection conn;
-
- private ConnectMessage connectMessage;
-
public SqlConnection(
String host,
Integer port,
@@ -55,14 +50,13 @@ public class SqlConnection implements Closeable {
String database,
Map<String, Object> extraParams)
throws ClassNotFoundException, SQLException {
- if (Strings.isBlank(database)) {
- database = "SAMPLE";
- }
- connectMessage = new ConnectMessage(host, port, username, password,
extraParams);
- conn = getDBConnection(connectMessage, database);
- // Try to create statement
- Statement statement = conn.createStatement();
- statement.close();
+ super(
+ host,
+ port,
+ username,
+ password,
+ Strings.isBlank(database) ? "SAMPLE" : database,
+ extraParams);
}
public List<String> getAllDatabases() throws SQLException {
@@ -102,132 +96,29 @@ public class SqlConnection implements Closeable {
}
}
- public List<MetaColumnInfo> getColumns(String schemaname, String table)
- throws SQLException, ClassNotFoundException {
- List<MetaColumnInfo> columns = new ArrayList<>();
- String columnSql = "SELECT * FROM " + schemaname + "." + table + " WHERE 1
= 2";
- PreparedStatement ps = null;
- ResultSet rs = null;
- ResultSetMetaData meta = null;
- try {
- List<String> primaryKeys = getPrimaryKeys(table);
- ps = conn.prepareStatement(columnSql);
- rs = ps.executeQuery();
- meta = rs.getMetaData();
- int columnCount = meta.getColumnCount();
- for (int i = 1; i < columnCount + 1; i++) {
- MetaColumnInfo info = new MetaColumnInfo();
- info.setIndex(i);
- info.setName(meta.getColumnName(i));
- info.setType(meta.getColumnTypeName(i));
- if (primaryKeys.contains(meta.getColumnName(i))) {
- info.setPrimaryKey(true);
- }
- columns.add(info);
- }
- } finally {
- closeResource(null, ps, rs);
- }
- return columns;
- }
-
- /**
- * Get primary keys
- *
- * @param table table name
- * @return
- * @throws SQLException
- */
- private List<String> getPrimaryKeys(String table) throws SQLException {
- ResultSet rs = null;
- List<String> primaryKeys = new ArrayList<>();
- try {
- DatabaseMetaData dbMeta = conn.getMetaData();
- rs = dbMeta.getPrimaryKeys(null, null, table);
- while (rs.next()) {
- primaryKeys.add(rs.getString("column_name"));
- }
- return primaryKeys;
- } finally {
- if (null != rs) {
- rs.close();
- }
- }
- }
-
- /**
- * close database resource
- *
- * @param connection connection
- * @param statement statement
- * @param resultSet result set
- */
- private void closeResource(Connection connection, Statement statement,
ResultSet resultSet) {
- try {
- if (null != resultSet && !resultSet.isClosed()) {
- resultSet.close();
- }
- if (null != statement && !statement.isClosed()) {
- statement.close();
- }
- if (null != connection && !connection.isClosed()) {
- connection.close();
- }
- } catch (SQLException e) {
- LOG.warn("Fail to release resource [" + e.getMessage() + "]", e);
- }
- }
-
- @Override
- public void close() throws IOException {
- closeResource(conn, null, null);
- }
-
/**
* @param connectMessage
* @param database
* @return
* @throws ClassNotFoundException
*/
- private Connection getDBConnection(ConnectMessage connectMessage, String
database)
+ public Connection getDBConnection(ConnectMessage connectMessage, String
database)
throws ClassNotFoundException, SQLException {
- String extraParamString =
- connectMessage.extraParams.entrySet().stream()
- .map(e -> String.join("=", e.getKey(),
String.valueOf(e.getValue())))
- .collect(Collectors.joining("&"));
Class.forName(SQL_DRIVER_CLASS.getValue());
String url =
String.format(
SQL_CONNECT_URL.getValue(), connectMessage.host,
connectMessage.port, database);
- if (!connectMessage.extraParams.isEmpty()) {
+ if (MapUtils.isNotEmpty(connectMessage.extraParams)) {
+ String extraParamString =
+ connectMessage.extraParams.entrySet().stream()
+ .map(e -> String.join("=", e.getKey(),
String.valueOf(e.getValue())))
+ .collect(Collectors.joining("&"));
url += "?" + extraParamString;
}
return DriverManager.getConnection(url, connectMessage.username,
connectMessage.password);
}
- /** Connect message */
- private static class ConnectMessage {
- private String host;
-
- private Integer port;
-
- private String username;
-
- private String password;
-
- private Map<String, Object> extraParams;
-
- public ConnectMessage(
- String host,
- Integer port,
- String username,
- String password,
- Map<String, Object> extraParams) {
- this.host = host;
- this.port = port;
- this.username = username;
- this.password = password;
- this.extraParams = extraParams;
- }
+ public String getSqlConnectUrl() {
+ return SQL_CONNECT_URL.getValue();
}
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/dm/SqlConnection.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/dm/SqlConnection.java
index e19dda991..32ae65b58 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/dm/SqlConnection.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/dm/SqlConnection.java
@@ -19,11 +19,11 @@ package org.apache.linkis.metadata.query.service.dm;
import org.apache.linkis.common.conf.CommonVars;
import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo;
+import org.apache.linkis.metadata.query.service.AbstractSqlConnection;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
-import java.io.Closeable;
-import java.io.IOException;
import java.sql.*;
import java.util.*;
import java.util.stream.Collectors;
@@ -31,7 +31,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SqlConnection implements Closeable {
+public class SqlConnection extends AbstractSqlConnection {
private static final Logger LOG =
LoggerFactory.getLogger(SqlConnection.class);
@@ -41,10 +41,6 @@ public class SqlConnection implements Closeable {
private static final CommonVars<String> SQL_CONNECT_URL =
CommonVars.apply("wds.linkis.server.mdm.service.dameng.url",
"jdbc:dm://%s:%s/%s");
- private Connection conn;
-
- private ConnectMessage connectMessage;
-
public SqlConnection(
String host,
Integer port,
@@ -53,11 +49,7 @@ public class SqlConnection implements Closeable {
String database,
Map<String, Object> extraParams)
throws ClassNotFoundException, SQLException {
- connectMessage = new ConnectMessage(host, port, username, password,
extraParams);
- conn = getDBConnection(connectMessage, database);
- // Try to create statement
- Statement statement = conn.createStatement();
- statement.close();
+ super(host, port, username, password, database, extraParams);
}
public List<String> getAllDatabases() throws SQLException {
@@ -160,56 +152,26 @@ public class SqlConnection implements Closeable {
return columnComment;
}
- /**
- * close database resource
- *
- * @param connection connection
- * @param statement statement
- * @param resultSet result set
- */
- private void closeResource(Connection connection, Statement statement,
ResultSet resultSet) {
- try {
- if (null != resultSet && !resultSet.isClosed()) {
- resultSet.close();
- }
- if (null != statement && !statement.isClosed()) {
- statement.close();
- }
- if (null != connection && !connection.isClosed()) {
- connection.close();
- }
- } catch (SQLException e) {
- LOG.warn("Fail to release resource [" + e.getMessage() + "]", e);
- }
- }
-
- @Override
- public void close() throws IOException {
- closeResource(conn, null, null);
- }
-
/**
* @param connectMessage
* @param database
* @return
* @throws ClassNotFoundException
*/
- private Connection getDBConnection(ConnectMessage connectMessage, String
database)
+ public Connection getDBConnection(ConnectMessage connectMessage, String
database)
throws ClassNotFoundException, SQLException {
- String extraParamString =
- connectMessage.extraParams.entrySet().stream()
- .map(e -> String.join("=", e.getKey(),
String.valueOf(e.getValue())))
- .collect(Collectors.joining("&"));
Class.forName(SQL_DRIVER_CLASS.getValue());
String url =
String.format(
SQL_CONNECT_URL.getValue(), connectMessage.host,
connectMessage.port, database);
- if (!connectMessage.extraParams.isEmpty()) {
+ if (MapUtils.isNotEmpty(connectMessage.extraParams)) {
+ String extraParamString =
+ connectMessage.extraParams.entrySet().stream()
+ .map(e -> String.join("=", e.getKey(),
String.valueOf(e.getValue())))
+ .collect(Collectors.joining("&"));
url += "?" + extraParamString;
}
try {
- // return DriverManager.getConnection(url,
connectMessage.username,
- // connectMessage.password);
Properties prop = new Properties();
prop.put("user", connectMessage.username);
prop.put("password", connectMessage.password);
@@ -221,29 +183,7 @@ public class SqlConnection implements Closeable {
}
}
- /** Connect message */
- private static class ConnectMessage {
- private String host;
-
- private Integer port;
-
- private String username;
-
- private String password;
-
- private Map<String, Object> extraParams;
-
- public ConnectMessage(
- String host,
- Integer port,
- String username,
- String password,
- Map<String, Object> extraParams) {
- this.host = host;
- this.port = port;
- this.username = username;
- this.password = password;
- this.extraParams = extraParams;
- }
+ public String getSqlConnectUrl() {
+ return SQL_CONNECT_URL.getValue();
}
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/greenplum/SqlConnection.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/greenplum/SqlConnection.java
index 4685b0679..494511a4a 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/greenplum/SqlConnection.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/greenplum/SqlConnection.java
@@ -18,12 +18,11 @@
package org.apache.linkis.metadata.query.service.greenplum;
import org.apache.linkis.common.conf.CommonVars;
-import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo;
+import org.apache.linkis.metadata.query.service.AbstractSqlConnection;
+import org.apache.commons.collections.MapUtils;
import org.apache.logging.log4j.util.Strings;
-import java.io.Closeable;
-import java.io.IOException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
@@ -33,7 +32,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SqlConnection implements Closeable {
+public class SqlConnection extends AbstractSqlConnection {
private static final Logger LOG =
LoggerFactory.getLogger(SqlConnection.class);
private static final CommonVars<String> SQL_DRIVER_CLASS =
@@ -45,10 +44,6 @@ public class SqlConnection implements Closeable {
"wds.linkis.server.mdm.service.greenplum.url",
"jdbc:pivotal:greenplum://%s:%s;DatabaseName=%s");
- private Connection conn;
-
- private ConnectMessage connectMessage;
-
public SqlConnection(
String host,
Integer port,
@@ -57,14 +52,7 @@ public class SqlConnection implements Closeable {
String database,
Map<String, Object> extraParams)
throws ClassNotFoundException, SQLException {
- connectMessage = new ConnectMessage(host, port, username, password,
extraParams);
- if (Strings.isBlank(database)) {
- database = "";
- }
- conn = getDBConnection(connectMessage, database);
- // Try to create statement
- Statement statement = conn.createStatement();
- statement.close();
+ super(host, port, username, password, Strings.isBlank(database) ? "" :
database, extraParams);
}
public List<String> getAllDatabases() throws SQLException {
@@ -101,126 +89,29 @@ public class SqlConnection implements Closeable {
}
}
- public List<MetaColumnInfo> getColumns(String schemaname, String table)
- throws SQLException, ClassNotFoundException {
- List<MetaColumnInfo> columns = new ArrayList<>();
- String columnSql = "SELECT * FROM " + schemaname + "." + table + " WHERE 1
= 2";
- PreparedStatement ps = null;
- ResultSet rs = null;
- ResultSetMetaData meta;
- try {
- List<String> primaryKeys = getPrimaryKeys(table);
- ps = conn.prepareStatement(columnSql);
- rs = ps.executeQuery();
- meta = rs.getMetaData();
- int columnCount = meta.getColumnCount();
- for (int i = 1; i < columnCount + 1; i++) {
- MetaColumnInfo info = new MetaColumnInfo();
- info.setIndex(i);
- info.setName(meta.getColumnName(i));
- info.setType(meta.getColumnTypeName(i));
- if (primaryKeys.contains(meta.getColumnName(i))) {
- info.setPrimaryKey(true);
- }
- columns.add(info);
- }
- } finally {
- closeResource(null, ps, rs);
- }
- return columns;
- }
-
- /**
- * Get primary keys
- *
- * @param table table name
- * @return
- * @throws SQLException
- */
- private List<String> getPrimaryKeys(String table) throws SQLException {
- ResultSet rs = null;
- List<String> primaryKeys = new ArrayList<>();
- DatabaseMetaData dbMeta = conn.getMetaData();
- rs = dbMeta.getPrimaryKeys(null, null, table);
- while (rs.next()) {
- primaryKeys.add(rs.getString("column_name"));
- }
- return primaryKeys;
- }
-
- /**
- * close database resource
- *
- * @param connection connection
- * @param statement statement
- * @param resultSet result set
- */
- private void closeResource(Connection connection, Statement statement,
ResultSet resultSet) {
- try {
- if (null != resultSet && !resultSet.isClosed()) {
- resultSet.close();
- }
- if (null != statement && !statement.isClosed()) {
- statement.close();
- }
- if (null != connection && !connection.isClosed()) {
- connection.close();
- }
- } catch (SQLException e) {
- LOG.warn("Fail to release resource [" + e.getMessage() + "]", e);
- }
- }
-
- @Override
- public void close() throws IOException {
- closeResource(conn, null, null);
- }
-
/**
* @param connectMessage
* @param database
* @return
* @throws ClassNotFoundException
*/
- private Connection getDBConnection(ConnectMessage connectMessage, String
database)
+ public Connection getDBConnection(ConnectMessage connectMessage, String
database)
throws ClassNotFoundException, SQLException {
- String extraParamString =
- connectMessage.extraParams.entrySet().stream()
- .map(e -> String.join("=", e.getKey(),
String.valueOf(e.getValue())))
- .collect(Collectors.joining("&"));
Class.forName(SQL_DRIVER_CLASS.getValue());
String url =
String.format(
SQL_CONNECT_URL.getValue(), connectMessage.host,
connectMessage.port, database);
- if (!connectMessage.extraParams.isEmpty()) {
+ if (MapUtils.isNotEmpty(connectMessage.extraParams)) {
+ String extraParamString =
+ connectMessage.extraParams.entrySet().stream()
+ .map(e -> String.join("=", e.getKey(),
String.valueOf(e.getValue())))
+ .collect(Collectors.joining("&"));
url += "?" + extraParamString;
}
return DriverManager.getConnection(url, connectMessage.username,
connectMessage.password);
}
- /** Connect message */
- private static class ConnectMessage {
- private String host;
-
- private Integer port;
-
- private String username;
-
- private String password;
-
- private Map<String, Object> extraParams;
-
- public ConnectMessage(
- String host,
- Integer port,
- String username,
- String password,
- Map<String, Object> extraParams) {
- this.host = host;
- this.port = port;
- this.username = username;
- this.password = password;
- this.extraParams = extraParams;
- }
+ public String getSqlConnectUrl() {
+ return SQL_CONNECT_URL.getValue();
}
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/kingbase/SqlConnection.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/kingbase/SqlConnection.java
index 6eba1fe3b..85fba9489 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/kingbase/SqlConnection.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/kingbase/SqlConnection.java
@@ -19,9 +19,10 @@ package org.apache.linkis.metadata.query.service.kingbase;
import org.apache.linkis.common.conf.CommonVars;
import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo;
+import org.apache.linkis.metadata.query.service.AbstractSqlConnection;
+
+import org.apache.commons.collections.MapUtils;
-import java.io.Closeable;
-import java.io.IOException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
@@ -31,7 +32,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SqlConnection implements Closeable {
+public class SqlConnection extends AbstractSqlConnection {
private static final Logger LOG =
LoggerFactory.getLogger(SqlConnection.class);
@@ -43,10 +44,6 @@ public class SqlConnection implements Closeable {
"wds.linkis.server.mdm.service.kingbase.url",
"jdbc:kingbase8://%s:%s/%s?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=utf-8");
- private Connection conn;
-
- private ConnectMessage connectMessage;
-
public SqlConnection(
String host,
Integer port,
@@ -55,11 +52,7 @@ public class SqlConnection implements Closeable {
String database,
Map<String, Object> extraParams)
throws ClassNotFoundException, SQLException {
- connectMessage = new ConnectMessage(host, port, username, password,
extraParams);
- conn = getDBConnection(connectMessage, database);
- // Try to create statement
- Statement statement = conn.createStatement();
- statement.close();
+ super(host, port, username, password, database, extraParams);
}
public List<String> getAllDatabases() throws SQLException {
@@ -76,8 +69,6 @@ public class SqlConnection implements Closeable {
closeResource(null, stmt, rs);
}
return dataBaseName;
- // throw new UnsupportedOperationException("kingbase数据库不能像mysql show
- // databases来获取,应该是存在某个地方来获取的");
}
public List<String> getAllTables(String schema) throws SQLException {
@@ -104,7 +95,8 @@ public class SqlConnection implements Closeable {
public List<MetaColumnInfo> getColumns(String database, String table)
throws SQLException, ClassNotFoundException {
List<MetaColumnInfo> columns = new ArrayList<>();
- String columnSql = "SELECT * FROM " + database + "." + table + " WHERE 1 =
2";
+ String columnSql =
+ "SELECT * FROM " + String.format("\"%s\"", database) + "." + table + "
WHERE 1 = 2";
PreparedStatement ps = null;
ResultSet rs = null;
ResultSetMetaData meta = null;
@@ -130,62 +122,23 @@ public class SqlConnection implements Closeable {
return columns;
}
- private List<String> getPrimaryKeys(String table) throws SQLException {
- ResultSet rs = null;
- List<String> primaryKeys = new ArrayList<>();
- DatabaseMetaData dbMeta = conn.getMetaData();
- rs = dbMeta.getPrimaryKeys(null, null, table);
- while (rs.next()) {
- primaryKeys.add(rs.getString("column_name"));
- }
- return primaryKeys;
- }
-
- /**
- * close database resource
- *
- * @param connection connection
- * @param statement statement
- * @param resultSet result set
- */
- private void closeResource(Connection connection, Statement statement,
ResultSet resultSet) {
- try {
- if (null != resultSet && !resultSet.isClosed()) {
- resultSet.close();
- }
- if (null != statement && !statement.isClosed()) {
- statement.close();
- }
- if (null != connection && !connection.isClosed()) {
- connection.close();
- }
- } catch (SQLException e) {
- LOG.warn("Fail to release resource [" + e.getMessage() + "]", e);
- }
- }
-
- @Override
- public void close() throws IOException {
- closeResource(conn, null, null);
- }
-
/**
* @param connectMessage
* @param database
* @return
* @throws ClassNotFoundException
*/
- private Connection getDBConnection(ConnectMessage connectMessage, String
database)
+ public Connection getDBConnection(ConnectMessage connectMessage, String
database)
throws ClassNotFoundException, SQLException {
- String extraParamString =
- connectMessage.extraParams.entrySet().stream()
- .map(e -> String.join("=", e.getKey(),
String.valueOf(e.getValue())))
- .collect(Collectors.joining("&"));
Class.forName(SQL_DRIVER_CLASS.getValue());
String url =
String.format(
SQL_CONNECT_URL.getValue(), connectMessage.host,
connectMessage.port, database);
- if (!connectMessage.extraParams.isEmpty()) {
+ if (MapUtils.isNotEmpty(connectMessage.extraParams)) {
+ String extraParamString =
+ connectMessage.extraParams.entrySet().stream()
+ .map(e -> String.join("=", e.getKey(),
String.valueOf(e.getValue())))
+ .collect(Collectors.joining("&"));
url += "?" + extraParamString;
}
try {
@@ -196,29 +149,7 @@ public class SqlConnection implements Closeable {
}
}
- /** Connect message */
- private static class ConnectMessage {
- private String host;
-
- private Integer port;
-
- private String username;
-
- private String password;
-
- private Map<String, Object> extraParams;
-
- public ConnectMessage(
- String host,
- Integer port,
- String username,
- String password,
- Map<String, Object> extraParams) {
- this.host = host;
- this.port = port;
- this.username = username;
- this.password = password;
- this.extraParams = extraParams;
- }
+ public String getSqlConnectUrl() {
+ return SQL_CONNECT_URL.getValue();
}
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/mysql/SqlConnection.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/mysql/SqlConnection.java
index 8c081e61f..167023e3f 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/mysql/SqlConnection.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/mysql/SqlConnection.java
@@ -20,11 +20,11 @@ package org.apache.linkis.metadata.query.service.mysql;
import org.apache.linkis.common.conf.CommonVars;
import org.apache.linkis.common.utils.SecurityUtils;
import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo;
+import org.apache.linkis.metadata.query.service.AbstractSqlConnection;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
-import java.io.Closeable;
-import java.io.IOException;
import java.sql.*;
import java.util.*;
import java.util.stream.Collectors;
@@ -32,7 +32,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SqlConnection implements Closeable {
+public class SqlConnection extends AbstractSqlConnection {
private static final Logger LOG =
LoggerFactory.getLogger(SqlConnection.class);
@@ -48,10 +48,6 @@ public class SqlConnection implements Closeable {
private static final CommonVars<Integer> SQL_SOCKET_TIMEOUT =
CommonVars.apply("wds.linkis.server.mdm.service.sql.socket.timeout",
6000);
- private Connection conn;
-
- private ConnectMessage connectMessage;
-
public SqlConnection(
String host,
Integer port,
@@ -60,16 +56,12 @@ public class SqlConnection implements Closeable {
String database,
Map<String, Object> extraParams)
throws ClassNotFoundException, SQLException {
+ super(host, port, username, password, database, extraParams);
// security check
SecurityUtils.checkJdbcConnParams(host, port, username, password,
database, extraParams);
SecurityUtils.appendMysqlForceParams(extraParams);
-
- connectMessage =
- new ConnectMessage(host.trim(), port, username.trim(),
password.trim(), extraParams);
- conn = getDBConnection(connectMessage, database.trim());
- // Try to create statement
- Statement statement = conn.createStatement();
- statement.close();
+ connectMessage.extraParams.put("connectTimeout",
SQL_CONNECT_TIMEOUT.getValue());
+ connectMessage.extraParams.put("socketTimeout",
SQL_SOCKET_TIMEOUT.getValue());
}
public List<String> getAllDatabases() throws SQLException {
@@ -133,70 +125,14 @@ public class SqlConnection implements Closeable {
return columns;
}
- /**
- * Get primary keys
- *
- * @param table table name
- * @return
- * @throws SQLException
- */
- private List<String> getPrimaryKeys(String table) throws SQLException {
- ResultSet rs = null;
- List<String> primaryKeys = new ArrayList<>();
- try {
- DatabaseMetaData dbMeta = conn.getMetaData();
- rs = dbMeta.getPrimaryKeys(null, null, table);
- while (rs.next()) {
- primaryKeys.add(rs.getString("column_name"));
- }
- return primaryKeys;
- } finally {
- if (null != rs) {
- rs.close();
- }
- }
- }
-
- /**
- * close database resource
- *
- * @param connection connection
- * @param statement statement
- * @param resultSet result set
- */
- private void closeResource(Connection connection, Statement statement,
ResultSet resultSet) {
- try {
- if (null != resultSet && !resultSet.isClosed()) {
- resultSet.close();
- }
- if (null != statement && !statement.isClosed()) {
- statement.close();
- }
- if (null != connection && !connection.isClosed()) {
- connection.close();
- }
- } catch (SQLException e) {
- LOG.warn("Fail to release resource [" + e.getMessage() + "]", e);
- }
- }
-
- @Override
- public void close() throws IOException {
- closeResource(conn, null, null);
- }
-
/**
* @param connectMessage
* @param database
* @return
* @throws ClassNotFoundException
*/
- private Connection getDBConnection(ConnectMessage connectMessage, String
database)
+ public Connection getDBConnection(ConnectMessage connectMessage, String
database)
throws ClassNotFoundException, SQLException {
- String extraParamString =
- connectMessage.extraParams.entrySet().stream()
- .map(e -> String.join("=", e.getKey(),
String.valueOf(e.getValue())))
- .collect(Collectors.joining("&"));
Class.forName(SQL_DRIVER_CLASS.getValue());
String url =
String.format(
@@ -205,38 +141,18 @@ public class SqlConnection implements Closeable {
if (StringUtils.isBlank(database)) {
url = url.substring(0, url.length() - 1);
}
- if (!connectMessage.extraParams.isEmpty()) {
+ if (MapUtils.isNotEmpty(connectMessage.extraParams)) {
+ String extraParamString =
+ connectMessage.extraParams.entrySet().stream()
+ .map(e -> String.join("=", e.getKey(),
String.valueOf(e.getValue())))
+ .collect(Collectors.joining("&"));
url += "?" + extraParamString;
}
LOG.info("jdbc connection url: {}", url);
return DriverManager.getConnection(url, connectMessage.username,
connectMessage.password);
}
- /** Connect message */
- private static class ConnectMessage {
- private String host;
-
- private Integer port;
-
- private String username;
-
- private String password;
-
- private Map<String, Object> extraParams;
-
- public ConnectMessage(
- String host,
- Integer port,
- String username,
- String password,
- Map<String, Object> extraParams) {
- this.host = host;
- this.port = port;
- this.username = username;
- this.password = password;
- this.extraParams = extraParams;
- this.extraParams.put("connectTimeout", SQL_CONNECT_TIMEOUT.getValue());
- this.extraParams.put("socketTimeout", SQL_SOCKET_TIMEOUT.getValue());
- }
+ public String getSqlConnectUrl() {
+ return SQL_CONNECT_URL.getValue();
}
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/oracle/SqlConnection.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/oracle/SqlConnection.java
index 6a99a043e..d40008ff6 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/oracle/SqlConnection.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/oracle/SqlConnection.java
@@ -19,11 +19,11 @@ package org.apache.linkis.metadata.query.service.oracle;
import org.apache.linkis.common.conf.CommonVars;
import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo;
+import org.apache.linkis.metadata.query.service.AbstractSqlConnection;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
-import java.io.Closeable;
-import java.io.IOException;
import java.sql.*;
import java.util.*;
import java.util.stream.Collectors;
@@ -31,7 +31,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SqlConnection implements Closeable {
+public class SqlConnection extends AbstractSqlConnection {
private static final Logger LOG =
LoggerFactory.getLogger(SqlConnection.class);
private static final CommonVars<String> SQL_DRIVER_CLASS =
@@ -46,9 +46,7 @@ public class SqlConnection implements Closeable {
CommonVars.apply(
"wds.linkis.server.mdm.service.oracle.service.url",
"jdbc:oracle:thin:@//%s:%s/%s");
- private Connection conn;
-
- private ConnectMessage connectMessage;
+ private String serviceName;
public SqlConnection(
String host,
@@ -59,11 +57,8 @@ public class SqlConnection implements Closeable {
String serviceName,
Map<String, Object> extraParams)
throws ClassNotFoundException, SQLException {
- connectMessage = new ConnectMessage(host, port, username, password,
extraParams);
- conn = getDBConnection(connectMessage, database, serviceName);
- // Try to create statement
- Statement statement = conn.createStatement();
- statement.close();
+ super(host, port, username, password, database, extraParams);
+ this.serviceName = serviceName;
}
public List<String> getAllDatabases() throws SQLException {
@@ -102,6 +97,12 @@ public class SqlConnection implements Closeable {
}
}
+ @Override
+ public Connection getDBConnection(ConnectMessage connectMessage, String
database)
+ throws ClassNotFoundException, SQLException {
+ return getDBConnection(connectMessage, database, serviceName);
+ }
+
public List<MetaColumnInfo> getColumns(String schemaname, String table)
throws SQLException, ClassNotFoundException {
List<MetaColumnInfo> columns = new ArrayList<>();
@@ -140,23 +141,6 @@ public class SqlConnection implements Closeable {
return columns;
}
- /**
- * Get primary keys // * @param connection connection
- *
- * @param table table name
- * @return
- * @throws SQLException
- */
- private List<String> getPrimaryKeys(String table) throws SQLException {
- ResultSet rs = null;
- List<String> primaryKeys = new ArrayList<>();
- DatabaseMetaData dbMeta = conn.getMetaData();
- rs = dbMeta.getPrimaryKeys(null, null, table);
- while (rs.next()) {
- primaryKeys.add(rs.getString("column_name"));
- }
- return primaryKeys;
- }
/**
* Get Column Comment
*
@@ -176,34 +160,6 @@ public class SqlConnection implements Closeable {
return columnComment;
}
- /**
- * close database resource
- *
- * @param connection connection
- * @param statement statement
- * @param resultSet result set
- */
- private void closeResource(Connection connection, Statement statement,
ResultSet resultSet) {
- try {
- if (null != resultSet && !resultSet.isClosed()) {
- resultSet.close();
- }
- if (null != statement && !statement.isClosed()) {
- statement.close();
- }
- if (null != connection && !connection.isClosed()) {
- connection.close();
- }
- } catch (SQLException e) {
- LOG.warn("Fail to release resource [" + e.getMessage() + "]", e);
- }
- }
-
- @Override
- public void close() throws IOException {
- closeResource(conn, null, null);
- }
-
/**
* @param connectMessage
* @param database
@@ -213,10 +169,6 @@ public class SqlConnection implements Closeable {
private Connection getDBConnection(
ConnectMessage connectMessage, String database, String serviceName)
throws ClassNotFoundException, SQLException {
- String extraParamString =
- connectMessage.extraParams.entrySet().stream()
- .map(e -> String.join("=", e.getKey(),
String.valueOf(e.getValue())))
- .collect(Collectors.joining("&"));
Class.forName(SQL_DRIVER_CLASS.getValue());
String url = "";
if (StringUtils.isNotBlank(database)) {
@@ -232,7 +184,11 @@ public class SqlConnection implements Closeable {
database);
}
- if (!connectMessage.extraParams.isEmpty()) {
+ if (MapUtils.isNotEmpty(connectMessage.extraParams)) {
+ String extraParamString =
+ connectMessage.extraParams.entrySet().stream()
+ .map(e -> String.join("=", e.getKey(),
String.valueOf(e.getValue())))
+ .collect(Collectors.joining("&"));
url += "?" + extraParamString;
}
Properties prop = new Properties();
@@ -242,29 +198,7 @@ public class SqlConnection implements Closeable {
return DriverManager.getConnection(url, prop);
}
- /** Connect message */
- private static class ConnectMessage {
- private String host;
-
- private Integer port;
-
- private String username;
-
- private String password;
-
- private Map<String, Object> extraParams;
-
- public ConnectMessage(
- String host,
- Integer port,
- String username,
- String password,
- Map<String, Object> extraParams) {
- this.host = host;
- this.port = port;
- this.username = username;
- this.password = password;
- this.extraParams = extraParams;
- }
+ public String getSqlConnectUrl() {
+ return SQL_CONNECT_URL.getValue();
}
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/postgres/SqlConnection.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/postgres/SqlConnection.java
index 02acd76a9..8dca1f218 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/postgres/SqlConnection.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/postgres/SqlConnection.java
@@ -18,13 +18,11 @@
package org.apache.linkis.metadata.query.service.postgres;
import org.apache.linkis.common.conf.CommonVars;
-import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo;
+import org.apache.linkis.metadata.query.service.AbstractSqlConnection;
import org.apache.commons.collections.MapUtils;
import org.apache.logging.log4j.util.Strings;
-import java.io.Closeable;
-import java.io.IOException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
@@ -34,7 +32,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SqlConnection implements Closeable {
+public class SqlConnection extends AbstractSqlConnection {
private static final Logger LOG =
LoggerFactory.getLogger(SqlConnection.class);
private static final CommonVars<String> SQL_DRIVER_CLASS =
@@ -43,10 +41,6 @@ public class SqlConnection implements Closeable {
private static final CommonVars<String> SQL_CONNECT_URL =
CommonVars.apply("wds.linkis.server.mdm.service.postgre.url",
"jdbc:postgresql://%s:%s/%s");
- private Connection conn;
-
- private ConnectMessage connectMessage;
-
public SqlConnection(
String host,
Integer port,
@@ -55,14 +49,7 @@ public class SqlConnection implements Closeable {
String database,
Map<String, Object> extraParams)
throws ClassNotFoundException, SQLException {
- connectMessage = new ConnectMessage(host, port, username, password,
extraParams);
- if (Strings.isBlank(database)) {
- database = "";
- }
- conn = getDBConnection(connectMessage, database);
- // Try to create statement
- Statement statement = conn.createStatement();
- statement.close();
+ super(host, port, username, password, Strings.isBlank(database) ? "" :
database, extraParams);
}
public List<String> getAllDatabases() throws SQLException {
@@ -99,93 +86,19 @@ public class SqlConnection implements Closeable {
}
}
- public List<MetaColumnInfo> getColumns(String schemaname, String table)
- throws SQLException, ClassNotFoundException {
- List<MetaColumnInfo> columns = new ArrayList<>();
- String columnSql = "SELECT * FROM " + schemaname + "." + table + " WHERE 1
= 2";
- PreparedStatement ps = null;
- ResultSet rs = null;
- ResultSetMetaData meta;
- try {
- List<String> primaryKeys = getPrimaryKeys(table);
- ps = conn.prepareStatement(columnSql);
- rs = ps.executeQuery();
- meta = rs.getMetaData();
- int columnCount = meta.getColumnCount();
- for (int i = 1; i < columnCount + 1; i++) {
- MetaColumnInfo info = new MetaColumnInfo();
- info.setIndex(i);
- info.setName(meta.getColumnName(i));
- info.setType(meta.getColumnTypeName(i));
- if (primaryKeys.contains(meta.getColumnName(i))) {
- info.setPrimaryKey(true);
- }
- columns.add(info);
- }
- } finally {
- closeResource(null, ps, rs);
- }
- return columns;
- }
-
- /**
- * Get primary keys // * @param connection connection
- *
- * @param table table name
- * @return
- * @throws SQLException
- */
- private List<String> getPrimaryKeys(String table) throws SQLException {
- ResultSet rs = null;
- List<String> primaryKeys = new ArrayList<>();
- DatabaseMetaData dbMeta = conn.getMetaData();
- rs = dbMeta.getPrimaryKeys(null, null, table);
- while (rs.next()) {
- primaryKeys.add(rs.getString("column_name"));
- }
- return primaryKeys;
- }
-
- /**
- * close database resource
- *
- * @param connection connection
- * @param statement statement
- * @param resultSet result set
- */
- private void closeResource(Connection connection, Statement statement,
ResultSet resultSet) {
- try {
- if (null != resultSet && !resultSet.isClosed()) {
- resultSet.close();
- }
- if (null != statement && !statement.isClosed()) {
- statement.close();
- }
- if (null != connection && !connection.isClosed()) {
- connection.close();
- }
- } catch (SQLException e) {
- LOG.warn("Fail to release resource [" + e.getMessage() + "]", e);
- }
- }
-
- @Override
- public void close() throws IOException {
- closeResource(conn, null, null);
- }
-
/**
* @param connectMessage
* @param database
* @return
* @throws ClassNotFoundException
*/
- private Connection getDBConnection(ConnectMessage connectMessage, String
database)
+ public Connection getDBConnection(ConnectMessage connectMessage, String
database)
throws ClassNotFoundException, SQLException {
Class.forName(SQL_DRIVER_CLASS.getValue());
String url =
String.format(
SQL_CONNECT_URL.getValue(), connectMessage.host,
connectMessage.port, database);
+
if (MapUtils.isNotEmpty(connectMessage.extraParams)) {
String extraParamString =
connectMessage.extraParams.entrySet().stream()
@@ -196,29 +109,7 @@ public class SqlConnection implements Closeable {
return DriverManager.getConnection(url, connectMessage.username,
connectMessage.password);
}
- /** Connect message */
- private static class ConnectMessage {
- private String host;
-
- private Integer port;
-
- private String username;
-
- private String password;
-
- private Map<String, Object> extraParams;
-
- public ConnectMessage(
- String host,
- Integer port,
- String username,
- String password,
- Map<String, Object> extraParams) {
- this.host = host;
- this.port = port;
- this.username = username;
- this.password = password;
- this.extraParams = extraParams;
- }
+ public String getSqlConnectUrl() {
+ return SQL_CONNECT_URL.getValue();
}
}
diff --git
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/sqlserver/SqlConnection.java
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/sqlserver/SqlConnection.java
index 0d3597380..c963b2358 100644
---
a/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/sqlserver/SqlConnection.java
+++
b/linkis-public-enhancements/linkis-datasource/linkis-metadata-query/service/jdbc/src/main/java/org/apache/linkis/metadata/query/service/sqlserver/SqlConnection.java
@@ -19,9 +19,10 @@ package org.apache.linkis.metadata.query.service.sqlserver;
import org.apache.linkis.common.conf.CommonVars;
import org.apache.linkis.metadata.query.common.domain.MetaColumnInfo;
+import org.apache.linkis.metadata.query.service.AbstractSqlConnection;
+
+import org.apache.commons.collections.MapUtils;
-import java.io.Closeable;
-import java.io.IOException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
@@ -31,7 +32,7 @@ import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SqlConnection implements Closeable {
+public class SqlConnection extends AbstractSqlConnection {
private static final Logger LOG =
LoggerFactory.getLogger(SqlConnection.class);
private static final CommonVars<String> SQL_DRIVER_CLASS =
@@ -43,18 +44,10 @@ public class SqlConnection implements Closeable {
CommonVars.apply(
"wds.linkis.server.mdm.service.sqlserver.url",
"jdbc:sqlserver://%s:%s;DataBaseName=%s");
- private Connection conn;
-
- private ConnectMessage connectMessage;
-
public SqlConnection(
String host, Integer port, String username, String password, Map<String,
Object> extraParams)
throws ClassNotFoundException, SQLException {
- connectMessage = new ConnectMessage(host, port, username, password,
extraParams);
- conn = getDBConnection(connectMessage, "");
- // Try to create statement
- Statement statement = conn.createStatement();
- statement.close();
+ super(host, port, username, password, "", extraParams);
}
public List<String> getAllDatabases() throws SQLException {
@@ -136,103 +129,29 @@ public class SqlConnection implements Closeable {
return columns;
}
- /**
- * Get primary keys
- *
- * @param table table name
- * @return
- * @throws SQLException
- */
- private List<String> getPrimaryKeys(String table) throws SQLException {
- ResultSet rs = null;
- List<String> primaryKeys = new ArrayList<>();
- try {
- DatabaseMetaData dbMeta = conn.getMetaData();
- rs = dbMeta.getPrimaryKeys(null, null, table);
- while (rs.next()) {
- primaryKeys.add(rs.getString("column_name"));
- }
- return primaryKeys;
- } finally {
- if (null != rs) {
- closeResource(null, null, rs);
- }
- }
- }
-
- /**
- * close database resource
- *
- * @param connection connection
- * @param statement statement
- * @param resultSet result set
- */
- private void closeResource(Connection connection, Statement statement,
ResultSet resultSet) {
- try {
- if (null != resultSet && !resultSet.isClosed()) {
- resultSet.close();
- }
- if (null != statement /*&& !statement.isClosed()*/) {
- statement.close();
- }
- if (null != connection && !connection.isClosed()) {
- connection.close();
- }
- } catch (SQLException e) {
- LOG.warn("Fail to release resource [" + e.getMessage() + "]", e);
- }
- }
-
- @Override
- public void close() throws IOException {
- closeResource(conn, null, null);
- }
-
/**
* @param connectMessage
* @param database
* @return
* @throws ClassNotFoundException
*/
- private Connection getDBConnection(ConnectMessage connectMessage, String
database)
+ public Connection getDBConnection(ConnectMessage connectMessage, String
database)
throws ClassNotFoundException, SQLException {
- String extraParamString =
- connectMessage.extraParams.entrySet().stream()
- .map(e -> String.join("=", e.getKey(),
String.valueOf(e.getValue())))
- .collect(Collectors.joining("&"));
Class.forName(SQL_DRIVER_CLASS.getValue());
String url =
String.format(
SQL_CONNECT_URL.getValue(), connectMessage.host,
connectMessage.port, database);
- if (!connectMessage.extraParams.isEmpty()) {
+ if (MapUtils.isNotEmpty(connectMessage.extraParams)) {
+ String extraParamString =
+ connectMessage.extraParams.entrySet().stream()
+ .map(e -> String.join("=", e.getKey(),
String.valueOf(e.getValue())))
+ .collect(Collectors.joining("&"));
url += "?" + extraParamString;
}
return DriverManager.getConnection(url, connectMessage.username,
connectMessage.password);
}
- /** Connect message */
- private static class ConnectMessage {
- private String host;
-
- private Integer port;
-
- private String username;
-
- private String password;
-
- private Map<String, Object> extraParams;
-
- public ConnectMessage(
- String host,
- Integer port,
- String username,
- String password,
- Map<String, Object> extraParams) {
- this.host = host;
- this.port = port;
- this.username = username;
- this.password = password;
- this.extraParams = extraParams;
- }
+ public String getSqlConnectUrl() {
+ return SQL_CONNECT_URL.getValue();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]