This is an automated email from the ASF dual-hosted git repository.
pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 546006564 [INLONG-7135][Manager] Support the connection test of data nodes and clusters (#7136)
546006564 is described below
commit 5460065649420aebfb8086dc30a1f715541cb632
Author: fuweng11 <[email protected]>
AuthorDate: Tue Jan 3 20:20:36 2023 +0800
[INLONG-7135][Manager] Support the connection test of data nodes and clusters (#7136)
---
.../service/cluster/AbstractClusterOperator.java | 8 ++++-
.../service/cluster/InlongClusterOperator.java | 10 ++++++-
.../service/cluster/InlongClusterService.java | 8 +++++
.../service/cluster/InlongClusterServiceImpl.java | 14 ++++++++-
.../service/cluster/PulsarClusterOperator.java | 22 ++++++++++++--
.../service/node/AbstractDataNodeOperator.java | 6 ++++
.../manager/service/node/DataNodeOperator.java | 8 +++++
.../manager/service/node/DataNodeServiceImpl.java | 28 ++----------------
.../node/es/ElasticsearchDataNodeOperator.java | 34 ++++++++++++++++++++++
.../service/node/hive/HiveDataNodeOperator.java | 22 ++++++++++++++
.../service/resource/sink/es/ElasticsearchApi.java | 2 +-
.../manager/web/controller/DataNodeController.java | 2 +-
.../web/controller/InlongClusterController.java | 6 ++++
13 files changed, 138 insertions(+), 32 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
index ace15bf67..d788c505e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/AbstractClusterOperator.java
@@ -20,10 +20,10 @@ package org.apache.inlong.manager.service.cluster;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -77,4 +77,10 @@ public abstract class AbstractClusterOperator implements
InlongClusterOperator {
}
}
+ @Override
+ public Boolean testConnection(ClusterRequest request) {
+ throw new BusinessException(
+
String.format(ErrorCodeEnum.CLUSTER_TYPE_NOT_SUPPORTED.getMessage(),
request.getType()));
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java
index dadd500ea..912279ea1 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterOperator.java
@@ -17,9 +17,9 @@
package org.apache.inlong.manager.service.cluster;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
-import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
/**
* Interface of the inlong cluster operator.
@@ -63,4 +63,12 @@ public interface InlongClusterOperator {
*/
void updateOpt(ClusterRequest request, String operator);
+ /**
+ * Test connection
+ *
+ * @param request request of the cluster
+ * @return Whether the connection is successful
+ */
+ Boolean testConnection(ClusterRequest request);
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index 289a77c56..8ecccbc8b 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -429,4 +429,12 @@ public interface InlongClusterService {
*/
String getAllConfig(String clusterName, String md5);
+ /**
+ * Test whether the connection can be successfully established.
+ *
+ * @param request connection request
+ * @return true or false
+ */
+ Boolean testConnection(ClusterRequest request);
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index a32e1d6f5..a76ec6dcd 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -25,6 +25,7 @@ import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.constant.Constants;
+import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig;
import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse;
@@ -33,7 +34,6 @@ import
org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
@@ -1649,6 +1649,18 @@ public class InlongClusterServiceImpl implements
InlongClusterService {
return configJson;
}
+ @Override
+ public Boolean testConnection(ClusterRequest request) {
+ LOGGER.info("begin test connection for: {}", request);
+ String type = request.getType();
+
+ // according to the data node type, test connection
+ InlongClusterOperator clusterOperator =
clusterOperatorFactory.getInstance(request.getType());
+ Boolean result = clusterOperator.testConnection(request);
+ LOGGER.info("connection [{}] for: {}", result ? "success" : "failed",
request);
+ return result;
+ }
+
/**
* Remove cluster tag from the given cluster entity.
*/
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
index 3f7770d40..a4c98bb11 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
@@ -22,13 +22,15 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -86,4 +88,20 @@ public class PulsarClusterOperator extends
AbstractClusterOperator {
}
}
+ @Override
+ public Boolean testConnection(ClusterRequest request) {
+ PulsarClusterRequest pulsarRequest = (PulsarClusterRequest) request;
+ PulsarClusterInfo pulsarInfo = new PulsarClusterInfo();
+ CommonBeanUtils.copyProperties(pulsarRequest, pulsarInfo);
+ try (PulsarAdmin ignored = PulsarUtils.getPulsarAdmin(pulsarInfo)) {
+ LOGGER.info("pulsar connection not null - connection success for
adminUrl={}", pulsarInfo.getAdminUrl());
+ return true;
+ } catch (Exception e) {
+ String errMsg = String.format("pulsar connection failed for
adminUrl=%s, password=%s",
+ pulsarInfo.getAdminUrl(), pulsarInfo.getToken());
+ LOGGER.error(errMsg, e);
+ throw new BusinessException(errMsg);
+ }
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
index 51eeb91f6..91fae43a8 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
@@ -85,4 +85,10 @@ public abstract class AbstractDataNodeOperator implements
DataNodeOperator {
public Map<String, String> parse2SinkParams(DataNodeInfo info) {
return JsonUtils.parseObject(info.getExtParams(), HashMap.class);
}
+
+ @Override
+ public Boolean testConnection(DataNodeRequest request) {
+ throw new BusinessException(
+
String.format(ErrorCodeEnum.DATA_NODE_TYPE_NOT_SUPPORTED.getMessage(),
request.getType()));
+ }
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
index aeae290b7..14c8d35a0 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
@@ -71,4 +71,12 @@ public interface DataNodeOperator {
* @return Sink params
*/
Map<String, String> parse2SinkParams(DataNodeInfo info);
+
+ /**
+ * Test connection
+ * @param request request of the data node
+ * @return Whether the connection is successful
+ */
+ Boolean testConnection(DataNodeRequest request);
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
index 13b613c77..60935ed54 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
@@ -19,13 +19,10 @@ package org.apache.inlong.manager.service.node;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
-
-import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.UserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
import org.apache.inlong.manager.pojo.common.PageResult;
@@ -34,14 +31,12 @@ import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodePageRequest;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
import org.apache.inlong.manager.pojo.user.UserInfo;
-import org.apache.inlong.manager.service.resource.sink.hive.HiveJdbcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
-import java.sql.Connection;
import java.util.List;
import java.util.stream.Collectors;
@@ -341,28 +336,11 @@ public class DataNodeServiceImpl implements
DataNodeService {
LOGGER.info("begin test connection for: {}", request);
String type = request.getType();
- Boolean result = false;
- if (DataNodeType.HIVE.equals(type)) {
- result = testHiveConnection(request);
- }
-
+ // according to the data node type, test connection
+ DataNodeOperator dataNodeOperator =
operatorFactory.getInstance(request.getType());
+ Boolean result = dataNodeOperator.testConnection(request);
LOGGER.info("connection [{}] for: {}", result ? "success" : "failed",
request);
return result;
}
- /**
- * Test connection for Hive
- */
- private Boolean testHiveConnection(DataNodeRequest request) {
- String url = request.getUrl();
- Preconditions.checkNotNull(url, "connection url cannot be empty");
- try (Connection ignored = HiveJdbcUtils.getConnection(url,
request.getUsername(), request.getToken())) {
- LOGGER.info("hive connection not null - connection success");
- return true;
- } catch (Exception e) {
- LOGGER.error("hive connection failed: {}", e.getMessage());
- return false;
- }
- }
-
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
index 699b83a01..e07338f09 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/es/ElasticsearchDataNodeOperator.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
@@ -30,6 +31,9 @@ import
org.apache.inlong.manager.pojo.node.es.ElasticsearchDataNodeDTO;
import org.apache.inlong.manager.pojo.node.es.ElasticsearchDataNodeInfo;
import org.apache.inlong.manager.pojo.node.es.ElasticsearchDataNodeRequest;
import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchApi;
+import org.apache.inlong.manager.service.resource.sink.es.ElasticsearchConfig;
+import org.elasticsearch.client.RequestOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -82,4 +86,34 @@ public class ElasticsearchDataNodeOperator extends
AbstractDataNodeOperator {
LOGGER.debug("success to get elasticsearch data node from entity");
return info;
}
+
+ @Override
+ public Boolean testConnection(DataNodeRequest request) {
+ String url = request.getUrl();
+ String username = request.getUsername();
+ String password = request.getToken();
+ Preconditions.checkNotNull(url, "connection url cannot be empty");
+ ElasticsearchApi client = new ElasticsearchApi();
+ ElasticsearchConfig config = new ElasticsearchConfig();
+ if (StringUtils.isNotEmpty(request.getUsername())) {
+ config.setAuthEnable(true);
+ config.setUsername(username);
+ config.setPassword(password);
+ }
+ config.setHosts(url);
+ client.setEsConfig(config);
+ boolean result;
+ try {
+ result = client.getEsClient().ping(RequestOptions.DEFAULT);
+ LOGGER.info("elasticsearch connection is {} for url={},
username={}, password={}", result, url, username,
+ password);
+ return result;
+ } catch (Exception e) {
+ String errMsg = String.format("elasticsearch connection failed for
url=%s, username=%s, password=%s", url,
+ username, password);
+ LOGGER.error(errMsg, e);
+ throw new BusinessException(errMsg);
+ }
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java
index 7cb25bd5f..6adbe0b1a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
@@ -30,11 +31,14 @@ import
org.apache.inlong.manager.pojo.node.hive.HiveDataNodeDTO;
import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeInfo;
import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.apache.inlong.manager.service.resource.sink.hive.HiveJdbcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.sql.Connection;
+
@Service
public class HiveDataNodeOperator extends AbstractDataNodeOperator {
@@ -84,4 +88,22 @@ public class HiveDataNodeOperator extends
AbstractDataNodeOperator {
}
}
+ @Override
+ public Boolean testConnection(DataNodeRequest request) {
+ String url = request.getUrl();
+ String username = request.getUsername();
+ String password = request.getToken();
+ Preconditions.checkNotNull(url, "connection url cannot be empty");
+ try (Connection ignored = HiveJdbcUtils.getConnection(url, username,
password)) {
+ LOGGER.info("hive connection not null - connection success for
url={}, username={}, password={}", url,
+ username, password);
+ return true;
+ } catch (Exception e) {
+ String errMsg = String.format("hive connection failed for url=%s,
username=%s, password=%s", url,
+ username, password);
+ LOGGER.error(errMsg, e);
+ throw new BusinessException(errMsg);
+ }
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
index 41309e6de..43b187ec2 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/es/ElasticsearchApi.java
@@ -227,7 +227,7 @@ public class ElasticsearchApi {
*
* @return RestHighLevelClient
*/
- private RestHighLevelClient getEsClient() {
+ public RestHighLevelClient getEsClient() {
return esConfig.highLevelClient();
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java
index 641abc9b9..33ddba8c8 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java
@@ -118,7 +118,7 @@ public class DataNodeController {
@PostMapping("/node/testConnection")
@ApiOperation(value = "Test connection for data node")
- public Response<Boolean> testConnection(@Validated @RequestBody
DataNodeRequest request) {
+ public Response<Boolean> testConnection(@RequestBody DataNodeRequest
request) {
return Response.success(dataNodeService.testConnection(request));
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
index af822e703..3ea4634d3 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java
@@ -235,4 +235,10 @@ public class InlongClusterController {
String username = LoginUserUtils.getLoginUser().getName();
return Response.success(clusterService.bindNodeTag(request, username));
}
+
+ @PostMapping("/cluster/testConnection")
+ @ApiOperation(value = "Test connection for inlong cluster")
+ public Response<Boolean> testConnection(@RequestBody ClusterRequest
request) {
+ return Response.success(clusterService.testConnection(request));
+ }
}