AuroraTwinkle commented on code in PR #22935:
URL: https://github.com/apache/pulsar/pull/22935#discussion_r1661011169
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java:
##########
@@ -97,14 +117,71 @@ public void updateServiceUrl(String serviceUrl) throws
InvalidServiceURL {
}
}
this.addressList = addresses;
+ this.healthyAddress = addresses;
this.serviceUrl = serviceUrl;
this.serviceUri = uri;
this.currentIndex = randomIndex(addresses.size());
+
+ if (!healthCheckScheduled.get()) {
+ synchronized (this) {
+ if (!healthCheckScheduled.get()) {
+ ScheduledFuture<?> future =
+ ((ScheduledExecutorService)
executorProvider.getExecutor()).scheduleWithFixedDelay(
+ this::doHealthCheck, 0, 5,
TimeUnit.SECONDS);
+ healthCheckScheduled.set(true);
+ healthCheckFuture.set(future);
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void close() {
+ ScheduledFuture<?> future = healthCheckFuture.get();
+ if (future != null) {
+ future.cancel(true);
+ healthCheckScheduled.set(false);
+ healthCheckFuture.set(null);
+ }
+ }
+
+ /**
+ * Health check for all addresses. This function will be called of same
thread,
+ * so it is safe to update the healthy address list.
+ */
+ private void doHealthCheck() {
+ List<InetSocketAddress> list = addressList;
+ if (list != null && !list.isEmpty()) {
+ List<InetSocketAddress> healthy = new ArrayList<>(list.size());
+ for (InetSocketAddress address : list) {
+ for (int i = 0; i < 3; i++){
+ if (checkAddress(address)) {
+ healthy.add(address);
+ break;
+ } else {
+ Uninterruptibles.sleepUninterruptibly(1,
TimeUnit.SECONDS);
+ }
+ }
+ }
+ healthyAddress = healthy;
+ }
}
private static int randomIndex(int numAddresses) {
return numAddresses == 1
?
0 :
io.netty.util.internal.PlatformDependent.threadLocalRandom().nextInt(numAddresses);
}
+
+ private static boolean checkAddress(InetSocketAddress address) {
+ try (Socket socket = new Socket()) {
+ socket.connect(address, 5000);
Review Comment:
The try-with-resources statement will invoke the close method automatically.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java:
##########
@@ -97,14 +117,71 @@ public void updateServiceUrl(String serviceUrl) throws
InvalidServiceURL {
}
}
this.addressList = addresses;
+ this.healthyAddress = addresses;
this.serviceUrl = serviceUrl;
this.serviceUri = uri;
this.currentIndex = randomIndex(addresses.size());
+
+ if (!healthCheckScheduled.get()) {
+ synchronized (this) {
+ if (!healthCheckScheduled.get()) {
+ ScheduledFuture<?> future =
+ ((ScheduledExecutorService)
executorProvider.getExecutor()).scheduleWithFixedDelay(
+ this::doHealthCheck, 0, 5,
TimeUnit.SECONDS);
+ healthCheckScheduled.set(true);
+ healthCheckFuture.set(future);
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void close() {
+ ScheduledFuture<?> future = healthCheckFuture.get();
+ if (future != null) {
+ future.cancel(true);
+ healthCheckScheduled.set(false);
+ healthCheckFuture.set(null);
+ }
+ }
+
+ /**
+ * Health check for all addresses. This function will be called of same
thread,
+ * so it is safe to update the healthy address list.
+ */
+ private void doHealthCheck() {
+ List<InetSocketAddress> list = addressList;
+ if (list != null && !list.isEmpty()) {
+ List<InetSocketAddress> healthy = new ArrayList<>(list.size());
+ for (InetSocketAddress address : list) {
+ for (int i = 0; i < 3; i++){
+ if (checkAddress(address)) {
+ healthy.add(address);
+ break;
+ } else {
+ Uninterruptibles.sleepUninterruptibly(1,
TimeUnit.SECONDS);
+ }
+ }
+ }
+ healthyAddress = healthy;
+ }
}
private static int randomIndex(int numAddresses) {
return numAddresses == 1
?
0 :
io.netty.util.internal.PlatformDependent.threadLocalRandom().nextInt(numAddresses);
}
+
+ private static boolean checkAddress(InetSocketAddress address) {
+ try (Socket socket = new Socket()) {
+ socket.connect(address, 5000);
Review Comment:
The try-with-resources statement will invoke the close method automatically.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarServiceNameResolver.java:
##########
@@ -97,14 +117,71 @@ public void updateServiceUrl(String serviceUrl) throws
InvalidServiceURL {
}
}
this.addressList = addresses;
+ this.healthyAddress = addresses;
this.serviceUrl = serviceUrl;
this.serviceUri = uri;
this.currentIndex = randomIndex(addresses.size());
+
+ if (!healthCheckScheduled.get()) {
+ synchronized (this) {
+ if (!healthCheckScheduled.get()) {
+ ScheduledFuture<?> future =
+ ((ScheduledExecutorService)
executorProvider.getExecutor()).scheduleWithFixedDelay(
+ this::doHealthCheck, 0, 5,
TimeUnit.SECONDS);
+ healthCheckScheduled.set(true);
+ healthCheckFuture.set(future);
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void close() {
+ ScheduledFuture<?> future = healthCheckFuture.get();
+ if (future != null) {
+ future.cancel(true);
+ healthCheckScheduled.set(false);
+ healthCheckFuture.set(null);
+ }
+ }
+
+ /**
+ * Health check for all addresses. This function will be called of same
thread,
+ * so it is safe to update the healthy address list.
+ */
+ private void doHealthCheck() {
+ List<InetSocketAddress> list = addressList;
+ if (list != null && !list.isEmpty()) {
+ List<InetSocketAddress> healthy = new ArrayList<>(list.size());
+ for (InetSocketAddress address : list) {
+ for (int i = 0; i < 3; i++){
+ if (checkAddress(address)) {
+ healthy.add(address);
+ break;
+ } else {
+ Uninterruptibles.sleepUninterruptibly(1,
TimeUnit.SECONDS);
+ }
+ }
+ }
+ healthyAddress = healthy;
+ }
}
private static int randomIndex(int numAddresses) {
return numAddresses == 1
?
0 :
io.netty.util.internal.PlatformDependent.threadLocalRandom().nextInt(numAddresses);
}
+
+ private static boolean checkAddress(InetSocketAddress address) {
+ try (Socket socket = new Socket()) {
+ socket.connect(address, 5000);
Review Comment:
> The socket here needs to be closed
Try-with-resources will invoke the close method automatically.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]