eolivelli commented on a change in pull request #13069:
URL: https://github.com/apache/pulsar/pull/13069#discussion_r761809476
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
##########
@@ -622,22 +639,36 @@ private void searchForCandidateBroker(NamespaceBundle
bundle,
}
private boolean isBrokerActive(String candidateBroker) {
- List<String> brokers =
pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).join();
-
- for (String brokerHostPort : brokers) {
- if (candidateBroker.equals("http://" + brokerHostPort)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Broker {} found for SLA Monitoring Namespace",
brokerHostPort);
- }
- return true;
+ String candidateBrokerHostAndPort = parseHostAndPort(candidateBroker);
+ Set<String> availableBrokers = getAvailableBrokers();
+ if (availableBrokers.contains(candidateBrokerHostAndPort)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Broker {} ({}) is available for.", candidateBroker,
candidateBrokerHostAndPort);
}
+ return true;
+ } else {
+ LOG.warn("Broker {} ({}) couldn't be found in available brokers
{}",
+ candidateBroker, candidateBrokerHostAndPort,
+
availableBrokers.stream().collect(Collectors.joining(",")));
+ return false;
}
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Broker not found for SLA Monitoring Namespace {}",
- candidateBroker + ":" + config.getWebServicePort());
+ private String parseHostAndPort(String candidateBroker) {
Review comment:
what about moving this method to some utility class ?
also we should make this "static"
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
##########
@@ -93,30 +96,51 @@ public NamespaceBundleFactory(PulsarService pulsar,
HashFunction hashFunc) {
}
CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
+ doLoadBundles(namespace, future, createBackoff(), System.nanoTime() +
maxRetryDuration.toNanos());
+ return future;
+ }
+
+ private void doLoadBundles(NamespaceName namespace,
CompletableFuture<NamespaceBundles> future,
+ Backoff backoff, long retryDeadline) {
// Read the static bundle data from the policies
pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesWithVersion(namespace).thenAccept(result
-> {
-
if (result.isPresent()) {
try {
future.complete(readBundles(namespace,
result.get().getValue(),
result.get().getStat().getVersion()));
} catch (IOException e) {
- future.completeExceptionally(e);
+ handleLoadBundlesRetry(namespace, future, backoff,
retryDeadline, e);
}
} else {
// If no local policies defined for namespace, copy from
global config
copyToLocalPolicies(namespace)
.thenAccept(b -> future.complete(b))
.exceptionally(ex -> {
- future.completeExceptionally(ex);
+ handleLoadBundlesRetry(namespace, future, backoff,
retryDeadline, ex);
return null;
});
}
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
- return future;
+ }
+
+ private void handleLoadBundlesRetry(NamespaceName namespace,
+ CompletableFuture<NamespaceBundles>
future,
+ Backoff backoff, long retryDeadline,
Throwable e) {
+ if (e instanceof Error || System.nanoTime() > retryDeadline) {
Review comment:
why `Error` ? like OutOfMemoryError ?
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java
##########
@@ -93,30 +96,51 @@ public NamespaceBundleFactory(PulsarService pulsar,
HashFunction hashFunc) {
}
CompletableFuture<NamespaceBundles> future = new CompletableFuture<>();
+ doLoadBundles(namespace, future, createBackoff(), System.nanoTime() +
maxRetryDuration.toNanos());
+ return future;
+ }
+
+ private void doLoadBundles(NamespaceName namespace,
CompletableFuture<NamespaceBundles> future,
+ Backoff backoff, long retryDeadline) {
// Read the static bundle data from the policies
pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesWithVersion(namespace).thenAccept(result
-> {
-
if (result.isPresent()) {
try {
future.complete(readBundles(namespace,
result.get().getValue(),
result.get().getStat().getVersion()));
} catch (IOException e) {
- future.completeExceptionally(e);
+ handleLoadBundlesRetry(namespace, future, backoff,
retryDeadline, e);
}
} else {
// If no local policies defined for namespace, copy from
global config
copyToLocalPolicies(namespace)
.thenAccept(b -> future.complete(b))
.exceptionally(ex -> {
- future.completeExceptionally(ex);
+ handleLoadBundlesRetry(namespace, future, backoff,
retryDeadline, ex);
return null;
});
}
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
- return future;
+ }
+
+ private void handleLoadBundlesRetry(NamespaceName namespace,
+ CompletableFuture<NamespaceBundles>
future,
+ Backoff backoff, long retryDeadline,
Throwable e) {
+ if (e instanceof Error || System.nanoTime() > retryDeadline) {
+ future.completeExceptionally(e);
+ } else {
+ LOG.warn("Error loading bundle for {}. Retrying exception",
namespace, e);
+ long retryDelay = backoff.next();
+ pulsar.getExecutor().schedule(() ->
+ doLoadBundles(namespace, future, backoff, retryDeadline),
retryDelay, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private Backoff createBackoff() {
Review comment:
static ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]