This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 16b458119 [INLONG-7337][Manager] Fix test Pulsar connection error (#7348)
16b458119 is described below
commit 16b45811949f2569a43c45664b486db20c457b7a
Author: leosanqing <[email protected]>
AuthorDate: Wed Feb 15 10:58:44 2023 +0800
[INLONG-7337][Manager] Fix test Pulsar connection error (#7348)
---
.../service/cluster/PulsarClusterOperator.java | 74 ++++++++++++++++------
1 file changed, 53 insertions(+), 21 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
index 5dea11adb..3d8e0e14f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
@@ -38,9 +38,11 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
+import java.util.Arrays;
/**
* Pulsar cluster operator.
@@ -100,35 +102,65 @@ public class PulsarClusterOperator extends AbstractClusterOperator {
PulsarClusterRequest pulsarRequest = (PulsarClusterRequest) request;
PulsarClusterInfo pulsarInfo = new PulsarClusterInfo();
CommonBeanUtils.copyProperties(pulsarRequest, pulsarInfo);
+
+ testConnectServiceUrl(pulsarInfo.getUrl());
+ return testConnectAdminUrl(pulsarInfo);
+ }
+
+ /**
+ * Test connect for Pulsar AdminUrl
+ *
+ * @param pulsarInfo pulsar cluster info
+ * @return
+ */
+ private Boolean testConnectAdminUrl(PulsarClusterInfo pulsarInfo) {
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarInfo))
{
// test connect for pulsar adminUrl
pulsarAdmin.tenants().getTenants();
- // test connect for pulsar ServiceUrl
- final String serviceUrl = pulsarInfo.getUrl();
- Preconditions.expectNotNull(serviceUrl, "Pulsar ServiceUrl is
empty");
- Preconditions.expectTrue(serviceUrl.startsWith(SERVICE_URL_PREFIX),
- String.format("Pulsar ServiceUrl=%s should starts with
%s", serviceUrl, SERVICE_URL_PREFIX));
-
- String hostPortStr = serviceUrl.replaceAll(SERVICE_URL_PREFIX, "");
- String[] hostPortArr = hostPortStr.split(InlongConstants.COLON);
- Preconditions.expectTrue(hostPortArr.length >= 2,
- String.format("Pulsar ServiceUrl=%s should has ip and
port, such as '127.0.0.1:6650'", serviceUrl));
-
- String host = hostPortArr[0];
- int port = Integer.parseInt(hostPortArr[1]);
- SocketAddress socketAddress = new InetSocketAddress(host, port);
- Socket socket = new Socket();
- socket.connect(socketAddress, 30000);
- socket.close();
- LOGGER.debug("Pulsar connection not null - connection success for
AdminUrl={}, ServiceUrl={}",
- pulsarInfo.getAdminUrl(), pulsarInfo.getUrl());
return true;
} catch (Exception e) {
- String errMsg = String.format("Pulsar connection failed for
AdminUrl=%s, ServiceUrl=%s",
- pulsarInfo.getAdminUrl(), pulsarInfo.getUrl());
+ String errMsg = String.format("Pulsar connection failed for
AdminUrl=%s", pulsarInfo.getAdminUrl());
LOGGER.error(errMsg, e);
throw new BusinessException(errMsg);
}
}
+ /**
+ * Test connect for Pulsar ServiceUrl
+ *
+ * @param serviceUrl Pulsar serviceUrl, such as:
pulsar://127.0.0.1:6650,127.0.0.2:6650
+ */
+ private void testConnectServiceUrl(String serviceUrl) {
+ // test connect for Pulsar ServiceUrl
+ Preconditions.expectNotNull(serviceUrl, "Pulsar ServiceUrl is empty");
+ Preconditions.expectTrue(serviceUrl.startsWith(SERVICE_URL_PREFIX),
+ String.format("Pulsar ServiceUrl=%s should starts with %s",
serviceUrl, SERVICE_URL_PREFIX));
+
+ String hostPortStr = serviceUrl.replaceAll(SERVICE_URL_PREFIX, "");
+
+ boolean successConnect =
Arrays.stream(hostPortStr.split(InlongConstants.COMMA))
+ // Only when all addresses are reachable, it's considered
successful
+ .allMatch(hostPort -> {
+ String[] hostPortArr =
hostPort.split(InlongConstants.COLON);
+ Preconditions.expectTrue(hostPortArr.length >= 2,
+ String.format("Pulsar ServiceUrl=%s should has ip
and port, such as '127.0.0.1:6650'",
+ serviceUrl));
+
+ String host = hostPortArr[0];
+ int port = Integer.parseInt(hostPortArr[1]);
+
+ try (Socket socket = new Socket()) {
+ SocketAddress socketAddress = new
InetSocketAddress(host, port);
+ socket.connect(socketAddress, 30000);
+ return true;
+ } catch (IOException e) {
+ String errMsg = String.format("Pulsar connection
failed for ServiceUrl=%s", hostPort);
+ LOGGER.error(errMsg, e);
+ return false;
+ }
+ });
+
+ Preconditions.expectTrue(successConnect,
+ String.format("Pulsar connection failed for ServiceUrl=%s",
hostPortStr));
+ }
}