This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new a0b796d  Bugfix: reverse result in ClientSessionHandler#isEndpointsDeprecated (#108)
a0b796d is described below

commit a0b796d1f9130a109d07b360552e515d1b2a429b
Author: Aaron Ai <[email protected]>
AuthorDate: Sun Jul 31 16:29:42 2022 +0800

    Bugfix: reverse result in ClientSessionHandler#isEndpointsDeprecated (#108)
---
 .../rocketmq/client/java/impl/ClientImpl.java      | 29 +++++++-----
 .../client/java/impl/ClientSessionImpl.java        | 54 +++++++++++++---------
 2 files changed, 51 insertions(+), 32 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 2ee5e23..da1ff6f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -35,6 +35,7 @@ import apache.rocketmq.v2.TelemetryCommand;
 import apache.rocketmq.v2.ThreadStackTrace;
 import apache.rocketmq.v2.VerifyMessageCommand;
 import apache.rocketmq.v2.VerifyMessageResult;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -299,7 +300,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     @Override
     public boolean isEndpointsDeprecated(Endpoints endpoints) {
         final Set<Endpoints> totalRouteEndpoints = getTotalRouteEndpoints();
-        return totalRouteEndpoints.contains(endpoints);
+        return !totalRouteEndpoints.contains(endpoints);
     }
 
     @Override
