liangyepianzhou commented on code in PR #22935:
URL: https://github.com/apache/pulsar/pull/22935#discussion_r1667659172
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java:
##########
@@ -97,14 +120,87 @@ public void updateServiceUrl(String serviceUrl) throws
InvalidServiceURL {
}
}
this.addressList = addresses;
+ this.healthyAddress = addresses;
+ this.removedAddresses = new HashSet<>();
this.serviceUrl = serviceUrl;
this.serviceUri = uri;
this.currentIndex = randomIndex(addresses.size());
+
+ if (!healthCheckScheduled.get()) {
+ synchronized (this) {
+ if (!healthCheckScheduled.get()) {
+ ScheduledFuture<?> future =
+ ((ScheduledExecutorService)
executorProvider.getExecutor()).scheduleWithFixedDelay(
+ this::doHealthCheck, 0, 5,
TimeUnit.SECONDS);
+ healthCheckScheduled.set(true);
+ healthCheckFuture.set(future);
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void close() {
+ ScheduledFuture<?> future = healthCheckFuture.get();
+ if (future != null) {
+ future.cancel(true);
+ healthCheckScheduled.set(false);
+ healthCheckFuture.set(null);
+ log.info("PulsarServiceNameResolver is closed, so cancel the
health check task.");
+ }
+ }
+
+ /**
+ * Health check for all addresses. This function will be called of same
thread,
+ * so it is safe to update the healthy address list.
+ */
+ private void doHealthCheck() {
+ List<InetSocketAddress> list = addressList;
+ if (list != null && !list.isEmpty()) {
+ List<InetSocketAddress> healthy = new ArrayList<>(list.size());
+ for (InetSocketAddress address : list) {
+ boolean healthyAddress = false;
+ for (int i = 0; i < 3; i++) {
Review Comment:
Use constants instead of magic numbers.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java:
##########
@@ -97,14 +120,87 @@ public void updateServiceUrl(String serviceUrl) throws
InvalidServiceURL {
}
}
this.addressList = addresses;
+ this.healthyAddress = addresses;
+ this.removedAddresses = new HashSet<>();
this.serviceUrl = serviceUrl;
this.serviceUri = uri;
this.currentIndex = randomIndex(addresses.size());
+
+ if (!healthCheckScheduled.get()) {
+ synchronized (this) {
+ if (!healthCheckScheduled.get()) {
+ ScheduledFuture<?> future =
+ ((ScheduledExecutorService)
executorProvider.getExecutor()).scheduleWithFixedDelay(
+ this::doHealthCheck, 0, 5,
TimeUnit.SECONDS);
+ healthCheckScheduled.set(true);
+ healthCheckFuture.set(future);
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void close() {
+ ScheduledFuture<?> future = healthCheckFuture.get();
+ if (future != null) {
+ future.cancel(true);
+ healthCheckScheduled.set(false);
+ healthCheckFuture.set(null);
+ log.info("PulsarServiceNameResolver is closed, so cancel the
health check task.");
+ }
+ }
+
+ /**
+ * Health check for all addresses. This function will be called of same
thread,
+ * so it is safe to update the healthy address list.
+ */
+ private void doHealthCheck() {
+ List<InetSocketAddress> list = addressList;
+ if (list != null && !list.isEmpty()) {
+ List<InetSocketAddress> healthy = new ArrayList<>(list.size());
+ for (InetSocketAddress address : list) {
+ boolean healthyAddress = false;
+ for (int i = 0; i < 3; i++) {
Review Comment:
Also, I don't think there's a need to retry three times here. You can
achieve a similar effect to the sleep by configuring a connect timeout.
```
socket.connect(new InetSocketAddress(address.getHostName(),
address.getPort()), 5000);
```
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java:
##########
@@ -97,14 +120,87 @@ public void updateServiceUrl(String serviceUrl) throws
InvalidServiceURL {
}
}
this.addressList = addresses;
+ this.healthyAddress = addresses;
+ this.removedAddresses = new HashSet<>();
this.serviceUrl = serviceUrl;
this.serviceUri = uri;
this.currentIndex = randomIndex(addresses.size());
+
+ if (!healthCheckScheduled.get()) {
+ synchronized (this) {
+ if (!healthCheckScheduled.get()) {
+ ScheduledFuture<?> future =
+ ((ScheduledExecutorService)
executorProvider.getExecutor()).scheduleWithFixedDelay(
+ this::doHealthCheck, 0, 5,
TimeUnit.SECONDS);
+ healthCheckScheduled.set(true);
+ healthCheckFuture.set(future);
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void close() {
+ ScheduledFuture<?> future = healthCheckFuture.get();
+ if (future != null) {
+ future.cancel(true);
+ healthCheckScheduled.set(false);
+ healthCheckFuture.set(null);
+ log.info("PulsarServiceNameResolver is closed, so cancel the
health check task.");
+ }
+ }
+
+ /**
+ * Health check for all addresses. This function will be called of same
thread,
+ * so it is safe to update the healthy address list.
+ */
+ private void doHealthCheck() {
+ List<InetSocketAddress> list = addressList;
+ if (list != null && !list.isEmpty()) {
+ List<InetSocketAddress> healthy = new ArrayList<>(list.size());
+ for (InetSocketAddress address : list) {
+ boolean healthyAddress = false;
+ for (int i = 0; i < 3; i++) {
+ if (checkAddress(address)) {
+ healthyAddress = true;
+ break;
+ } else {
+ Uninterruptibles.sleepUninterruptibly(1,
TimeUnit.SECONDS);
+ }
+ }
+
+ if (healthyAddress) {
+ if (removedAddresses.contains(address)) {
Review Comment:
The `removedAddresses` set seems unnecessary.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]