This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new dc28f65a3d0 [pick-2.0][Enhancement](jdbc catalog) Add a property to
test the connection when creating a Jdbc catalog (#32581)
dc28f65a3d0 is described below
commit dc28f65a3d0cc80834b58534155dfb460a4d25fe
Author: zy-kkk <[email protected]>
AuthorDate: Thu Mar 21 14:05:50 2024 +0800
[pick-2.0][Enhancement](jdbc catalog) Add a property to test the connection
when creating a Jdbc catalog (#32581)
---
be/src/service/internal_service.cpp | 59 +++++++++++
be/src/service/internal_service.h | 5 +
be/src/vec/exec/vjdbc_connector.cpp | 27 ++++-
be/src/vec/exec/vjdbc_connector.h | 5 +
.../java/org/apache/doris/jdbc/JdbcExecutor.java | 21 ++++
.../org/apache/doris/catalog/JdbcResource.java | 9 +-
.../apache/doris/datasource/CatalogFactory.java | 2 +-
.../doris/datasource/jdbc/JdbcExternalCatalog.java | 111 ++++++++++++++++++++-
.../doris/datasource/jdbc/client/JdbcClient.java | 18 ++++
.../datasource/jdbc/client/JdbcOracleClient.java | 5 +
.../datasource/jdbc/client/JdbcSapHanaClient.java | 5 +
.../org/apache/doris/rpc/BackendServiceClient.java | 5 +
.../org/apache/doris/rpc/BackendServiceProxy.java | 12 +++
.../datasource/jdbc/JdbcExternalCatalogTest.java | 2 +-
gensrc/proto/internal_service.proto | 11 ++
.../jdbc/test_clickhouse_jdbc_catalog.out | Bin 5002 -> 5437 bytes
.../jdbc/test_clickhouse_jdbc_catalog.groovy | 30 +++++-
17 files changed, 320 insertions(+), 7 deletions(-)
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index acc2e29a3ef..2fd4310c4bc 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -37,6 +37,7 @@
#include <stddef.h>
#include <stdint.h>
#include <sys/stat.h>
+#include <vec/exec/vjdbc_connector.h>
#include <algorithm>
#include <exception>
@@ -683,6 +684,64 @@ void
PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co
}
}
+void
PInternalServiceImpl::test_jdbc_connection(google::protobuf::RpcController*
controller,
+ const
PJdbcTestConnectionRequest* request,
+ PJdbcTestConnectionResult*
result,
+ google::protobuf::Closure*
done) {
+ bool ret = _heavy_work_pool.try_offer([request, result, done]() {
+ VLOG_RPC << "test jdbc connection";
+ brpc::ClosureGuard closure_guard(done);
+ TTableDescriptor table_desc;
+ vectorized::JdbcConnectorParam jdbc_param;
+ Status st = Status::OK();
+ {
+ const uint8_t* buf = (const uint8_t*)request->jdbc_table().data();
+ uint32_t len = request->jdbc_table().size();
+ st = deserialize_thrift_msg(buf, &len, false, &table_desc);
+ if (!st.ok()) {
+ LOG(WARNING) << "test jdbc connection failed, errmsg=" << st;
+ st.to_protobuf(result->mutable_status());
+ return;
+ }
+ }
+ TJdbcTable jdbc_table = (table_desc.jdbcTable);
+ jdbc_param.catalog_id = jdbc_table.catalog_id;
+ jdbc_param.driver_class = jdbc_table.jdbc_driver_class;
+ jdbc_param.driver_path = jdbc_table.jdbc_driver_url;
+ jdbc_param.driver_checksum = jdbc_table.jdbc_driver_checksum;
+ jdbc_param.jdbc_url = jdbc_table.jdbc_url;
+ jdbc_param.user = jdbc_table.jdbc_user;
+ jdbc_param.passwd = jdbc_table.jdbc_password;
+ jdbc_param.query_string = request->query_str();
+ jdbc_param.table_type =
static_cast<TOdbcTableType::type>(request->jdbc_table_type());
+ jdbc_param.connection_pool_min_size =
jdbc_table.connection_pool_min_size;
+ jdbc_param.connection_pool_max_size =
jdbc_table.connection_pool_max_size;
+ jdbc_param.connection_pool_max_life_time =
jdbc_table.connection_pool_max_life_time;
+ jdbc_param.connection_pool_max_wait_time =
jdbc_table.connection_pool_max_wait_time;
+ jdbc_param.connection_pool_keep_alive =
jdbc_table.connection_pool_keep_alive;
+
+ std::unique_ptr<vectorized::JdbcConnector> jdbc_connector;
+ jdbc_connector.reset(new (std::nothrow)
vectorized::JdbcConnector(jdbc_param));
+
+ st = jdbc_connector->test_connection();
+ st.to_protobuf(result->mutable_status());
+
+ Status clean_st = jdbc_connector->clean_datasource();
+ if (!clean_st.ok()) {
+ LOG(WARNING) << "Failed to clean JDBC datasource: " <<
clean_st.msg();
+ }
+ Status close_st = jdbc_connector->close();
+ if (!close_st.ok()) {
+ LOG(WARNING) << "Failed to close JDBC connector: " <<
close_st.msg();
+ }
+ });
+
+ if (!ret) {
+ offer_failed(result, done, _heavy_work_pool);
+ return;
+ }
+}
+
void
PInternalServiceImpl::get_column_ids_by_tablet_ids(google::protobuf::RpcController*
controller,
const
PFetchColIdsRequest* request,
PFetchColIdsResponse*
response,
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index 47762cf7e52..259a41c2e33 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -184,6 +184,11 @@ public:
void glob(google::protobuf::RpcController* controller, const PGlobRequest*
request,
PGlobResponse* response, google::protobuf::Closure* done)
override;
+ void test_jdbc_connection(google::protobuf::RpcController* controller,
+ const PJdbcTestConnectionRequest* request,
+ PJdbcTestConnectionResult* result,
+ google::protobuf::Closure* done) override;
+
private:
void _exec_plan_fragment_in_pthread(google::protobuf::RpcController*
controller,
const PExecPlanFragmentRequest*
request,
diff --git a/be/src/vec/exec/vjdbc_connector.cpp
b/be/src/vec/exec/vjdbc_connector.cpp
index 9b98627db94..9344faad01d 100644
--- a/be/src/vec/exec/vjdbc_connector.cpp
+++ b/be/src/vec/exec/vjdbc_connector.cpp
@@ -156,7 +156,11 @@ Status JdbcConnector::open(RuntimeState* state, bool read)
{
ctor_params.__set_jdbc_password(_conn_param.passwd);
ctor_params.__set_jdbc_driver_class(_conn_param.driver_class);
ctor_params.__set_driver_path(local_location);
- ctor_params.__set_batch_size(read ? state->batch_size() : 0);
+ if (state == nullptr) {
+ ctor_params.__set_batch_size(read ? 1 : 0);
+ } else {
+ ctor_params.__set_batch_size(read ? state->batch_size() : 0);
+ }
ctor_params.__set_op(read ? TJdbcOperation::READ :
TJdbcOperation::WRITE);
ctor_params.__set_table_type(_conn_param.table_type);
ctor_params.__set_connection_pool_min_size(_conn_param.connection_pool_min_size);
@@ -185,6 +189,23 @@ Status JdbcConnector::open(RuntimeState* state, bool read)
{
return Status::OK();
}
+Status JdbcConnector::test_connection() {
+ RETURN_IF_ERROR(open(nullptr, true));
+
+ JNIEnv* env = nullptr;
+ RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+
+ env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz,
_executor_test_connection_id);
+ return JniUtil::GetJniExceptionMsg(env);
+}
+
+Status JdbcConnector::clean_datasource() {
+ JNIEnv* env = nullptr;
+ RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+ env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz,
_executor_clean_datasource_id);
+ return JniUtil::GetJniExceptionMsg(env);
+}
+
Status JdbcConnector::query() {
if (!_is_open) {
return Status::InternalError("Query before open of JdbcConnector.");
@@ -840,6 +861,10 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) {
JDBC_EXECUTOR_TRANSACTION_SIGNATURE,
_executor_abort_trans_id));
RETURN_IF_ERROR(register_id(_executor_clazz, "getResultColumnTypeNames",
JDBC_EXECUTOR_GET_TYPES_SIGNATURE,
_executor_get_types_id));
+ RETURN_IF_ERROR(
+ register_id(_executor_clazz, "testConnection", "()V",
_executor_test_connection_id));
+ RETURN_IF_ERROR(
+ register_id(_executor_clazz, "cleanDataSource", "()V",
_executor_clean_datasource_id));
return Status::OK();
}
diff --git a/be/src/vec/exec/vjdbc_connector.h
b/be/src/vec/exec/vjdbc_connector.h
index 55b39f0aec2..32dc5aba47b 100644
--- a/be/src/vec/exec/vjdbc_connector.h
+++ b/be/src/vec/exec/vjdbc_connector.h
@@ -103,6 +103,9 @@ public:
Status close() override;
+ Status test_connection();
+ Status clean_datasource();
+
private:
Status _register_func_id(JNIEnv* env);
Status _check_column_type();
@@ -165,6 +168,8 @@ private:
jmethodID _executor_begin_trans_id;
jmethodID _executor_finish_trans_id;
jmethodID _executor_abort_trans_id;
+ jmethodID _executor_test_connection_id;
+ jmethodID _executor_clean_datasource_id;
std::map<int, int> _map_column_idx_to_cast_idx;
std::vector<DataTypePtr> _input_array_string_types;
std::vector<MutableColumnPtr>
diff --git
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
index 3011c5d82f7..6f15600eddc 100644
---
a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
+++
b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java
@@ -180,6 +180,27 @@ public class JdbcExecutor {
return false;
}
+
+ public void cleanDataSource() {
+ if (druidDataSource != null) {
+ druidDataSource.close();
+
JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey());
+ druidDataSource = null;
+ }
+ }
+
+ public void testConnection() throws UdfRuntimeException {
+ try {
+ resultSet = ((PreparedStatement) stmt).executeQuery();
+ if (!resultSet.next()) {
+ throw new UdfRuntimeException(
+ "Failed to test connection in BE: query executed but
returned no results.");
+ }
+ } catch (SQLException e) {
+ throw new UdfRuntimeException("Failed to test connection in BE: ",
e);
+ }
+ }
+
public int read() throws UdfRuntimeException {
try {
resultSet = ((PreparedStatement) stmt).executeQuery();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
index 3291b64cbb0..1faf27e1040 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java
@@ -104,6 +104,8 @@ public class JdbcResource extends Resource {
public static final String CONNECTION_POOL_KEEP_ALIVE =
"connection_pool_keep_alive";
public static final String CHECK_SUM = "checksum";
public static final String CREATE_TIME = "create_time";
+ public static final String TEST_CONNECTION = "test_connection";
+
private static final ImmutableList<String> ALL_PROPERTIES = new
ImmutableList.Builder<String>().add(
JDBC_URL,
USER,
@@ -120,7 +122,8 @@ public class JdbcResource extends Resource {
CONNECTION_POOL_MAX_SIZE,
CONNECTION_POOL_MAX_LIFE_TIME,
CONNECTION_POOL_MAX_WAIT_TIME,
- CONNECTION_POOL_KEEP_ALIVE
+ CONNECTION_POOL_KEEP_ALIVE,
+ TEST_CONNECTION
).build();
private static final ImmutableList<String> OPTIONAL_PROPERTIES = new
ImmutableList.Builder<String>().add(
ONLY_SPECIFIED_DATABASE,
@@ -131,7 +134,8 @@ public class JdbcResource extends Resource {
CONNECTION_POOL_MAX_SIZE,
CONNECTION_POOL_MAX_LIFE_TIME,
CONNECTION_POOL_MAX_WAIT_TIME,
- CONNECTION_POOL_KEEP_ALIVE
+ CONNECTION_POOL_KEEP_ALIVE,
+ TEST_CONNECTION
).build();
// The default value of optional properties
@@ -148,6 +152,7 @@ public class JdbcResource extends Resource {
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_LIFE_TIME,
"1800000");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_WAIT_TIME,
"5000");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_KEEP_ALIVE,
"false");
+ OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(TEST_CONNECTION, "true");
}
// timeout for both connection and read. 10 seconds is long enough.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
index 09ad69ec8b6..3bf674a8abd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
@@ -119,7 +119,7 @@ public class CatalogFactory {
catalog = new EsExternalCatalog(catalogId, name, resource,
props, comment);
break;
case "jdbc":
- catalog = new JdbcExternalCatalog(catalogId, name, resource,
props, comment);
+ catalog = new JdbcExternalCatalog(catalogId, name, resource,
props, comment, isReplay);
break;
case "iceberg":
catalog =
IcebergExternalCatalogFactory.createCatalog(catalogId, name, resource, props,
comment);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
index ea9d5d74999..774ccd2dd25 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java
@@ -17,10 +17,14 @@
package org.apache.doris.datasource.jdbc;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcResource;
+import org.apache.doris.catalog.JdbcTable;
+import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.external.JdbcExternalDatabase;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
@@ -28,19 +32,37 @@ import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
+import org.apache.doris.datasource.jdbc.client.JdbcClientException;
+import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.InternalService.PJdbcTestConnectionRequest;
+import org.apache.doris.proto.InternalService.PJdbcTestConnectionResult;
+import org.apache.doris.rpc.BackendServiceProxy;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatusCode;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
@Getter
public class JdbcExternalCatalog extends ExternalCatalog {
+ private static final Logger LOG =
LogManager.getLogger(JdbcExternalCatalog.class);
+
private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
JdbcResource.JDBC_URL,
JdbcResource.DRIVER_URL,
@@ -52,10 +74,11 @@ public class JdbcExternalCatalog extends ExternalCatalog {
private transient JdbcClient jdbcClient;
public JdbcExternalCatalog(long catalogId, String name, String resource,
Map<String, String> props,
- String comment)
+ String comment, boolean isReplay)
throws DdlException {
super(catalogId, name, InitCatalogLog.Type.JDBC, comment);
this.catalogProperty = new CatalogProperty(resource,
processCompatibleProperties(props));
+ testJdbcConnection(isReplay);
}
@Override
@@ -73,6 +96,9 @@ public class JdbcExternalCatalog extends ExternalCatalog {
JdbcResource.checkBooleanProperty(JdbcResource.ONLY_SPECIFIED_DATABASE,
getOnlySpecifiedDatabase());
JdbcResource.checkBooleanProperty(JdbcResource.LOWER_CASE_TABLE_NAMES,
getLowerCaseTableNames());
+
JdbcResource.checkBooleanProperty(JdbcResource.CONNECTION_POOL_KEEP_ALIVE,
+ String.valueOf(isConnectionPoolKeepAlive()));
+ JdbcResource.checkBooleanProperty(JdbcResource.TEST_CONNECTION,
String.valueOf(isTestConnection()));
JdbcResource.checkDatabaseListProperties(getOnlySpecifiedDatabase(),
getIncludeDatabaseMap(),
getExcludeDatabaseMap());
JdbcResource.checkConnectionPoolProperties(getConnectionPoolMinSize(),
getConnectionPoolMaxSize(),
@@ -177,6 +203,11 @@ public class JdbcExternalCatalog extends ExternalCatalog {
.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_KEEP_ALIVE)));
}
+ public boolean isTestConnection() {
+ return
Boolean.parseBoolean(catalogProperty.getOrDefault(JdbcResource.TEST_CONNECTION,
JdbcResource
+ .getDefaultPropertyValue(JdbcResource.TEST_CONNECTION)));
+ }
+
@Override
protected void initLocalObjectsImpl() {
JdbcClientConfig jdbcClientConfig = new JdbcClientConfig()
@@ -244,4 +275,82 @@ public class JdbcExternalCatalog extends ExternalCatalog {
}
}
}
+
+ private void testJdbcConnection(boolean isReplay) throws DdlException {
+ if (FeConstants.runningUnitTest) {
+ // skip test connection in unit test
+ return;
+ }
+ if (!isReplay) {
+ if (isTestConnection()) {
+ try {
+ initLocalObjectsImpl();
+ testFeToJdbcConnection();
+ testBeToJdbcConnection();
+ } finally {
+ jdbcClient.closeClient();
+ jdbcClient = null;
+ }
+ }
+ }
+ }
+
+ private void testFeToJdbcConnection() throws DdlException {
+ try {
+ jdbcClient.testConnection();
+ } catch (JdbcClientException e) {
+ String errorMessage = "Test FE Connection to JDBC Failed: " +
e.getMessage();
+ LOG.error(errorMessage, e);
+ throw new DdlException(errorMessage, e);
+ }
+ }
+
+ private void testBeToJdbcConnection() throws DdlException {
+ Backend aliveBe = null;
+ for (Backend be :
Env.getCurrentSystemInfo().getIdToBackend().values()) {
+ if (be.isAlive()) {
+ aliveBe = be;
+ }
+ }
+ if (aliveBe == null) {
+ throw new DdlException("Test BE Connection to JDBC Failed: No
Alive backends");
+ }
+ TNetworkAddress address = new TNetworkAddress(aliveBe.getHost(),
aliveBe.getBrpcPort());
+ try {
+ JdbcTable jdbcTable = getTestConnectionJdbcTable();
+ PJdbcTestConnectionRequest request =
InternalService.PJdbcTestConnectionRequest.newBuilder()
+ .setJdbcTable(ByteString.copyFrom(new
TSerializer().serialize(jdbcTable.toThrift())))
+ .setJdbcTableType(jdbcTable.getJdbcTableType().getValue())
+ .setQueryStr(jdbcClient.getTestQuery()).build();
+ InternalService.PJdbcTestConnectionResult result = null;
+ Future<PJdbcTestConnectionResult> future =
BackendServiceProxy.getInstance()
+ .testJdbcConnection(address, request);
+ result = future.get();
+ TStatusCode code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
+ if (code != TStatusCode.OK) {
+ throw new DdlException("Test BE Connection to JDBC Failed: " +
result.getStatus().getErrorMsgs(0));
+ }
+ } catch (TException | RpcException | ExecutionException |
InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private JdbcTable getTestConnectionJdbcTable() throws DdlException {
+ JdbcTable jdbcTable = new JdbcTable(0, "test_jdbc_connection",
Lists.newArrayList(),
+ TableType.JDBC_EXTERNAL_TABLE);
+ jdbcTable.setCatalogId(this.getId());
+ jdbcTable.setJdbcTypeName(this.getDatabaseTypeName());
+ jdbcTable.setJdbcUrl(this.getJdbcUrl());
+ jdbcTable.setJdbcUser(this.getJdbcUser());
+ jdbcTable.setJdbcPasswd(this.getJdbcPasswd());
+ jdbcTable.setDriverClass(this.getDriverClass());
+ jdbcTable.setDriverUrl(this.getDriverUrl());
+
jdbcTable.setCheckSum(JdbcResource.computeObjectChecksum(this.getDriverUrl()));
+ jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize());
+ jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize());
+
jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime());
+
jdbcTable.setConnectionPoolMaxWaitTime(this.getConnectionPoolMaxWaitTime());
+ jdbcTable.setConnectionPoolKeepAlive(this.isConnectionPoolKeepAlive());
+ return jdbcTable;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
index 7840e4cf0ae..1db27ef6eaf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java
@@ -478,4 +478,22 @@ public abstract class JdbcClient {
}
return ScalarType.createStringType();
}
+
+ public void testConnection() {
+ String testQuery = getTestQuery();
+ try (Connection conn = getConnection();
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(testQuery)) {
+ if (!rs.next()) {
+ throw new JdbcClientException(
+ "Failed to test connection in FE: query executed but
returned no results.");
+ }
+ } catch (SQLException e) {
+ throw new JdbcClientException("Failed to test connection in FE: "
+ e.getMessage(), e);
+ }
+ }
+
+ public String getTestQuery() {
+ return "select 1";
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java
index d0a9f2c3de7..37fd1b6c72c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java
@@ -45,6 +45,11 @@ public class JdbcOracleClient extends JdbcClient {
return conn.getCatalog();
}
+ @Override
+ public String getTestQuery() {
+ return "SELECT 1 FROM dual";
+ }
+
@Override
public List<String> getDatabaseNameList() {
Connection conn = getConnection();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcSapHanaClient.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcSapHanaClient.java
index eb8742c6e80..2df36b4cbaa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcSapHanaClient.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcSapHanaClient.java
@@ -36,6 +36,11 @@ public class JdbcSapHanaClient extends JdbcClient {
return new String[] {"TABLE", "VIEW", "OLAP VIEW", "JOIN VIEW",
"HIERARCHY VIEW", "CALC VIEW"};
}
+ @Override
+ public String getTestQuery() {
+ return "SELECT 1 FROM DUMMY";
+ }
+
@Override
protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
String hanaType = fieldSchema.getDataTypeName();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index e4ece51146b..87321efb85b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -114,6 +114,11 @@ public class BackendServiceClient {
return stub.fetchTableSchema(request);
}
+ public Future<InternalService.PJdbcTestConnectionResult>
testJdbcConnection(
+ InternalService.PJdbcTestConnectionRequest request) {
+ return stub.testJdbcConnection(request);
+ }
+
public Future<InternalService.PCacheResponse>
updateCache(InternalService.PUpdateCacheRequest request) {
return stub.updateCache(request);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 9b5c491df69..72afa75ffcc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -284,6 +284,18 @@ public class BackendServiceProxy {
}
}
+ public Future<InternalService.PJdbcTestConnectionResult>
testJdbcConnection(
+ TNetworkAddress address,
InternalService.PJdbcTestConnectionRequest request) throws RpcException {
+ try {
+ final BackendServiceClient client = getProxy(address);
+ return client.testJdbcConnection(request);
+ } catch (Throwable e) {
+ LOG.warn("test jdbc connection catch a exception, address={}:{}",
+ address.getHostname(), address.getPort(), e);
+ throw new RpcException(address.hostname, e.getMessage());
+ }
+ }
+
public Future<InternalService.PCacheResponse> updateCache(
TNetworkAddress address, InternalService.PUpdateCacheRequest
request) throws RpcException {
try {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java
index 8394daf0682..4ef950ef981 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalogTest.java
@@ -41,7 +41,7 @@ public class JdbcExternalCatalogTest {
properties.put(JdbcResource.DRIVER_URL, "ojdbc8.jar");
properties.put(JdbcResource.JDBC_URL,
"jdbc:oracle:thin:@127.0.0.1:1521:XE");
properties.put(JdbcResource.DRIVER_CLASS,
"oracle.jdbc.driver.OracleDriver");
- jdbcExternalCatalog = new JdbcExternalCatalog(1L, "testCatalog", null,
properties, "testComment");
+ jdbcExternalCatalog = new JdbcExternalCatalog(1L, "testCatalog", null,
properties, "testComment", false);
}
@Test
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 1ef97ec2df3..a1fd0e42a70 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -614,6 +614,16 @@ message PFetchTableSchemaResult {
repeated PTypeDesc column_types = 4;
}
+message PJdbcTestConnectionRequest {
+ optional bytes jdbc_table = 1;
+ optional int32 jdbc_table_type = 2;
+ optional string query_str = 3;
+}
+
+message PJdbcTestConnectionResult {
+ optional PStatus status = 1;
+}
+
message PRowLocation {
optional int64 tablet_id = 1;
optional string rowset_id = 2;
@@ -727,5 +737,6 @@ service PBackendService {
rpc get_column_ids_by_tablet_ids(PFetchColIdsRequest) returns
(PFetchColIdsResponse);
rpc get_tablet_rowset_versions(PGetTabletVersionsRequest) returns
(PGetTabletVersionsResponse);
rpc glob(PGlobRequest) returns (PGlobResponse);
+ rpc test_jdbc_connection(PJdbcTestConnectionRequest) returns
(PJdbcTestConnectionResult);
};
diff --git
a/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out
b/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out
index 32c74765aab..65b53290116 100644
Binary files
a/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out
and
b/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out
differ
diff --git
a/regression-test/suites/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.groovy
b/regression-test/suites/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.groovy
index 9948c49d24a..ff89e1b9a30 100644
---
a/regression-test/suites/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.groovy
+++
b/regression-test/suites/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.groovy
@@ -101,6 +101,32 @@ suite("test_clickhouse_jdbc_catalog",
"p0,external,clickhouse,external_docker,ex
order_qt_dt_with_tz """ select * from dt_with_tz order by id; """
+ sql """create catalog if not exists
clickhouse_catalog_test_conn_correct properties(
+ "type"="jdbc",
+ "user"="default",
+ "password"="123456",
+ "jdbc_url" =
"jdbc:clickhouse://${externalEnvIp}:${clickhouse_port}/doris_test",
+ "driver_url" = "${driver_url}",
+ "driver_class" =
"com.clickhouse.jdbc.ClickHouseDriver",
+ "test_connection" = "true"
+ );
+ """
+ order_qt_test_conn_correct """ select * from
clickhouse_catalog_test_conn_correct.doris_test.type; """
+
+ test {
+ sql """create catalog if not exists
clickhouse_catalog_test_conn_mistake properties(
+ "type"="jdbc",
+ "user"="default",
+ "password"="1234567",
+ "jdbc_url" =
"jdbc:clickhouse://${externalEnvIp}:${clickhouse_port}/doris_test",
+ "driver_url" = "${driver_url}",
+ "driver_class" =
"com.clickhouse.jdbc.ClickHouseDriver",
+ "test_connection" = "true"
+ );
+ """
+ exception "Test FE Connection to JDBC Failed: Can not
connect to jdbc due to error: Code: 516. DB::Exception: default: Authentication
failed: password is incorrect, or there is no user with such name."
+ }
+
}finally {
res_dbs_log = sql "show databases;"
for(int i = 0;i < res_dbs_log.size();i++) {
@@ -108,7 +134,9 @@ suite("test_clickhouse_jdbc_catalog",
"p0,external,clickhouse,external_docker,ex
log.info( "database = ${res_dbs_log[i][0]} =>
tables = "+tbs.toString())
}
}
-
+
sql """ drop catalog if exists ${catalog_name} """
+ sql """ drop catalog if exists clickhouse_catalog_test_conn_correct """
+ sql """ drop catalog if exists clickhouse_catalog_test_conn_mistake """
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]