This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e966b44f5 [INLONG-7256][Manager] Add a check for Pulsar service URL
(#7316)
e966b44f5 is described below
commit e966b44f51aaf14690611c91c298f9c3dec4d1fa
Author: fuweng11 <[email protected]>
AuthorDate: Fri Feb 3 17:08:13 2023 +0800
[INLONG-7256][Manager] Add a check for Pulsar service URL (#7316)
Co-authored-by: healchow <[email protected]>
---
.../service/cluster/PulsarClusterOperator.java | 36 +++++++++++++++++++---
1 file changed, 32 insertions(+), 4 deletions(-)
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
index 1e99a5f34..5dea11adb 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.service.cluster;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
@@ -36,6 +38,10 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+
/**
* Pulsar cluster operator.
*/
@@ -44,6 +50,8 @@ public class PulsarClusterOperator extends
AbstractClusterOperator {
private static final Logger LOGGER =
LoggerFactory.getLogger(PulsarClusterOperator.class);
+ private static final String SERVICE_URL_PREFIX = "pulsar://";
+
@Autowired
private ObjectMapper objectMapper;
@@ -92,12 +100,32 @@ public class PulsarClusterOperator extends
AbstractClusterOperator {
PulsarClusterRequest pulsarRequest = (PulsarClusterRequest) request;
PulsarClusterInfo pulsarInfo = new PulsarClusterInfo();
CommonBeanUtils.copyProperties(pulsarRequest, pulsarInfo);
- try (PulsarAdmin ignored = PulsarUtils.getPulsarAdmin(pulsarInfo)) {
- LOGGER.debug("pulsar connection not null - connection success for
adminUrl={}", pulsarInfo.getAdminUrl());
+ try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarInfo))
{
+ // test connect for pulsar adminUrl
+ pulsarAdmin.tenants().getTenants();
+ // test connect for pulsar ServiceUrl
+ final String serviceUrl = pulsarInfo.getUrl();
+ Preconditions.expectNotNull(serviceUrl, "Pulsar ServiceUrl is
empty");
+ Preconditions.expectTrue(serviceUrl.startsWith(SERVICE_URL_PREFIX),
+ String.format("Pulsar ServiceUrl=%s should starts with
%s", serviceUrl, SERVICE_URL_PREFIX));
+
+ String hostPortStr = serviceUrl.replaceAll(SERVICE_URL_PREFIX, "");
+ String[] hostPortArr = hostPortStr.split(InlongConstants.COLON);
+ Preconditions.expectTrue(hostPortArr.length >= 2,
+ String.format("Pulsar ServiceUrl=%s should has ip and
port, such as '127.0.0.1:6650'", serviceUrl));
+
+ String host = hostPortArr[0];
+ int port = Integer.parseInt(hostPortArr[1]);
+ SocketAddress socketAddress = new InetSocketAddress(host, port);
+ Socket socket = new Socket();
+ socket.connect(socketAddress, 30000);
+ socket.close();
+ LOGGER.debug("Pulsar connection not null - connection success for
AdminUrl={}, ServiceUrl={}",
+ pulsarInfo.getAdminUrl(), pulsarInfo.getUrl());
return true;
} catch (Exception e) {
- String errMsg = String.format("pulsar connection failed for
adminUrl=%s, password=%s",
- pulsarInfo.getAdminUrl(), pulsarInfo.getToken());
+ String errMsg = String.format("Pulsar connection failed for
AdminUrl=%s, ServiceUrl=%s",
+ pulsarInfo.getAdminUrl(), pulsarInfo.getUrl());
LOGGER.error(errMsg, e);
throw new BusinessException(errMsg);
}