This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new a0b796d Bugfix: reverse result in
ClientSessionHandler#isEndpointsDeprecated (#108)
a0b796d is described below
commit a0b796d1f9130a109d07b360552e515d1b2a429b
Author: Aaron Ai <[email protected]>
AuthorDate: Sun Jul 31 16:29:42 2022 +0800
Bugfix: reverse result in ClientSessionHandler#isEndpointsDeprecated (#108)
---
.../rocketmq/client/java/impl/ClientImpl.java | 29 +++++++-----
.../client/java/impl/ClientSessionImpl.java | 54 +++++++++++++---------
2 files changed, 51 insertions(+), 32 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 2ee5e23..da1ff6f 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -35,6 +35,7 @@ import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.ThreadStackTrace;
import apache.rocketmq.v2.VerifyMessageCommand;
import apache.rocketmq.v2.VerifyMessageResult;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -299,7 +300,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
@Override
public boolean isEndpointsDeprecated(Endpoints endpoints) {
final Set<Endpoints> totalRouteEndpoints = getTotalRouteEndpoints();
- return totalRouteEndpoints.contains(endpoints);
+ return !totalRouteEndpoints.contains(endpoints);
}
@Override
@@ -374,8 +375,8 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}
public void telemetry(Endpoints endpoints, TelemetryCommand command) {
- final ClientSessionImpl clientSession = getClientSession(endpoints);
try {
+ final ClientSessionImpl clientSession =
getClientSession(endpoints);
clientSession.fireWrite(command);
} catch (Throwable t) {
LOGGER.error("Failed to fire write telemetry command, clientId={},
endpoints={}", clientId, endpoints, t);
@@ -391,7 +392,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}
}
- public ClientSessionImpl getClientSession(Endpoints endpoints) {
+ public ClientSessionImpl getClientSession(Endpoints endpoints) throws
ClientException {
sessionsLock.readLock().lock();
try {
final ClientSessionImpl session = sessionsTable.get(endpoints);
@@ -415,9 +416,13 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}
}
- public ListenableFuture<Void> syncSettingsSafely(Endpoints endpoints) {
- final ClientSessionImpl clientSession = getClientSession(endpoints);
- return clientSession.syncSettingsSafely();
+ private ListenableFuture<Void> syncSettingsSafely(Endpoints endpoints) {
+ try {
+ final ClientSessionImpl clientSession =
getClientSession(endpoints);
+ return clientSession.syncSettingsSafely();
+ } catch (Throwable t) {
+ return Futures.immediateFailedFuture(t);
+ }
}
/**
@@ -426,13 +431,15 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
* <p>Never thrown any exception.
*/
public ListenableFuture<TopicRouteData> onTopicRouteDataFetched(String
topic, TopicRouteData topicRouteData) {
- final List<ListenableFuture<Void>> futures = topicRouteData
+ final Set<Endpoints> routeEndpoints = topicRouteData
.getMessageQueues().stream()
.map(mq -> mq.getBroker().getEndpoints())
- .collect(Collectors.toSet())
- .stream().map(this::syncSettingsSafely)
- .collect(Collectors.toList());
- // TODO: Record exception.
+ .collect(Collectors.toSet());
+ final Set<Endpoints> existRouteEndpoints = getTotalRouteEndpoints();
+ final Set<Endpoints> newEndpoints = new
HashSet<>(Sets.difference(routeEndpoints,
+ existRouteEndpoints));
+ final List<ListenableFuture<Void>> futures =
+
newEndpoints.stream().map(this::syncSettingsSafely).collect(Collectors.toList());
return Futures.whenAllSucceed(futures).callAsync(() -> {
topicRouteCache.put(topic, topicRouteData);
onTopicRouteDataUpdate0(topic, topicRouteData);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index 90402eb..b699ab0 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -25,7 +25,9 @@ import apache.rocketmq.v2.VerifyMessageCommand;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.stub.StreamObserver;
+import java.time.Duration;
import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.slf4j.Logger;
@@ -36,39 +38,49 @@ import org.slf4j.LoggerFactory;
*/
public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClientSessionImpl.class);
+ private static final Duration REQUEST_OBSERVER_RENEW_BACKOFF_DELAY =
Duration.ofSeconds(1);
- private final ClientSessionHandler handler;
+ private final ClientSessionHandler sessionHandler;
private final Endpoints endpoints;
private volatile StreamObserver<TelemetryCommand> requestObserver;
- protected ClientSessionImpl(ClientSessionHandler handler, Endpoints
endpoints) {
- this.handler = handler;
+ protected ClientSessionImpl(ClientSessionHandler sessionHandler, Endpoints
endpoints) throws ClientException {
+ this.sessionHandler = sessionHandler;
this.endpoints = endpoints;
- renewRequestObserver();
+ this.requestObserver = sessionHandler.telemetry(endpoints, this);
}
private void renewRequestObserver() {
try {
- if (handler.isEndpointsDeprecated(endpoints)) {
+ if (sessionHandler.isEndpointsDeprecated(endpoints)) {
LOGGER.info("Endpoints is deprecated, no longer to renew
requestObserver, endpoints={}", endpoints);
return;
}
- this.requestObserver = handler.telemetry(endpoints, this);
+ this.requestObserver = sessionHandler.telemetry(endpoints, this);
} catch (Throwable t) {
- handler.getScheduler().schedule(this::renewRequestObserver, 3,
TimeUnit.SECONDS);
+ LOGGER.error("Failed to renew requestObserver, attempt to renew
later, endpoints={}, delay={}", endpoints,
+ REQUEST_OBSERVER_RENEW_BACKOFF_DELAY, t);
+ sessionHandler.getScheduler().schedule(this::renewRequestObserver,
+ REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.toNanos(),
TimeUnit.NANOSECONDS);
+ return;
}
+ syncSettings();
}
protected ListenableFuture<Void> syncSettingsSafely() {
try {
- final TelemetryCommand settings = handler.settingsCommand();
- fireWrite(settings);
- return handler.awaitSettingSynchronized();
+ this.syncSettings();
+ return sessionHandler.awaitSettingSynchronized();
} catch (Throwable t) {
return Futures.immediateFailedFuture(t);
}
}
+ private void syncSettings() {
+ final TelemetryCommand settings = sessionHandler.settingsCommand();
+ fireWrite(settings);
+ }
+
/**
* Release telemetry session.
*/
@@ -83,7 +95,7 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
}
}
- public void fireWrite(TelemetryCommand command) {
+ void fireWrite(TelemetryCommand command) {
if (null == requestObserver) {
LOGGER.error("Request observer does not exist, ignore current
command, endpoints={}, command={}",
endpoints, command);
@@ -94,13 +106,13 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
@Override
public void onNext(TelemetryCommand command) {
- final String clientId = handler.clientId();
+ final String clientId = sessionHandler.clientId();
try {
switch (command.getCommandCase()) {
case SETTINGS: {
final Settings settings = command.getSettings();
LOGGER.info("Receive settings from remote, endpoints={},
clientId={}", endpoints, clientId);
- handler.onSettingsCommand(endpoints, settings);
+ sessionHandler.onSettingsCommand(endpoints, settings);
break;
}
case RECOVER_ORPHANED_TRANSACTION_COMMAND: {
@@ -108,14 +120,14 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
command.getRecoverOrphanedTransactionCommand();
LOGGER.info("Receive orphaned transaction recovery command
from remote, endpoints={}, "
+ "clientId={}", endpoints, clientId);
- handler.onRecoverOrphanedTransactionCommand(endpoints,
recoverOrphanedTransactionCommand);
+
sessionHandler.onRecoverOrphanedTransactionCommand(endpoints,
recoverOrphanedTransactionCommand);
break;
}
case VERIFY_MESSAGE_COMMAND: {
final VerifyMessageCommand verifyMessageCommand =
command.getVerifyMessageCommand();
LOGGER.info("Receive message verification command from
remote, endpoints={}, clientId={}",
endpoints, clientId);
- handler.onVerifyMessageCommand(endpoints,
verifyMessageCommand);
+ sessionHandler.onVerifyMessageCommand(endpoints,
verifyMessageCommand);
break;
}
case PRINT_THREAD_STACK_TRACE_COMMAND: {
@@ -123,7 +135,7 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
command.getPrintThreadStackTraceCommand();
LOGGER.info("Receive thread stack print command from
remote, endpoints={}, clientId={}",
endpoints, clientId);
- handler.onPrintThreadStackTraceCommand(endpoints,
printThreadStackTraceCommand);
+ sessionHandler.onPrintThreadStackTraceCommand(endpoints,
printThreadStackTraceCommand);
break;
}
default:
@@ -139,20 +151,20 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
@Override
public void onError(Throwable throwable) {
LOGGER.error("Exception raised from stream response observer,
clientId={}, endpoints={}",
- handler.clientId(), endpoints, throwable);
+ sessionHandler.clientId(), endpoints, throwable);
release();
- if (!handler.isRunning()) {
+ if (!sessionHandler.isRunning()) {
return;
}
- handler.getScheduler().schedule(this::renewRequestObserver, 3,
TimeUnit.SECONDS);
+ sessionHandler.getScheduler().schedule(this::renewRequestObserver, 3,
TimeUnit.SECONDS);
}
@Override
public void onCompleted() {
release();
- if (!handler.isRunning()) {
+ if (!sessionHandler.isRunning()) {
return;
}
- handler.getScheduler().schedule(this::renewRequestObserver, 3,
TimeUnit.SECONDS);
+ sessionHandler.getScheduler().schedule(this::renewRequestObserver, 3,
TimeUnit.SECONDS);
}
}