healchow commented on code in PR #7348:
URL: https://github.com/apache/inlong/pull/7348#discussion_r1106592901
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java:
##########
@@ -100,35 +102,65 @@ public Boolean testConnection(ClusterRequest request) {
PulsarClusterRequest pulsarRequest = (PulsarClusterRequest) request;
PulsarClusterInfo pulsarInfo = new PulsarClusterInfo();
CommonBeanUtils.copyProperties(pulsarRequest, pulsarInfo);
+
+ testConnectServiceUrl(pulsarInfo.getUrl());
+ return testConnectAdminUrl(pulsarInfo);
+ }
+
+ /**
+ * Test connect for Pulsar AdminUrl
+ *
+ * @param pulsarInfo pulsar cluster info
+ * @return
+ */
+ private Boolean testConnectAdminUrl(PulsarClusterInfo pulsarInfo) {
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarInfo))
{
// test connect for pulsar adminUrl
pulsarAdmin.tenants().getTenants();
- // test connect for pulsar ServiceUrl
- final String serviceUrl = pulsarInfo.getUrl();
- Preconditions.expectNotNull(serviceUrl, "Pulsar ServiceUrl is
empty");
- Preconditions.expectTrue(serviceUrl.startsWith(SERVICE_URL_PREFIX),
- String.format("Pulsar ServiceUrl=%s should starts with
%s", serviceUrl, SERVICE_URL_PREFIX));
-
- String hostPortStr = serviceUrl.replaceAll(SERVICE_URL_PREFIX, "");
- String[] hostPortArr = hostPortStr.split(InlongConstants.COLON);
- Preconditions.expectTrue(hostPortArr.length >= 2,
- String.format("Pulsar ServiceUrl=%s should has ip and
port, such as '127.0.0.1:6650'", serviceUrl));
-
- String host = hostPortArr[0];
- int port = Integer.parseInt(hostPortArr[1]);
- SocketAddress socketAddress = new InetSocketAddress(host, port);
- Socket socket = new Socket();
- socket.connect(socketAddress, 30000);
- socket.close();
- LOGGER.debug("Pulsar connection not null - connection success for
AdminUrl={}, ServiceUrl={}",
- pulsarInfo.getAdminUrl(), pulsarInfo.getUrl());
return true;
} catch (Exception e) {
- String errMsg = String.format("Pulsar connection failed for
AdminUrl=%s, ServiceUrl=%s",
- pulsarInfo.getAdminUrl(), pulsarInfo.getUrl());
+ String errMsg = String.format("Pulsar connection failed for
AdminUrl=%s", pulsarInfo.getAdminUrl());
LOGGER.error(errMsg, e);
throw new BusinessException(errMsg);
}
}
+ /**
+ * Test connect for Pulsar ServiceUrl
+ *
+ * @param serviceUrl Pulsar serviceUrl, such as:
pulsar://127.0.0.1:6650,127.0.0.2:6650
+ */
+ private void testConnectServiceUrl(String serviceUrl) {
+ // test connect for Pulsar ServiceUrl
+ Preconditions.expectNotNull(serviceUrl, "Pulsar ServiceUrl is empty");
+ Preconditions.expectTrue(serviceUrl.startsWith(SERVICE_URL_PREFIX),
+ String.format("Pulsar ServiceUrl=%s should starts with %s",
serviceUrl, SERVICE_URL_PREFIX));
+
+ String hostPortStr = serviceUrl.replaceAll(SERVICE_URL_PREFIX, "");
+
+ boolean successConnect =
Arrays.stream(hostPortStr.split(InlongConstants.COMMA))
+ // Only when all addresses are reachable, it's considered
successful
+ .allMatch(hostPort -> {
+ String[] hostPortArr =
hostPort.split(InlongConstants.COLON);
+ Preconditions.expectTrue(hostPortArr.length >= 2,
+ String.format("Pulsar ServiceUrl=%s should has ip
and port, such as '127.0.0.1:6650'",
+ serviceUrl));
+
+ String host = hostPortArr[0];
+ int port = Integer.parseInt(hostPortArr[1]);
+
+ try (Socket socket = new Socket()) {
+ SocketAddress socketAddress = new
InetSocketAddress(host, port);
+ socket.connect(socketAddress, 30000);
Review Comment:
Do we need to close the socket connection after testing?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]