This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new a7f5742 Add ClientSessionImplTest (#183)
a7f5742 is described below
commit a7f5742385031094fb8f7d1b60889fd3133af62d
Author: Aaron Ai <[email protected]>
AuthorDate: Thu Aug 25 11:42:14 2022 +0800
Add ClientSessionImplTest (#183)
---
.../rocketmq/client/java/impl/ClientImpl.java | 6 +-
.../client/java/impl/ClientSessionImpl.java | 39 ++-
.../client/java/impl/ClientSessionImplTest.java | 266 +++++++++++++++++++++
.../apache/rocketmq/client/java/tool/TestBase.java | 3 +-
4 files changed, 300 insertions(+), 14 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index e2807fb..23758d9 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -217,9 +217,9 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
} else {
LOGGER.info("Shutdown the telemetry command executor successfully,
clientId={}", clientId);
}
- LOGGER.info("Begin to release telemetry sessions, clientId={}",
clientId);
+ LOGGER.info("Begin to release all telemetry sessions, clientId={}",
clientId);
releaseClientSessions();
- LOGGER.info("Release telemetry sessions successfully, clientId={}",
clientId);
+ LOGGER.info("Release all telemetry sessions successfully,
clientId={}", clientId);
clientManager.stopAsync().awaitTerminated();
clientCallbackExecutor.shutdown();
if (!ExecutorServices.awaitTerminated(clientCallbackExecutor)) {
@@ -356,7 +356,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}
}
- public ClientSessionImpl getClientSession(Endpoints endpoints) throws
ClientException {
+ private ClientSessionImpl getClientSession(Endpoints endpoints) throws
ClientException {
sessionsLock.readLock().lock();
try {
final ClientSessionImpl session = sessionsTable.get(endpoints);
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index f2edc70..9ccecc4 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -38,8 +38,9 @@ import org.slf4j.LoggerFactory;
* Telemetry session is constructed before first communication between client
and remote route endpoints.
*/
public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
+ static final Duration REQUEST_OBSERVER_RENEW_BACKOFF_DELAY =
Duration.ofSeconds(1);
+
private static final Logger LOGGER =
LoggerFactory.getLogger(ClientSessionImpl.class);
- private static final Duration REQUEST_OBSERVER_RENEW_BACKOFF_DELAY =
Duration.ofSeconds(1);
private static final long SETTINGS_INITIALIZATION_TIMEOUT_MILLIS = 3000;
private final ClientSessionHandler sessionHandler;
@@ -55,19 +56,24 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
}
private void renewRequestObserver() {
+ final String clientId = sessionHandler.clientId();
try {
if (sessionHandler.isEndpointsDeprecated(endpoints)) {
- LOGGER.info("Endpoints is deprecated, no longer to renew
requestObserver, endpoints={}", endpoints);
+ LOGGER.info("Endpoints is deprecated, no longer to renew
requestObserver, endpoints={}, clientId={}",
+ endpoints, clientId);
return;
}
+ LOGGER.info("Try to renew requestObserver, endpoints={},
clientId={}", endpoints, clientId);
this.requestObserver = sessionHandler.telemetry(endpoints, this);
} catch (Throwable t) {
- LOGGER.error("Failed to renew requestObserver, attempt to renew
later, endpoints={}, delay={}", endpoints,
- REQUEST_OBSERVER_RENEW_BACKOFF_DELAY, t);
+ LOGGER.error("Failed to renew requestObserver, attempt to renew
later, endpoints={}, delay={}, clientId={}",
+ endpoints, REQUEST_OBSERVER_RENEW_BACKOFF_DELAY, clientId, t);
sessionHandler.getScheduler().schedule(this::renewRequestObserver,
REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.toNanos(),
TimeUnit.NANOSECONDS);
return;
}
+ LOGGER.info("Sync setting to remote after requestObserver is renewed,
endpoints={}, clientId={}", endpoints,
+ clientId);
syncSettings0();
}
@@ -85,9 +91,13 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
* Release telemetry session.
*/
public void release() {
+ final String clientId = sessionHandler.clientId();
if (null == requestObserver) {
+ LOGGER.error("[Bug] request observer does not exist, no need to
release, endpoints={}, clientId={}",
+ endpoints, clientId);
return;
}
+ LOGGER.info("Begin to release client session, endpoints={},
clientId={}", endpoints, clientId);
try {
requestObserver.onCompleted();
} catch (Throwable ignore) {
@@ -97,8 +107,8 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
void write(TelemetryCommand command) {
if (null == requestObserver) {
- LOGGER.error("Request observer does not exist, ignore current
command, endpoints={}, command={}",
- endpoints, command);
+ LOGGER.error("[Bug] Request observer does not exist, ignore
current command, endpoints={}, command={}, "
+ + "clientId={}", endpoints, command,
sessionHandler.clientId());
return;
}
requestObserver.onNext(command);
@@ -151,21 +161,30 @@ public class ClientSessionImpl implements
StreamObserver<TelemetryCommand> {
@Override
public void onError(Throwable throwable) {
- LOGGER.error("Exception raised from stream response observer,
clientId={}, endpoints={}",
- sessionHandler.clientId(), endpoints, throwable);
+ final String clientId = sessionHandler.clientId();
+ LOGGER.error("Exception raised from stream response observer,
clientId={}, endpoints={}", clientId, endpoints,
+ throwable);
release();
if (!sessionHandler.isRunning()) {
+ LOGGER.info("Session handler is not running, forgive to renew
request observer, clientId={}, "
+ + "endpoints={}", clientId, endpoints);
return;
}
- sessionHandler.getScheduler().schedule(this::renewRequestObserver, 3,
TimeUnit.SECONDS);
+ sessionHandler.getScheduler().schedule(this::renewRequestObserver,
+ REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.toNanos(),
TimeUnit.NANOSECONDS);
}
@Override
public void onCompleted() {
+ final String clientId = sessionHandler.clientId();
+ LOGGER.info("Receive completion for stream response observer,
clientId={}, endpoints={}", clientId, endpoints);
release();
if (!sessionHandler.isRunning()) {
+ LOGGER.info("Session handler is not running, forgive to renew
request observer, clientId={}, "
+ + "endpoints={}", clientId, endpoints);
return;
}
- sessionHandler.getScheduler().schedule(this::renewRequestObserver, 3,
TimeUnit.SECONDS);
+ sessionHandler.getScheduler().schedule(this::renewRequestObserver,
+ REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.toNanos(),
TimeUnit.NANOSECONDS);
}
}
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
new file mode 100644
index 0000000..b77bf3a
--- /dev/null
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+
+import apache.rocketmq.v2.PrintThreadStackTraceCommand;
+import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
+import apache.rocketmq.v2.Settings;
+import apache.rocketmq.v2.TelemetryCommand;
+import apache.rocketmq.v2.VerifyMessageCommand;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
+import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.awaitility.Durations;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ClientSessionImplTest extends TestBase {
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void syncSettings() throws ClientException, ExecutionException,
InterruptedException, TimeoutException {
+ final Endpoints endpoints = fakeEndpoints();
+ final ClientSessionHandler sessionHandler =
Mockito.mock(ClientSessionHandler.class);
+ final StreamObserver<TelemetryCommand> requestObserver =
Mockito.mock(StreamObserver.class);
+
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+ any(StreamObserver.class));
+ final ClientSessionImpl clientSession = new
ClientSessionImpl(sessionHandler, endpoints);
+
Mockito.doNothing().when(requestObserver).onNext(any(TelemetryCommand.class));
+ Mockito.doReturn("clientId").when(sessionHandler).clientId();
+
Mockito.doNothing().when(sessionHandler).onSettingsCommand(any(Endpoints.class),
any(Settings.class));
+ final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60,
TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>());
+ final Settings settings = Settings.newBuilder().build();
+ TelemetryCommand settingsCommand =
TelemetryCommand.newBuilder().setSettings(settings).build();
+ executor.submit(() -> clientSession.onNext(settingsCommand));
+ clientSession.syncSettings();
+ Mockito.verify(sessionHandler,
times(1)).onSettingsCommand(eq(endpoints), eq(settings));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testOnNextWithRecoverOrphanedTransactionCommand() throws
ClientException {
+ final Endpoints endpoints = fakeEndpoints();
+ final ClientSessionHandler sessionHandler =
Mockito.mock(ClientSessionHandler.class);
+ final StreamObserver<TelemetryCommand> requestObserver =
Mockito.mock(StreamObserver.class);
+
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+ any(StreamObserver.class));
+ final ClientSessionImpl clientSession = new
ClientSessionImpl(sessionHandler, endpoints);
+ Mockito.doReturn("clientId").when(sessionHandler).clientId();
+
Mockito.doNothing().when(sessionHandler).onRecoverOrphanedTransactionCommand(any(Endpoints.class),
+ any(RecoverOrphanedTransactionCommand.class));
+ RecoverOrphanedTransactionCommand command0 =
RecoverOrphanedTransactionCommand.newBuilder().build();
+ TelemetryCommand command = TelemetryCommand.newBuilder()
+ .setRecoverOrphanedTransactionCommand(command0).build();
+ clientSession.onNext(command);
+ Mockito.verify(sessionHandler,
times(1)).onRecoverOrphanedTransactionCommand(eq(endpoints), eq(command0));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testOnNextWithVerifyMessageCommand() throws ClientException {
+ final Endpoints endpoints = fakeEndpoints();
+ final ClientSessionHandler sessionHandler =
Mockito.mock(ClientSessionHandler.class);
+ final StreamObserver<TelemetryCommand> requestObserver =
Mockito.mock(StreamObserver.class);
+
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+ any(StreamObserver.class));
+ final ClientSessionImpl clientSession = new
ClientSessionImpl(sessionHandler, endpoints);
+ Mockito.doReturn("clientId").when(sessionHandler).clientId();
+
Mockito.doNothing().when(sessionHandler).onVerifyMessageCommand(any(Endpoints.class),
+ any(VerifyMessageCommand.class));
+ VerifyMessageCommand command0 =
VerifyMessageCommand.newBuilder().build();
+ TelemetryCommand command = TelemetryCommand.newBuilder()
+ .setVerifyMessageCommand(command0).build();
+ clientSession.onNext(command);
+ Mockito.verify(sessionHandler,
times(1)).onVerifyMessageCommand(eq(endpoints), eq(command0));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testOnNextWithPrintThreadStackTraceCommand() throws
ClientException {
+ final Endpoints endpoints = fakeEndpoints();
+ final ClientSessionHandler sessionHandler =
Mockito.mock(ClientSessionHandler.class);
+ final StreamObserver<TelemetryCommand> requestObserver =
Mockito.mock(StreamObserver.class);
+
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+ any(StreamObserver.class));
+ final ClientSessionImpl clientSession = new
ClientSessionImpl(sessionHandler, endpoints);
+ Mockito.doReturn("clientId").when(sessionHandler).clientId();
+
Mockito.doNothing().when(sessionHandler).onPrintThreadStackTraceCommand(any(Endpoints.class),
+ any(PrintThreadStackTraceCommand.class));
+ PrintThreadStackTraceCommand command0 =
PrintThreadStackTraceCommand.newBuilder().build();
+ TelemetryCommand command = TelemetryCommand.newBuilder()
+ .setPrintThreadStackTraceCommand(command0).build();
+ clientSession.onNext(command);
+ Mockito.verify(sessionHandler,
times(1)).onPrintThreadStackTraceCommand(eq(endpoints), eq(command0));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testOnNextWithUnrecognizedCommand() throws ClientException {
+ final Endpoints endpoints = fakeEndpoints();
+ final ClientSessionHandler sessionHandler =
Mockito.mock(ClientSessionHandler.class);
+ final StreamObserver<TelemetryCommand> requestObserver =
Mockito.mock(StreamObserver.class);
+
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+ any(StreamObserver.class));
+ final ClientSessionImpl clientSession = new
ClientSessionImpl(sessionHandler, endpoints);
+ Mockito.doReturn("clientId").when(sessionHandler).clientId();
+
Mockito.doNothing().when(sessionHandler).onSettingsCommand(any(Endpoints.class),
any(Settings.class));
+
Mockito.doNothing().when(sessionHandler).onRecoverOrphanedTransactionCommand(any(Endpoints.class),
+ any(RecoverOrphanedTransactionCommand.class));
+
Mockito.doNothing().when(sessionHandler).onVerifyMessageCommand(any(Endpoints.class),
+ any(VerifyMessageCommand.class));
+
Mockito.doNothing().when(sessionHandler).onPrintThreadStackTraceCommand(any(Endpoints.class),
+ any(PrintThreadStackTraceCommand.class));
+ TelemetryCommand command = TelemetryCommand.newBuilder().build();
+ clientSession.onNext(command);
+ Mockito.verify(sessionHandler,
never()).onSettingsCommand(any(Endpoints.class), any(Settings.class));
+ Mockito.verify(sessionHandler,
never()).onRecoverOrphanedTransactionCommand(any(Endpoints.class),
+ any(RecoverOrphanedTransactionCommand.class));
+ Mockito.verify(sessionHandler,
never()).onVerifyMessageCommand(any(Endpoints.class),
+ any(VerifyMessageCommand.class));
+ Mockito.verify(sessionHandler,
never()).onPrintThreadStackTraceCommand(any(Endpoints.class),
+ any(PrintThreadStackTraceCommand.class));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testOnError() throws ClientException {
+ final Endpoints endpoints = fakeEndpoints();
+ final ClientSessionHandler sessionHandler =
Mockito.mock(ClientSessionHandler.class);
+ final StreamObserver<TelemetryCommand> requestObserver =
Mockito.mock(StreamObserver.class);
+
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+ any(StreamObserver.class));
+ Mockito.doNothing().when(requestObserver).onCompleted();
+ final ClientSessionImpl clientSession = new
ClientSessionImpl(sessionHandler, endpoints);
+ Mockito.doReturn("clientId").when(sessionHandler).clientId();
+ Mockito.doReturn(true).when(sessionHandler).isRunning();
+ Mockito.doReturn(SCHEDULER).when(sessionHandler).getScheduler();
+
Mockito.doReturn(false).when(sessionHandler).isEndpointsDeprecated(endpoints);
+ final Exception e = new Exception();
+ clientSession.onError(e);
+ Mockito.verify(sessionHandler, times(1)).isRunning();
+ Mockito.verify(requestObserver, times(1)).onCompleted();
+ Mockito.verify(sessionHandler, times(1)).getScheduler();
+
await().atMost(ClientSessionImpl.REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.plus(Durations.ONE_SECOND))
+ .untilAsserted(() -> {
+ Mockito.verify(sessionHandler,
times(1)).isEndpointsDeprecated(eq(endpoints));
+ Mockito.verify(sessionHandler,
times(2)).telemetry(eq(endpoints), eq(clientSession));
+ });
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testOnErrorWithEndpointsDeprecated() throws ClientException {
+ final Endpoints endpoints = fakeEndpoints();
+ final ClientSessionHandler sessionHandler =
Mockito.mock(ClientSessionHandler.class);
+ final StreamObserver<TelemetryCommand> requestObserver =
Mockito.mock(StreamObserver.class);
+
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+ any(StreamObserver.class));
+ Mockito.doNothing().when(requestObserver).onCompleted();
+ final ClientSessionImpl clientSession = new
ClientSessionImpl(sessionHandler, endpoints);
+ Mockito.doReturn("clientId").when(sessionHandler).clientId();
+ Mockito.doReturn(true).when(sessionHandler).isRunning();
+ Mockito.doReturn(SCHEDULER).when(sessionHandler).getScheduler();
+
Mockito.doReturn(true).when(sessionHandler).isEndpointsDeprecated(endpoints);
+ final Exception e = new Exception();
+ clientSession.onError(e);
+ Mockito.verify(sessionHandler, times(1)).isRunning();
+ Mockito.verify(requestObserver, times(1)).onCompleted();
+ Mockito.verify(sessionHandler, times(1)).getScheduler();
+
await().atMost(ClientSessionImpl.REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.plus(Durations.ONE_SECOND))
+ .untilAsserted(() -> {
+ Mockito.verify(sessionHandler,
times(1)).isEndpointsDeprecated(eq(endpoints));
+ Mockito.verify(sessionHandler,
times(1)).telemetry(eq(endpoints), eq(clientSession));
+ });
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testOnErrorWithSessionHandlerIsNotRunning() throws
ClientException {
+ final Endpoints endpoints = fakeEndpoints();
+ final ClientSessionHandler sessionHandler =
Mockito.mock(ClientSessionHandler.class);
+ final StreamObserver<TelemetryCommand> requestObserver =
Mockito.mock(StreamObserver.class);
+
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+ any(StreamObserver.class));
+ Mockito.doNothing().when(requestObserver).onCompleted();
+ final ClientSessionImpl clientSession = new
ClientSessionImpl(sessionHandler, endpoints);
+ Mockito.doReturn("clientId").when(sessionHandler).clientId();
+ Mockito.doReturn(false).when(sessionHandler).isRunning();
+ final Exception e = new Exception();
+ clientSession.onError(e);
+ Mockito.verify(sessionHandler, times(1)).isRunning();
+ Mockito.verify(requestObserver, times(1)).onCompleted();
+ Mockito.verify(sessionHandler, never()).getScheduler();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testOnCompleted() throws ClientException {
+ final Endpoints endpoints = fakeEndpoints();
+ final ClientSessionHandler sessionHandler =
Mockito.mock(ClientSessionHandler.class);
+ final StreamObserver<TelemetryCommand> requestObserver =
Mockito.mock(StreamObserver.class);
+
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+ any(StreamObserver.class));
+ Mockito.doNothing().when(requestObserver).onCompleted();
+ final ClientSessionImpl clientSession = new
ClientSessionImpl(sessionHandler, endpoints);
+ Mockito.doReturn("clientId").when(sessionHandler).clientId();
+ Mockito.doReturn(true).when(sessionHandler).isRunning();
+ Mockito.doReturn(SCHEDULER).when(sessionHandler).getScheduler();
+
Mockito.doReturn(false).when(sessionHandler).isEndpointsDeprecated(endpoints);
+ clientSession.onCompleted();
+ Mockito.verify(sessionHandler, times(1)).isRunning();
+ Mockito.verify(requestObserver, times(1)).onCompleted();
+ Mockito.verify(sessionHandler, times(1)).getScheduler();
+
await().atMost(ClientSessionImpl.REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.plus(Durations.ONE_SECOND))
+ .untilAsserted(() -> {
+ Mockito.verify(sessionHandler,
times(1)).isEndpointsDeprecated(eq(endpoints));
+ Mockito.verify(sessionHandler,
times(2)).telemetry(eq(endpoints), eq(clientSession));
+ });
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testOnCompletedWithSessionHandlerIsNotRunning() throws
ClientException {
+ final Endpoints endpoints = fakeEndpoints();
+ final ClientSessionHandler sessionHandler =
Mockito.mock(ClientSessionHandler.class);
+ final StreamObserver<TelemetryCommand> requestObserver =
Mockito.mock(StreamObserver.class);
+
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+ any(StreamObserver.class));
+ Mockito.doNothing().when(requestObserver).onCompleted();
+ final ClientSessionImpl clientSession = new
ClientSessionImpl(sessionHandler, endpoints);
+ Mockito.doReturn("clientId").when(sessionHandler).clientId();
+ Mockito.doReturn(false).when(sessionHandler).isRunning();
+ clientSession.onCompleted();
+ Mockito.verify(sessionHandler, times(1)).isRunning();
+ Mockito.verify(requestObserver, times(1)).onCompleted();
+ Mockito.verify(sessionHandler, never()).getScheduler();
+ }
+}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index e0da4a1..3b41f80 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -114,7 +114,8 @@ public class TestBase {
protected static final long FAKE_OFFSET = 1;
protected static final ScheduledExecutorService SCHEDULER =
- new ScheduledThreadPoolExecutor(1, new
ThreadFactoryImpl("TestScheduler"));
+ new
ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new
ThreadFactoryImpl(
+ "TestScheduler"));
protected static final ThreadPoolExecutor SINGLE_THREAD_POOL_EXECUTOR =
new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,