This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e966b44f5 [INLONG-7256][Manager] Add a check for Pulsar service URL 
(#7316)
e966b44f5 is described below

commit e966b44f51aaf14690611c91c298f9c3dec4d1fa
Author: fuweng11 <[email protected]>
AuthorDate: Fri Feb 3 17:08:13 2023 +0800

    [INLONG-7256][Manager] Add a check for Pulsar service URL (#7316)
    
    Co-authored-by: healchow <[email protected]>
---
 .../service/cluster/PulsarClusterOperator.java     | 36 +++++++++++++++++++---
 1 file changed, 32 insertions(+), 4 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
index 1e99a5f34..5dea11adb 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
@@ -19,10 +19,12 @@ package org.apache.inlong.manager.service.cluster;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
@@ -36,6 +38,10 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+
 /**
  * Pulsar cluster operator.
  */
@@ -44,6 +50,8 @@ public class PulsarClusterOperator extends 
AbstractClusterOperator {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarClusterOperator.class);
 
+    private static final String SERVICE_URL_PREFIX = "pulsar://";
+
     @Autowired
     private ObjectMapper objectMapper;
 
@@ -92,12 +100,32 @@ public class PulsarClusterOperator extends 
AbstractClusterOperator {
         PulsarClusterRequest pulsarRequest = (PulsarClusterRequest) request;
         PulsarClusterInfo pulsarInfo = new PulsarClusterInfo();
         CommonBeanUtils.copyProperties(pulsarRequest, pulsarInfo);
-        try (PulsarAdmin ignored = PulsarUtils.getPulsarAdmin(pulsarInfo)) {
-            LOGGER.debug("pulsar connection not null - connection success for 
adminUrl={}", pulsarInfo.getAdminUrl());
+        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarInfo)) 
{
+            // test connect for pulsar adminUrl
+            pulsarAdmin.tenants().getTenants();
+            // test connect for pulsar ServiceUrl
+            final String serviceUrl = pulsarInfo.getUrl();
+            Preconditions.expectNotNull(serviceUrl, "Pulsar ServiceUrl is 
empty");
+            Preconditions.expectTrue(serviceUrl.startsWith(SERVICE_URL_PREFIX),
+                    String.format("Pulsar ServiceUrl=%s should starts with 
%s", serviceUrl, SERVICE_URL_PREFIX));
+
+            String hostPortStr = serviceUrl.replaceAll(SERVICE_URL_PREFIX, "");
+            String[] hostPortArr = hostPortStr.split(InlongConstants.COLON);
+            Preconditions.expectTrue(hostPortArr.length >= 2,
+                    String.format("Pulsar ServiceUrl=%s should has ip and 
port, such as '127.0.0.1:6650'", serviceUrl));
+
+            String host = hostPortArr[0];
+            int port = Integer.parseInt(hostPortArr[1]);
+            SocketAddress socketAddress = new InetSocketAddress(host, port);
+            Socket socket = new Socket();
+            socket.connect(socketAddress, 30000);
+            socket.close();
+            LOGGER.debug("Pulsar connection not null - connection success for 
AdminUrl={}, ServiceUrl={}",
+                    pulsarInfo.getAdminUrl(), pulsarInfo.getUrl());
             return true;
         } catch (Exception e) {
-            String errMsg = String.format("pulsar connection failed for 
adminUrl=%s, password=%s",
-                    pulsarInfo.getAdminUrl(), pulsarInfo.getToken());
+            String errMsg = String.format("Pulsar connection failed for 
AdminUrl=%s, ServiceUrl=%s",
+                    pulsarInfo.getAdminUrl(), pulsarInfo.getUrl());
             LOGGER.error(errMsg, e);
             throw new BusinessException(errMsg);
         }

Reply via email to