This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new c3cb6620 Bugfix: ClientSessionImpl#syncSettings should not block asynchronous worker thread
c3cb6620 is described below
commit c3cb6620bb050242ae9fa1c80ede66aba74fbe34
Author: Aaron Ai <[email protected]>
AuthorDate: Wed Dec 7 14:59:34 2022 +0800
Bugfix: ClientSessionImpl#syncSettings should not block asynchronous worker thread
---
.../rocketmq/client/java/impl/ClientImpl.java | 24 ++++++++++++++--------
.../client/java/impl/ClientSessionImpl.java | 16 ++++++++-------
.../client/java/impl/ClientSessionImplTest.java | 2 +-
3 files changed, 25 insertions(+), 17 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index b3981b60..4def65e5 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -34,6 +34,7 @@ import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.ThreadStackTrace;
import apache.rocketmq.v2.VerifyMessageCommand;
import apache.rocketmq.v2.VerifyMessageResult;
+import com.google.common.base.Function;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.FutureCallback;
@@ -47,6 +48,7 @@ import io.grpc.stub.StreamObserver;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -362,6 +364,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
public void removeClientSession(Endpoints endpoints, ClientSessionImpl
clientSession) {
sessionsLock.writeLock().lock();
try {
+ log.info("Remove client session, clientId={}, endpoints={}",
clientId, endpoints);
sessionsTable.remove(endpoints, clientSession);
} finally {
sessionsLock.writeLock().unlock();
@@ -395,20 +398,25 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
/**
* Triggered when {@link TopicRouteData} is fetched from remote.
*/
- public void onTopicRouteDataFetched(String topic, TopicRouteData
topicRouteData) throws ClientException,
- ExecutionException, InterruptedException {
+ public ListenableFuture<TopicRouteData> onTopicRouteDataFetched(String
topic,
+ TopicRouteData topicRouteData) throws ClientException {
final Set<Endpoints> routeEndpoints = topicRouteData
.getMessageQueues().stream()
.map(mq -> mq.getBroker().getEndpoints())
.collect(Collectors.toSet());
final Set<Endpoints> existRouteEndpoints = getTotalRouteEndpoints();
final Set<Endpoints> newEndpoints = new
HashSet<>(Sets.difference(routeEndpoints, existRouteEndpoints));
+ List<ListenableFuture<?>> futures = new ArrayList<>();
for (Endpoints endpoints : newEndpoints) {
final ClientSessionImpl clientSession =
getClientSession(endpoints);
- clientSession.syncSettings();
+ futures.add(clientSession.syncSettings());
}
- topicRouteCache.put(topic, topicRouteData);
- onTopicRouteDataUpdate0(topic, topicRouteData);
+ final ListenableFuture<?> future = Futures.allAsList(futures);
+ return Futures.transform(future, (Function<Object, TopicRouteData>)
input -> {
+ topicRouteCache.put(topic, topicRouteData);
+ onTopicRouteDataUpdate0(topic, topicRouteData);
+ return topicRouteData;
+ }, MoreExecutors.directExecutor());
}
public void onTopicRouteDataUpdate0(String topic, TopicRouteData
topicRouteData) {
@@ -582,10 +590,8 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
private ListenableFuture<TopicRouteData> fetchTopicRoute(final String
topic) {
final ListenableFuture<TopicRouteData> future0 =
fetchTopicRoute0(topic);
- final ListenableFuture<TopicRouteData> future =
Futures.transformAsync(future0, topicRouteData -> {
- onTopicRouteDataFetched(topic, topicRouteData);
- return Futures.immediateFuture(topicRouteData);
- }, MoreExecutors.directExecutor());
+ final ListenableFuture<TopicRouteData> future =
Futures.transformAsync(future0,
+ topicRouteData -> onTopicRouteDataFetched(topic, topicRouteData),
MoreExecutors.directExecutor());
Futures.addCallback(future, new FutureCallback<TopicRouteData>() {
@Override
public void onSuccess(TopicRouteData topicRouteData) {
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index 663b62ed..8ec03132 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -23,10 +23,10 @@ import apache.rocketmq.v2.Settings;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.VerifyMessageCommand;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
@@ -45,7 +45,7 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
private final ClientSessionHandler sessionHandler;
private final Endpoints endpoints;
- private final SettableFuture<Settings> settingsSettableFuture;
+ private final SettableFuture<Settings> future;
private volatile StreamObserver<TelemetryCommand> requestObserver;
@SuppressWarnings("UnstableApiUsage")
@@ -53,8 +53,8 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
throws ClientException {
this.sessionHandler = sessionHandler;
this.endpoints = endpoints;
- this.settingsSettableFuture = SettableFuture.create();
- Futures.withTimeout(settingsSettableFuture,
SETTINGS_INITIALIZATION_TIMEOUT.plus(tolerance).toMillis(),
+ this.future = SettableFuture.create();
+ Futures.withTimeout(future,
SETTINGS_INITIALIZATION_TIMEOUT.plus(tolerance).toMillis(),
TimeUnit.MILLISECONDS, sessionHandler.getScheduler());
this.requestObserver = sessionHandler.telemetry(endpoints, this);
}
@@ -82,9 +82,9 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
syncSettings0();
}
- protected void syncSettings() throws ExecutionException,
InterruptedException {
+ protected ListenableFuture<Settings> syncSettings() {
this.syncSettings0();
- settingsSettableFuture.get();
+ return future;
}
private void syncSettings0() {
@@ -128,7 +128,9 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
final Settings settings = command.getSettings();
log.info("Receive settings from remote, endpoints={},
clientId={}", endpoints, clientId);
sessionHandler.onSettingsCommand(endpoints, settings);
- settingsSettableFuture.set(settings);
+ if (future.set(settings)) {
+ log.info("Init settings successfully, endpoints={},
clientId={}", endpoints, clientId);
+ }
break;
}
case RECOVER_ORPHANED_TRANSACTION_COMMAND: {
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
index 2c6b9805..9f7a58af 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
@@ -63,7 +63,7 @@ public class ClientSessionImplTest extends TestBase {
final Settings settings = Settings.newBuilder().build();
TelemetryCommand settingsCommand =
TelemetryCommand.newBuilder().setSettings(settings).build();
executor.submit(() -> clientSession.onNext(settingsCommand));
- clientSession.syncSettings();
+ clientSession.syncSettings().get();
Mockito.verify(sessionHandler,
times(1)).onSettingsCommand(eq(endpoints), eq(settings));
}