healchow commented on code in PR #7348:
URL: https://github.com/apache/inlong/pull/7348#discussion_r1106592901


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java:
##########
@@ -100,35 +102,65 @@ public Boolean testConnection(ClusterRequest request) {
         PulsarClusterRequest pulsarRequest = (PulsarClusterRequest) request;
         PulsarClusterInfo pulsarInfo = new PulsarClusterInfo();
         CommonBeanUtils.copyProperties(pulsarRequest, pulsarInfo);
+
+        testConnectServiceUrl(pulsarInfo.getUrl());
+        return testConnectAdminUrl(pulsarInfo);
+    }
+
+    /**
+     * Test connect for Pulsar AdminUrl
+     *
+     * @param pulsarInfo pulsar cluster info
+     * @return
+     */
+    private Boolean testConnectAdminUrl(PulsarClusterInfo pulsarInfo) {
         try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarInfo)) 
{
             // test connect for pulsar adminUrl
             pulsarAdmin.tenants().getTenants();
-            // test connect for pulsar ServiceUrl
-            final String serviceUrl = pulsarInfo.getUrl();
-            Preconditions.expectNotNull(serviceUrl, "Pulsar ServiceUrl is 
empty");
-            Preconditions.expectTrue(serviceUrl.startsWith(SERVICE_URL_PREFIX),
-                    String.format("Pulsar ServiceUrl=%s should starts with 
%s", serviceUrl, SERVICE_URL_PREFIX));
-
-            String hostPortStr = serviceUrl.replaceAll(SERVICE_URL_PREFIX, "");
-            String[] hostPortArr = hostPortStr.split(InlongConstants.COLON);
-            Preconditions.expectTrue(hostPortArr.length >= 2,
-                    String.format("Pulsar ServiceUrl=%s should has ip and 
port, such as '127.0.0.1:6650'", serviceUrl));
-
-            String host = hostPortArr[0];
-            int port = Integer.parseInt(hostPortArr[1]);
-            SocketAddress socketAddress = new InetSocketAddress(host, port);
-            Socket socket = new Socket();
-            socket.connect(socketAddress, 30000);
-            socket.close();
-            LOGGER.debug("Pulsar connection not null - connection success for 
AdminUrl={}, ServiceUrl={}",
-                    pulsarInfo.getAdminUrl(), pulsarInfo.getUrl());
             return true;
         } catch (Exception e) {
-            String errMsg = String.format("Pulsar connection failed for 
AdminUrl=%s, ServiceUrl=%s",
-                    pulsarInfo.getAdminUrl(), pulsarInfo.getUrl());
+            String errMsg = String.format("Pulsar connection failed for 
AdminUrl=%s", pulsarInfo.getAdminUrl());
             LOGGER.error(errMsg, e);
             throw new BusinessException(errMsg);
         }
     }
 
+    /**
+     * Test connect for Pulsar ServiceUrl
+     *
+     * @param serviceUrl Pulsar serviceUrl, such as: 
pulsar://127.0.0.1:6650,127.0.0.2:6650
+     */
+    private void testConnectServiceUrl(String serviceUrl) {
+        // test connect for Pulsar ServiceUrl
+        Preconditions.expectNotNull(serviceUrl, "Pulsar ServiceUrl is empty");
+        Preconditions.expectTrue(serviceUrl.startsWith(SERVICE_URL_PREFIX),
+                String.format("Pulsar ServiceUrl=%s should starts with %s", 
serviceUrl, SERVICE_URL_PREFIX));
+
+        String hostPortStr = serviceUrl.replaceAll(SERVICE_URL_PREFIX, "");
+
+        boolean successConnect = 
Arrays.stream(hostPortStr.split(InlongConstants.COMMA))
+                // Only when all addresses are reachable, it's considered 
successful
+                .allMatch(hostPort -> {
+                    String[] hostPortArr = 
hostPort.split(InlongConstants.COLON);
+                    Preconditions.expectTrue(hostPortArr.length >= 2,
+                            String.format("Pulsar ServiceUrl=%s should has ip 
and port, such as '127.0.0.1:6650'",
+                                    serviceUrl));
+
+                    String host = hostPortArr[0];
+                    int port = Integer.parseInt(hostPortArr[1]);
+
+                    try (Socket socket = new Socket()) {
+                        SocketAddress socketAddress = new 
InetSocketAddress(host, port);
+                        socket.connect(socketAddress, 30000);

Review Comment:
   Should we need to close the socket connection after testing?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to