This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new c3cb6620 Bugfix: ClientSessionImpl#syncSettings should not block asynchronous worker thread
c3cb6620 is described below

commit c3cb6620bb050242ae9fa1c80ede66aba74fbe34
Author: Aaron Ai <[email protected]>
AuthorDate: Wed Dec 7 14:59:34 2022 +0800

    Bugfix: ClientSessionImpl#syncSettings should not block asynchronous worker thread
---
 .../rocketmq/client/java/impl/ClientImpl.java      | 24 ++++++++++++++--------
 .../client/java/impl/ClientSessionImpl.java        | 16 ++++++++-------
 .../client/java/impl/ClientSessionImplTest.java    |  2 +-
 3 files changed, 25 insertions(+), 17 deletions(-)

diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index b3981b60..4def65e5 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -34,6 +34,7 @@ import apache.rocketmq.v2.TelemetryCommand;
 import apache.rocketmq.v2.ThreadStackTrace;
 import apache.rocketmq.v2.VerifyMessageCommand;
 import apache.rocketmq.v2.VerifyMessageResult;
+import com.google.common.base.Function;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.FutureCallback;
@@ -47,6 +48,7 @@ import io.grpc.stub.StreamObserver;
 import java.security.InvalidKeyException;
 import java.security.NoSuchAlgorithmException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -362,6 +364,7 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
     public void removeClientSession(Endpoints endpoints, ClientSessionImpl 
clientSession) {
         sessionsLock.writeLock().lock();
         try {
+            log.info("Remove client session, clientId={}, endpoints={}", 
clientId, endpoints);
             sessionsTable.remove(endpoints, clientSession);
         } finally {
             sessionsLock.writeLock().unlock();
@@ -395,20 +398,25 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
     /**
      * Triggered when {@link TopicRouteData} is fetched from remote.
      */
-    public void onTopicRouteDataFetched(String topic, TopicRouteData 
topicRouteData) throws ClientException,
-        ExecutionException, InterruptedException {
+    public ListenableFuture<TopicRouteData> onTopicRouteDataFetched(String 
topic,
+    TopicRouteData topicRouteData) throws ClientException {
         final Set<Endpoints> routeEndpoints = topicRouteData
             .getMessageQueues().stream()
             .map(mq -> mq.getBroker().getEndpoints())
             .collect(Collectors.toSet());
         final Set<Endpoints> existRouteEndpoints = getTotalRouteEndpoints();
         final Set<Endpoints> newEndpoints = new 
HashSet<>(Sets.difference(routeEndpoints, existRouteEndpoints));
+        List<ListenableFuture<?>> futures = new ArrayList<>();
         for (Endpoints endpoints : newEndpoints) {
             final ClientSessionImpl clientSession = 
getClientSession(endpoints);
-            clientSession.syncSettings();
+            futures.add(clientSession.syncSettings());
         }
-        topicRouteCache.put(topic, topicRouteData);
-        onTopicRouteDataUpdate0(topic, topicRouteData);
+        final ListenableFuture<?> future = Futures.allAsList(futures);
+        return Futures.transform(future, (Function<Object, TopicRouteData>) 
input -> {
+            topicRouteCache.put(topic, topicRouteData);
+            onTopicRouteDataUpdate0(topic, topicRouteData);
+            return topicRouteData;
+        }, MoreExecutors.directExecutor());
     }
 
     public void onTopicRouteDataUpdate0(String topic, TopicRouteData 
topicRouteData) {
@@ -582,10 +590,8 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
 
     private ListenableFuture<TopicRouteData> fetchTopicRoute(final String 
topic) {
         final ListenableFuture<TopicRouteData> future0 = 
fetchTopicRoute0(topic);
-        final ListenableFuture<TopicRouteData> future = 
Futures.transformAsync(future0, topicRouteData -> {
-            onTopicRouteDataFetched(topic, topicRouteData);
-            return Futures.immediateFuture(topicRouteData);
-        }, MoreExecutors.directExecutor());
+        final ListenableFuture<TopicRouteData> future = 
Futures.transformAsync(future0,
+            topicRouteData -> onTopicRouteDataFetched(topic, topicRouteData), 
MoreExecutors.directExecutor());
         Futures.addCallback(future, new FutureCallback<TopicRouteData>() {
             @Override
             public void onSuccess(TopicRouteData topicRouteData) {
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index 663b62ed..8ec03132 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -23,10 +23,10 @@ import apache.rocketmq.v2.Settings;
 import apache.rocketmq.v2.TelemetryCommand;
 import apache.rocketmq.v2.VerifyMessageCommand;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import io.grpc.stub.StreamObserver;
 import java.time.Duration;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
@@ -45,7 +45,7 @@ public class ClientSessionImpl implements 
StreamObserver<TelemetryCommand> {
 
     private final ClientSessionHandler sessionHandler;
     private final Endpoints endpoints;
-    private final SettableFuture<Settings> settingsSettableFuture;
+    private final SettableFuture<Settings> future;
     private volatile StreamObserver<TelemetryCommand> requestObserver;
 
     @SuppressWarnings("UnstableApiUsage")
@@ -53,8 +53,8 @@ public class ClientSessionImpl implements 
StreamObserver<TelemetryCommand> {
         throws ClientException {
         this.sessionHandler = sessionHandler;
         this.endpoints = endpoints;
-        this.settingsSettableFuture = SettableFuture.create();
-        Futures.withTimeout(settingsSettableFuture, 
SETTINGS_INITIALIZATION_TIMEOUT.plus(tolerance).toMillis(),
+        this.future = SettableFuture.create();
+        Futures.withTimeout(future, 
SETTINGS_INITIALIZATION_TIMEOUT.plus(tolerance).toMillis(),
             TimeUnit.MILLISECONDS, sessionHandler.getScheduler());
         this.requestObserver = sessionHandler.telemetry(endpoints, this);
     }
@@ -82,9 +82,9 @@ public class ClientSessionImpl implements 
StreamObserver<TelemetryCommand> {
         syncSettings0();
     }
 
-    protected void syncSettings() throws ExecutionException, 
InterruptedException {
+    protected ListenableFuture<Settings> syncSettings() {
         this.syncSettings0();
-        settingsSettableFuture.get();
+        return future;
     }
 
     private void syncSettings0() {
@@ -128,7 +128,9 @@ public class ClientSessionImpl implements 
StreamObserver<TelemetryCommand> {
                     final Settings settings = command.getSettings();
                     log.info("Receive settings from remote, endpoints={}, 
clientId={}", endpoints, clientId);
                     sessionHandler.onSettingsCommand(endpoints, settings);
-                    settingsSettableFuture.set(settings);
+                    if (future.set(settings)) {
+                        log.info("Init settings successfully, endpoints={}, 
clientId={}", endpoints, clientId);
+                    }
                     break;
                 }
                 case RECOVER_ORPHANED_TRANSACTION_COMMAND: {
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
index 2c6b9805..9f7a58af 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
@@ -63,7 +63,7 @@ public class ClientSessionImplTest extends TestBase {
         final Settings settings = Settings.newBuilder().build();
         TelemetryCommand settingsCommand = 
TelemetryCommand.newBuilder().setSettings(settings).build();
         executor.submit(() -> clientSession.onNext(settingsCommand));
-        clientSession.syncSettings();
+        clientSession.syncSettings().get();
         Mockito.verify(sessionHandler, 
times(1)).onSettingsCommand(eq(endpoints), eq(settings));
     }
 

Reply via email to