@@ -374,8 +375,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     }
 
     public void telemetry(Endpoints endpoints, TelemetryCommand command) {
-        final ClientSessionImpl clientSession = getClientSession(endpoints);
         try {
+            final ClientSessionImpl clientSession = 
getClientSession(endpoints);
             clientSession.fireWrite(command);
         } catch (Throwable t) {
             LOGGER.error("Failed to fire write telemetry command, clientId={}, 
endpoints={}", clientId, endpoints, t);
@@ -391,7 +392,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         }
     }
 
-    public ClientSessionImpl getClientSession(Endpoints endpoints) {
+    public ClientSessionImpl getClientSession(Endpoints endpoints) throws 
ClientException {
         sessionsLock.readLock().lock();
         try {
             final ClientSessionImpl session = sessionsTable.get(endpoints);
@@ -415,9 +416,13 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         }
     }
 
-    public ListenableFuture<Void> syncSettingsSafely(Endpoints endpoints) {
-        final ClientSessionImpl clientSession = getClientSession(endpoints);
-        return clientSession.syncSettingsSafely();
+    private ListenableFuture<Void> syncSettingsSafely(Endpoints endpoints) {
+        try {
+            final ClientSessionImpl clientSession = 
getClientSession(endpoints);
+            return clientSession.syncSettingsSafely();
+        } catch (Throwable t) {
+            return Futures.immediateFailedFuture(t);
+        }
     }
 
     /**
@@ -426,13 +431,15 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
      * <p>Never thrown any exception.
      */
     public ListenableFuture<TopicRouteData> onTopicRouteDataFetched(String 
topic, TopicRouteData topicRouteData) {
-        final List<ListenableFuture<Void>> futures = topicRouteData
+        final Set<Endpoints> routeEndpoints = topicRouteData
             .getMessageQueues().stream()
             .map(mq -> mq.getBroker().getEndpoints())
-            .collect(Collectors.toSet())
-            .stream().map(this::syncSettingsSafely)
-            .collect(Collectors.toList());
-        // TODO: Record exception.
+            .collect(Collectors.toSet());
+        final Set<Endpoints> existRouteEndpoints = getTotalRouteEndpoints();
+        final Set<Endpoints> newEndpoints = new 
HashSet<>(Sets.difference(routeEndpoints,
+            existRouteEndpoints));
+        final List<ListenableFuture<Void>> futures =
+            
newEndpoints.stream().map(this::syncSettingsSafely).collect(Collectors.toList());
         return Futures.whenAllSucceed(futures).callAsync(() -> {
             topicRouteCache.put(topic, topicRouteData);
             onTopicRouteDataUpdate0(topic, topicRouteData);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index 90402eb..b699ab0 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -25,7 +25,9 @@ import apache.rocketmq.v2.VerifyMessageCommand;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.grpc.stub.StreamObserver;
+import java.time.Duration;
 import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.slf4j.Logger;
@@ -36,39 +38,49 @@ import org.slf4j.LoggerFactory;
  */
 public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ClientSessionImpl.class);
+    private static final Duration REQUEST_OBSERVER_RENEW_BACKOFF_DELAY = 
Duration.ofSeconds(1);
 
-    private final ClientSessionHandler handler;
+    private final ClientSessionHandler sessionHandler;
     private final Endpoints endpoints;
     private volatile StreamObserver<TelemetryCommand> requestObserver;
 
-    protected ClientSessionImpl(ClientSessionHandler handler, Endpoints 
endpoints) {
-        this.handler = handler;
+    protected ClientSessionImpl(ClientSessionHandler sessionHandler, Endpoints 
endpoints) throws ClientException {
+        this.sessionHandler = sessionHandler;
         this.endpoints = endpoints;
-        renewRequestObserver();
+        this.requestObserver = sessionHandler.telemetry(endpoints, this);
     }
 
     private void renewRequestObserver() {
         try {
-            if (handler.isEndpointsDeprecated(endpoints)) {
+            if (sessionHandler.isEndpointsDeprecated(endpoints)) {
                 LOGGER.info("Endpoints is deprecated, no longer to renew 
requestObserver, endpoints={}", endpoints);
                 return;
             }
-            this.requestObserver = handler.telemetry(endpoints, this);
+            this.requestObserver = sessionHandler.telemetry(endpoints, this);
         } catch (Throwable t) {
-            handler.getScheduler().schedule(this::renewRequestObserver, 3, 
TimeUnit.SECONDS);
+            LOGGER.error("Failed to renew requestObserver, attempt to renew 
later, endpoints={}, delay={}", endpoints,
+                REQUEST_OBSERVER_RENEW_BACKOFF_DELAY, t);
+            sessionHandler.getScheduler().schedule(this::renewRequestObserver,
+                REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.toNanos(), 
TimeUnit.NANOSECONDS);
+            return;
         }
+        syncSettings();
     }
 
     protected ListenableFuture<Void> syncSettingsSafely() {
         try {
-            final TelemetryCommand settings = handler.settingsCommand();
-            fireWrite(settings);
-            return handler.awaitSettingSynchronized();
+            this.syncSettings();
+            return sessionHandler.awaitSettingSynchronized();
         } catch (Throwable t) {
             return Futures.immediateFailedFuture(t);
         }
     }
 
+    private void syncSettings() {
+        final TelemetryCommand settings = sessionHandler.settingsCommand();
+        fireWrite(settings);
+    }
+
     /**
      * Release telemetry session.
      */
@@ -83,7 +95,7 @@ public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
         }
     }
 
-    public void fireWrite(TelemetryCommand command) {
+    void fireWrite(TelemetryCommand command) {
         if (null == requestObserver) {
             LOGGER.error("Request observer does not exist, ignore current 
command, endpoints={}, command={}",
                 endpoints, command);
@@ -94,13 +106,13 @@ public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
 
     @Override
     public void onNext(TelemetryCommand command) {
-        final String clientId = handler.clientId();
+        final String clientId = sessionHandler.clientId();
         try {
             switch (command.getCommandCase()) {
                 case SETTINGS: {
                     final Settings settings = command.getSettings();
                     LOGGER.info("Receive settings from remote, endpoints={}, 
clientId={}", endpoints, clientId);
-                    handler.onSettingsCommand(endpoints, settings);
+                    sessionHandler.onSettingsCommand(endpoints, settings);
                     break;
                 }
                 case RECOVER_ORPHANED_TRANSACTION_COMMAND: {
@@ -108,14 +120,14 @@ public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
                         command.getRecoverOrphanedTransactionCommand();
                     LOGGER.info("Receive orphaned transaction recovery command 
from remote, endpoints={}, "
                         + "clientId={}", endpoints, clientId);
-                    handler.onRecoverOrphanedTransactionCommand(endpoints, 
recoverOrphanedTransactionCommand);
+                    
sessionHandler.onRecoverOrphanedTransactionCommand(endpoints, 
recoverOrphanedTransactionCommand);
                     break;
                 }
                 case VERIFY_MESSAGE_COMMAND: {
                     final VerifyMessageCommand verifyMessageCommand = 
command.getVerifyMessageCommand();
                     LOGGER.info("Receive message verification command from 
remote, endpoints={}, clientId={}",
                         endpoints, clientId);
-                    handler.onVerifyMessageCommand(endpoints, 
verifyMessageCommand);
+                    sessionHandler.onVerifyMessageCommand(endpoints, 
verifyMessageCommand);
                     break;
                 }
                 case PRINT_THREAD_STACK_TRACE_COMMAND: {
@@ -123,7 +135,7 @@ public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
                         command.getPrintThreadStackTraceCommand();
                     LOGGER.info("Receive thread stack print command from 
remote, endpoints={}, clientId={}",
                         endpoints, clientId);
-                    handler.onPrintThreadStackTraceCommand(endpoints, 
printThreadStackTraceCommand);
+                    sessionHandler.onPrintThreadStackTraceCommand(endpoints, 
printThreadStackTraceCommand);
                     break;
                 }
                 default:
@@ -139,20 +151,20 @@ public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
     @Override
     public void onError(Throwable throwable) {
         LOGGER.error("Exception raised from stream response observer, 
clientId={}, endpoints={}",
-            handler.clientId(), endpoints, throwable);
+            sessionHandler.clientId(), endpoints, throwable);
         release();
-        if (!handler.isRunning()) {
+        if (!sessionHandler.isRunning()) {
             return;
         }
-        handler.getScheduler().schedule(this::renewRequestObserver, 3, 
TimeUnit.SECONDS);
+        sessionHandler.getScheduler().schedule(this::renewRequestObserver, 3, 
TimeUnit.SECONDS);
     }
 
     @Override
     public void onCompleted() {
         release();
-        if (!handler.isRunning()) {
+        if (!sessionHandler.isRunning()) {
             return;
         }
-        handler.getScheduler().schedule(this::renewRequestObserver, 3, 
TimeUnit.SECONDS);
+        sessionHandler.getScheduler().schedule(this::renewRequestObserver, 3, 
TimeUnit.SECONDS);
     }
 }

Reply via email to