sijie commented on a change in pull request #4018: Support multi-host for
pulsar-admin
URL: https://github.com/apache/pulsar/pull/4018#discussion_r279009964
##########
File path:
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
##########
@@ -84,63 +90,106 @@ public boolean keepAlive(Request ahcRequest, HttpRequest
request, HttpResponse r
}
});
- if (conf != null && StringUtils.isNotBlank(conf.getServiceUrl())
- && conf.getServiceUrl().startsWith("https://")) {
-
- SslContext sslCtx = null;
-
- // Set client key and certificate if available
- AuthenticationDataProvider authData =
conf.getAuthentication().getAuthData();
- if (authData.hasDataForTls()) {
- sslCtx = SecurityUtility.createNettySslContextForClient(
- conf.isTlsAllowInsecureConnection() ||
!conf.isTlsHostnameVerificationEnable(),
- conf.getTlsTrustCertsFilePath(),
- authData.getTlsCertificates(),
authData.getTlsPrivateKey());
- } else {
- sslCtx = SecurityUtility.createNettySslContextForClient(
- conf.isTlsAllowInsecureConnection() ||
!conf.isTlsHostnameVerificationEnable(),
- conf.getTlsTrustCertsFilePath());
- }
+ serviceNameResolver = new PulsarServiceNameResolver();
+ if (conf != null && StringUtils.isNotBlank(conf.getServiceUrl())) {
+ serviceNameResolver.updateServiceUrl(conf.getServiceUrl());
+ if (conf.getServiceUrl().startsWith("https://")) {
+
+ SslContext sslCtx = null;
+
+ // Set client key and certificate if available
+ AuthenticationDataProvider authData =
conf.getAuthentication().getAuthData();
+ if (authData.hasDataForTls()) {
+ sslCtx =
SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection()
|| !conf.isTlsHostnameVerificationEnable(),
+
conf.getTlsTrustCertsFilePath(), authData.getTlsCertificates(),
authData.getTlsPrivateKey());
+ } else {
+ sslCtx =
SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection()
|| !conf.isTlsHostnameVerificationEnable(),
+
conf.getTlsTrustCertsFilePath());
+ }
- confBuilder.setSslContext(sslCtx);
+ confBuilder.setSslContext(sslCtx);
+ }
}
httpClient = new DefaultAsyncHttpClient(confBuilder.build());
}
- @Override
public ClientResponse apply(ClientRequest jerseyRequest) {
- CompletableFuture<ClientResponse> future = new CompletableFuture<>();
- try {
- Future<?> resultFuture = apply(jerseyRequest, new
AsyncConnectorCallback() {
- @Override
- public void response(ClientResponse response) {
- future.complete(response);
+ CompletableFuture<ClientResponse> future = new CompletableFuture<>();
+ long startTime = System.currentTimeMillis();
+ while (true) {
+ InetSocketAddress address = serviceNameResolver.resolveHost();
+ URI requestUri = replaceWithNew(address, jerseyRequest.getUri());
+ jerseyRequest.setUri(requestUri);
+ CompletableFuture<ClientResponse> tempFuture = new
CompletableFuture<>();
+ try {
+ boolean requestSuccess = resolveRequest(tempFuture,
jerseyRequest);
+ if (requestSuccess) {
+ future = tempFuture;
+ break;
}
-
- @Override
- public void failure(Throwable failure) {
- future.completeExceptionally(failure);
+ if (System.currentTimeMillis() - startTime >
httpClient.getConfig().getRequestTimeout()) {
+ throw new ProcessingException(
+ "Request timeout, the last try service url is : " +
jerseyRequest.getUri().toString());
+ }
+ } catch (ExecutionException ex) {
+ if (System.currentTimeMillis() - startTime >
httpClient.getConfig().getRequestTimeout()) {
+ Throwable e = ex.getCause() == null ? ex : ex.getCause();
+ throw new ProcessingException((e.getMessage()), e);
}
- });
+ } catch (Exception e) {
+ if (System.currentTimeMillis() - startTime >
httpClient.getConfig().getRequestTimeout()) {
+ throw new ProcessingException(e.getMessage(), e);
+ }
+ }
+ }
+
+ return future.join();
+ }
+
+ private URI replaceWithNew(InetSocketAddress address, URI uri) {
+ String originalUri = uri.toString();
+ String newUri = (originalUri.split(":")[0] + "://")
+ + address.getHostName() + ":"
+ + address.getPort()
+ + uri.getRawPath();
+ if (uri.getRawQuery() != null) {
+ newUri += "?" + uri.getRawQuery();
+ }
+ return URI.create(newUri);
+ }
- Integer timeout = ClientProperties.getValue(
- jerseyRequest.getConfiguration().getProperties(),
- ClientProperties.READ_TIMEOUT, 0);
- if (timeout != null && timeout > 0) {
- resultFuture.get(timeout, TimeUnit.MILLISECONDS);
- } else {
- resultFuture.get();
+
+ private boolean resolveRequest(CompletableFuture<ClientResponse> future,
+ ClientRequest jerseyRequest)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ Future<?> resultFuture = apply(jerseyRequest, new
AsyncConnectorCallback() {
+ @Override
+ public void response(ClientResponse response) {
+ future.complete(response);
}
- } catch (ExecutionException ex) {
- Throwable e = ex.getCause() == null ? ex : ex.getCause();
- throw new ProcessingException(e.getMessage(), e);
- } catch (Exception ex) {
- throw new ProcessingException(ex.getMessage(), ex);
+ @Override
+ public void failure(Throwable failure) {
+ future.completeExceptionally(failure);
+ }
+ });
+
+ Integer timeout = ClientProperties.getValue(
+ jerseyRequest.getConfiguration().getProperties(),
+ ClientProperties.READ_TIMEOUT, 0) / 3;
+
+ Object result = null;
+ if (timeout != null && timeout > 0) {
+ result = resultFuture.get(timeout, TimeUnit.MILLISECONDS);
+ } else {
+ result = resultFuture.get();
}
- return future.join();
+ if (result != null && result instanceof Throwable) {
Review comment:
Can you just throw the exception from this method? That way, the `apply()`
method doesn't need to check the return value, since it already catches
exceptions and handles the retry logic.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services