This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch java_dev in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 79ee37a388d26645953847a670cc3f7369044621 Author: Aaron Ai <[email protected]> AuthorDate: Sun Jul 31 15:53:59 2022 +0800 WIP --- .../main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java | 4 ++-- .../org/apache/rocketmq/client/java/impl/ClientSessionImpl.java | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java index 2ee5e23..4adf064 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java @@ -299,7 +299,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, @Override public boolean isEndpointsDeprecated(Endpoints endpoints) { final Set<Endpoints> totalRouteEndpoints = getTotalRouteEndpoints(); - return totalRouteEndpoints.contains(endpoints); + return !totalRouteEndpoints.contains(endpoints); } @Override @@ -415,7 +415,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, } } - public ListenableFuture<Void> syncSettingsSafely(Endpoints endpoints) { + private ListenableFuture<Void> syncSettingsSafely(Endpoints endpoints) { final ClientSessionImpl clientSession = getClientSession(endpoints); return clientSession.syncSettingsSafely(); } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java index 90402eb..e2fe848 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java @@ -44,7 +44,11 @@ public class ClientSessionImpl implements StreamObserver<TelemetryCommand> { protected ClientSessionImpl(ClientSessionHandler handler, Endpoints endpoints) { this.handler = handler; this.endpoints = endpoints; - renewRequestObserver(); + try { + this.requestObserver = handler.telemetry(endpoints, this); + } catch (Throwable t) { + handler.getScheduler().schedule(this::renewRequestObserver, 3, TimeUnit.SECONDS); + } } private void renewRequestObserver() {
