This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 16b458119 [INLONG-7337][Manager] Fix test Pulsar connection error 
(#7348)
16b458119 is described below

commit 16b45811949f2569a43c45664b486db20c457b7a
Author: leosanqing <[email protected]>
AuthorDate: Wed Feb 15 10:58:44 2023 +0800

    [INLONG-7337][Manager] Fix test Pulsar connection error (#7348)
---
 .../service/cluster/PulsarClusterOperator.java     | 74 ++++++++++++++++------
 1 file changed, 53 insertions(+), 21 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
index 5dea11adb..3d8e0e14f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
@@ -38,9 +38,11 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketAddress;
+import java.util.Arrays;
 
 /**
  * Pulsar cluster operator.
@@ -100,35 +102,65 @@ public class PulsarClusterOperator extends 
AbstractClusterOperator {
         PulsarClusterRequest pulsarRequest = (PulsarClusterRequest) request;
         PulsarClusterInfo pulsarInfo = new PulsarClusterInfo();
         CommonBeanUtils.copyProperties(pulsarRequest, pulsarInfo);
+
+        testConnectServiceUrl(pulsarInfo.getUrl());
+        return testConnectAdminUrl(pulsarInfo);
+    }
+
+    /**
+     * Test connect for Pulsar AdminUrl
+     *
+     * @param pulsarInfo pulsar cluster info
+     * @return true if the admin URL can be reached (a BusinessException is thrown otherwise)
+     */
+    private Boolean testConnectAdminUrl(PulsarClusterInfo pulsarInfo) {
         try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarInfo)) 
{
             // test connect for pulsar adminUrl
             pulsarAdmin.tenants().getTenants();
-            // test connect for pulsar ServiceUrl
-            final String serviceUrl = pulsarInfo.getUrl();
-            Preconditions.expectNotNull(serviceUrl, "Pulsar ServiceUrl is 
empty");
-            Preconditions.expectTrue(serviceUrl.startsWith(SERVICE_URL_PREFIX),
-                    String.format("Pulsar ServiceUrl=%s should starts with 
%s", serviceUrl, SERVICE_URL_PREFIX));
-
-            String hostPortStr = serviceUrl.replaceAll(SERVICE_URL_PREFIX, "");
-            String[] hostPortArr = hostPortStr.split(InlongConstants.COLON);
-            Preconditions.expectTrue(hostPortArr.length >= 2,
-                    String.format("Pulsar ServiceUrl=%s should has ip and 
port, such as '127.0.0.1:6650'", serviceUrl));
-
-            String host = hostPortArr[0];
-            int port = Integer.parseInt(hostPortArr[1]);
-            SocketAddress socketAddress = new InetSocketAddress(host, port);
-            Socket socket = new Socket();
-            socket.connect(socketAddress, 30000);
-            socket.close();
-            LOGGER.debug("Pulsar connection not null - connection success for 
AdminUrl={}, ServiceUrl={}",
-                    pulsarInfo.getAdminUrl(), pulsarInfo.getUrl());
             return true;
         } catch (Exception e) {
-            String errMsg = String.format("Pulsar connection failed for 
AdminUrl=%s, ServiceUrl=%s",
-                    pulsarInfo.getAdminUrl(), pulsarInfo.getUrl());
+            String errMsg = String.format("Pulsar connection failed for 
AdminUrl=%s", pulsarInfo.getAdminUrl());
             LOGGER.error(errMsg, e);
             throw new BusinessException(errMsg);
         }
     }
 
+    /**
+     * Test connect for Pulsar ServiceUrl
+     *
+     * @param serviceUrl Pulsar serviceUrl, such as: 
pulsar://127.0.0.1:6650,127.0.0.2:6650
+     */
+    private void testConnectServiceUrl(String serviceUrl) {
+        // test connect for Pulsar ServiceUrl
+        Preconditions.expectNotNull(serviceUrl, "Pulsar ServiceUrl is empty");
+        Preconditions.expectTrue(serviceUrl.startsWith(SERVICE_URL_PREFIX),
+                String.format("Pulsar ServiceUrl=%s should starts with %s", 
serviceUrl, SERVICE_URL_PREFIX));
+
+        String hostPortStr = serviceUrl.replaceAll(SERVICE_URL_PREFIX, "");
+
+        boolean successConnect = 
Arrays.stream(hostPortStr.split(InlongConstants.COMMA))
+                // Only when all addresses are reachable, it's considered 
successful
+                .allMatch(hostPort -> {
+                    String[] hostPortArr = 
hostPort.split(InlongConstants.COLON);
+                    Preconditions.expectTrue(hostPortArr.length >= 2,
+                            String.format("Pulsar ServiceUrl=%s should has ip 
and port, such as '127.0.0.1:6650'",
+                                    serviceUrl));
+
+                    String host = hostPortArr[0];
+                    int port = Integer.parseInt(hostPortArr[1]);
+
+                    try (Socket socket = new Socket()) {
+                        SocketAddress socketAddress = new 
InetSocketAddress(host, port);
+                        socket.connect(socketAddress, 30000);
+                        return true;
+                    } catch (IOException e) {
+                        String errMsg = String.format("Pulsar connection 
failed for ServiceUrl=%s", hostPort);
+                        LOGGER.error(errMsg, e);
+                        return false;
+                    }
+                });
+
+        Preconditions.expectTrue(successConnect,
+                String.format("Pulsar connection failed for ServiceUrl=%s", 
hostPortStr));
+    }
 }

Reply via email to