This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new a7f5742  Add ClientSessionImplTest (#183)
a7f5742 is described below

commit a7f5742385031094fb8f7d1b60889fd3133af62d
Author: Aaron Ai <[email protected]>
AuthorDate: Thu Aug 25 11:42:14 2022 +0800

    Add ClientSessionImplTest (#183)
---
 .../rocketmq/client/java/impl/ClientImpl.java      |   6 +-
 .../client/java/impl/ClientSessionImpl.java        |  39 ++-
 .../client/java/impl/ClientSessionImplTest.java    | 266 +++++++++++++++++++++
 .../apache/rocketmq/client/java/tool/TestBase.java |   3 +-
 4 files changed, 300 insertions(+), 14 deletions(-)

diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index e2807fb..23758d9 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -217,9 +217,9 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
         } else {
             LOGGER.info("Shutdown the telemetry command executor successfully, 
clientId={}", clientId);
         }
-        LOGGER.info("Begin to release telemetry sessions, clientId={}", 
clientId);
+        LOGGER.info("Begin to release all telemetry sessions, clientId={}", 
clientId);
         releaseClientSessions();
-        LOGGER.info("Release telemetry sessions successfully, clientId={}", 
clientId);
+        LOGGER.info("Release all telemetry sessions successfully, 
clientId={}", clientId);
         clientManager.stopAsync().awaitTerminated();
         clientCallbackExecutor.shutdown();
         if (!ExecutorServices.awaitTerminated(clientCallbackExecutor)) {
@@ -356,7 +356,7 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
         }
     }
 
-    public ClientSessionImpl getClientSession(Endpoints endpoints) throws 
ClientException {
+    private ClientSessionImpl getClientSession(Endpoints endpoints) throws 
ClientException {
         sessionsLock.readLock().lock();
         try {
             final ClientSessionImpl session = sessionsTable.get(endpoints);
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index f2edc70..9ccecc4 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -38,8 +38,9 @@ import org.slf4j.LoggerFactory;
  * Telemetry session is constructed before first communication between client 
and remote route endpoints.
  */
 public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
+    static final Duration REQUEST_OBSERVER_RENEW_BACKOFF_DELAY = 
Duration.ofSeconds(1);
+
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ClientSessionImpl.class);
-    private static final Duration REQUEST_OBSERVER_RENEW_BACKOFF_DELAY = 
Duration.ofSeconds(1);
     private static final long SETTINGS_INITIALIZATION_TIMEOUT_MILLIS = 3000;
 
     private final ClientSessionHandler sessionHandler;
@@ -55,19 +56,24 @@ public class ClientSessionImpl implements 
StreamObserver<TelemetryCommand> {
     }
 
     private void renewRequestObserver() {
+        final String clientId = sessionHandler.clientId();
         try {
             if (sessionHandler.isEndpointsDeprecated(endpoints)) {
-                LOGGER.info("Endpoints is deprecated, no longer to renew 
requestObserver, endpoints={}", endpoints);
+                LOGGER.info("Endpoints is deprecated, no longer to renew 
requestObserver, endpoints={}, clientId={}",
+                    endpoints, clientId);
                 return;
             }
+            LOGGER.info("Try to renew requestObserver, endpoints={}, 
clientId={}", endpoints, clientId);
             this.requestObserver = sessionHandler.telemetry(endpoints, this);
         } catch (Throwable t) {
-            LOGGER.error("Failed to renew requestObserver, attempt to renew 
later, endpoints={}, delay={}", endpoints,
-                REQUEST_OBSERVER_RENEW_BACKOFF_DELAY, t);
+            LOGGER.error("Failed to renew requestObserver, attempt to renew 
later, endpoints={}, delay={}, clientId={}",
+                endpoints, REQUEST_OBSERVER_RENEW_BACKOFF_DELAY, clientId, t);
             sessionHandler.getScheduler().schedule(this::renewRequestObserver,
                 REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.toNanos(), 
TimeUnit.NANOSECONDS);
             return;
         }
+        LOGGER.info("Sync setting to remote after requestObserver is renewed, 
endpoints={}, clientId={}", endpoints,
+            clientId);
         syncSettings0();
     }
 
@@ -85,9 +91,13 @@ public class ClientSessionImpl implements 
StreamObserver<TelemetryCommand> {
      * Release telemetry session.
      */
     public void release() {
+        final String clientId = sessionHandler.clientId();
         if (null == requestObserver) {
+            LOGGER.error("[Bug] request observer does not exist, no need to 
release, endpoints={}, clientId={}",
+                endpoints, clientId);
             return;
         }
+        LOGGER.info("Begin to release client session, endpoints={}, 
clientId={}", endpoints, clientId);
         try {
             requestObserver.onCompleted();
         } catch (Throwable ignore) {
@@ -97,8 +107,8 @@ public class ClientSessionImpl implements 
StreamObserver<TelemetryCommand> {
 
     void write(TelemetryCommand command) {
         if (null == requestObserver) {
-            LOGGER.error("Request observer does not exist, ignore current 
command, endpoints={}, command={}",
-                endpoints, command);
+            LOGGER.error("[Bug] Request observer does not exist, ignore 
current command, endpoints={}, command={}, "
+                + "clientId={}", endpoints, command, 
sessionHandler.clientId());
             return;
         }
         requestObserver.onNext(command);
@@ -151,21 +161,30 @@ public class ClientSessionImpl implements 
StreamObserver<TelemetryCommand> {
 
     @Override
     public void onError(Throwable throwable) {
-        LOGGER.error("Exception raised from stream response observer, 
clientId={}, endpoints={}",
-            sessionHandler.clientId(), endpoints, throwable);
+        final String clientId = sessionHandler.clientId();
+        LOGGER.error("Exception raised from stream response observer, 
clientId={}, endpoints={}", clientId, endpoints,
+            throwable);
         release();
         if (!sessionHandler.isRunning()) {
+            LOGGER.info("Session handler is not running, forgive to renew 
request observer, clientId={}, "
+                + "endpoints={}", clientId, endpoints);
             return;
         }
-        sessionHandler.getScheduler().schedule(this::renewRequestObserver, 3, 
TimeUnit.SECONDS);
+        sessionHandler.getScheduler().schedule(this::renewRequestObserver,
+            REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.toNanos(), 
TimeUnit.NANOSECONDS);
     }
 
     @Override
     public void onCompleted() {
+        final String clientId = sessionHandler.clientId();
+        LOGGER.info("Receive completion for stream response observer, 
clientId={}, endpoints={}", clientId, endpoints);
         release();
         if (!sessionHandler.isRunning()) {
+            LOGGER.info("Session handler is not running, forgive to renew 
request observer, clientId={}, "
+                + "endpoints={}", clientId, endpoints);
             return;
         }
-        sessionHandler.getScheduler().schedule(this::renewRequestObserver, 3, 
TimeUnit.SECONDS);
+        sessionHandler.getScheduler().schedule(this::renewRequestObserver,
+            REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.toNanos(), 
TimeUnit.NANOSECONDS);
     }
 }
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
new file mode 100644
index 0000000..b77bf3a
--- /dev/null
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientSessionImplTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+
+import apache.rocketmq.v2.PrintThreadStackTraceCommand;
+import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
+import apache.rocketmq.v2.Settings;
+import apache.rocketmq.v2.TelemetryCommand;
+import apache.rocketmq.v2.VerifyMessageCommand;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
+import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.awaitility.Durations;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class ClientSessionImplTest extends TestBase {
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void syncSettings() throws ClientException, ExecutionException, 
InterruptedException, TimeoutException {
+        final Endpoints endpoints = fakeEndpoints();
+        final ClientSessionHandler sessionHandler = 
Mockito.mock(ClientSessionHandler.class);
+        final StreamObserver<TelemetryCommand> requestObserver = 
Mockito.mock(StreamObserver.class);
+        
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+            any(StreamObserver.class));
+        final ClientSessionImpl clientSession = new 
ClientSessionImpl(sessionHandler, endpoints);
+        
Mockito.doNothing().when(requestObserver).onNext(any(TelemetryCommand.class));
+        Mockito.doReturn("clientId").when(sessionHandler).clientId();
+        
Mockito.doNothing().when(sessionHandler).onSettingsCommand(any(Endpoints.class),
 any(Settings.class));
+        final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60, 
TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>());
+        final Settings settings = Settings.newBuilder().build();
+        TelemetryCommand settingsCommand = 
TelemetryCommand.newBuilder().setSettings(settings).build();
+        executor.submit(() -> clientSession.onNext(settingsCommand));
+        clientSession.syncSettings();
+        Mockito.verify(sessionHandler, 
times(1)).onSettingsCommand(eq(endpoints), eq(settings));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testOnNextWithRecoverOrphanedTransactionCommand() throws 
ClientException {
+        final Endpoints endpoints = fakeEndpoints();
+        final ClientSessionHandler sessionHandler = 
Mockito.mock(ClientSessionHandler.class);
+        final StreamObserver<TelemetryCommand> requestObserver = 
Mockito.mock(StreamObserver.class);
+        
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+            any(StreamObserver.class));
+        final ClientSessionImpl clientSession = new 
ClientSessionImpl(sessionHandler, endpoints);
+        Mockito.doReturn("clientId").when(sessionHandler).clientId();
+        
Mockito.doNothing().when(sessionHandler).onRecoverOrphanedTransactionCommand(any(Endpoints.class),
+            any(RecoverOrphanedTransactionCommand.class));
+        RecoverOrphanedTransactionCommand command0 = 
RecoverOrphanedTransactionCommand.newBuilder().build();
+        TelemetryCommand command = TelemetryCommand.newBuilder()
+            .setRecoverOrphanedTransactionCommand(command0).build();
+        clientSession.onNext(command);
+        Mockito.verify(sessionHandler, 
times(1)).onRecoverOrphanedTransactionCommand(eq(endpoints), eq(command0));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testOnNextWithVerifyMessageCommand() throws ClientException {
+        final Endpoints endpoints = fakeEndpoints();
+        final ClientSessionHandler sessionHandler = 
Mockito.mock(ClientSessionHandler.class);
+        final StreamObserver<TelemetryCommand> requestObserver = 
Mockito.mock(StreamObserver.class);
+        
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+            any(StreamObserver.class));
+        final ClientSessionImpl clientSession = new 
ClientSessionImpl(sessionHandler, endpoints);
+        Mockito.doReturn("clientId").when(sessionHandler).clientId();
+        
Mockito.doNothing().when(sessionHandler).onVerifyMessageCommand(any(Endpoints.class),
+            any(VerifyMessageCommand.class));
+        VerifyMessageCommand command0 = 
VerifyMessageCommand.newBuilder().build();
+        TelemetryCommand command = TelemetryCommand.newBuilder()
+            .setVerifyMessageCommand(command0).build();
+        clientSession.onNext(command);
+        Mockito.verify(sessionHandler, 
times(1)).onVerifyMessageCommand(eq(endpoints), eq(command0));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testOnNextWithPrintThreadStackTraceCommand() throws 
ClientException {
+        final Endpoints endpoints = fakeEndpoints();
+        final ClientSessionHandler sessionHandler = 
Mockito.mock(ClientSessionHandler.class);
+        final StreamObserver<TelemetryCommand> requestObserver = 
Mockito.mock(StreamObserver.class);
+        
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+            any(StreamObserver.class));
+        final ClientSessionImpl clientSession = new 
ClientSessionImpl(sessionHandler, endpoints);
+        Mockito.doReturn("clientId").when(sessionHandler).clientId();
+        
Mockito.doNothing().when(sessionHandler).onPrintThreadStackTraceCommand(any(Endpoints.class),
+            any(PrintThreadStackTraceCommand.class));
+        PrintThreadStackTraceCommand command0 = 
PrintThreadStackTraceCommand.newBuilder().build();
+        TelemetryCommand command = TelemetryCommand.newBuilder()
+            .setPrintThreadStackTraceCommand(command0).build();
+        clientSession.onNext(command);
+        Mockito.verify(sessionHandler, 
times(1)).onPrintThreadStackTraceCommand(eq(endpoints), eq(command0));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testOnNextWithUnrecognizedCommand() throws ClientException {
+        final Endpoints endpoints = fakeEndpoints();
+        final ClientSessionHandler sessionHandler = 
Mockito.mock(ClientSessionHandler.class);
+        final StreamObserver<TelemetryCommand> requestObserver = 
Mockito.mock(StreamObserver.class);
+        
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+            any(StreamObserver.class));
+        final ClientSessionImpl clientSession = new 
ClientSessionImpl(sessionHandler, endpoints);
+        Mockito.doReturn("clientId").when(sessionHandler).clientId();
+        
Mockito.doNothing().when(sessionHandler).onSettingsCommand(any(Endpoints.class),
 any(Settings.class));
+        
Mockito.doNothing().when(sessionHandler).onRecoverOrphanedTransactionCommand(any(Endpoints.class),
+            any(RecoverOrphanedTransactionCommand.class));
+        
Mockito.doNothing().when(sessionHandler).onVerifyMessageCommand(any(Endpoints.class),
+            any(VerifyMessageCommand.class));
+        
Mockito.doNothing().when(sessionHandler).onPrintThreadStackTraceCommand(any(Endpoints.class),
+            any(PrintThreadStackTraceCommand.class));
+        TelemetryCommand command = TelemetryCommand.newBuilder().build();
+        clientSession.onNext(command);
+        Mockito.verify(sessionHandler, 
never()).onSettingsCommand(any(Endpoints.class), any(Settings.class));
+        Mockito.verify(sessionHandler, 
never()).onRecoverOrphanedTransactionCommand(any(Endpoints.class),
+            any(RecoverOrphanedTransactionCommand.class));
+        Mockito.verify(sessionHandler, 
never()).onVerifyMessageCommand(any(Endpoints.class),
+            any(VerifyMessageCommand.class));
+        Mockito.verify(sessionHandler, 
never()).onPrintThreadStackTraceCommand(any(Endpoints.class),
+            any(PrintThreadStackTraceCommand.class));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testOnError() throws ClientException {
+        final Endpoints endpoints = fakeEndpoints();
+        final ClientSessionHandler sessionHandler = 
Mockito.mock(ClientSessionHandler.class);
+        final StreamObserver<TelemetryCommand> requestObserver = 
Mockito.mock(StreamObserver.class);
+        
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+            any(StreamObserver.class));
+        Mockito.doNothing().when(requestObserver).onCompleted();
+        final ClientSessionImpl clientSession = new 
ClientSessionImpl(sessionHandler, endpoints);
+        Mockito.doReturn("clientId").when(sessionHandler).clientId();
+        Mockito.doReturn(true).when(sessionHandler).isRunning();
+        Mockito.doReturn(SCHEDULER).when(sessionHandler).getScheduler();
+        
Mockito.doReturn(false).when(sessionHandler).isEndpointsDeprecated(endpoints);
+        final Exception e = new Exception();
+        clientSession.onError(e);
+        Mockito.verify(sessionHandler, times(1)).isRunning();
+        Mockito.verify(requestObserver, times(1)).onCompleted();
+        Mockito.verify(sessionHandler, times(1)).getScheduler();
+        
await().atMost(ClientSessionImpl.REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.plus(Durations.ONE_SECOND))
+            .untilAsserted(() -> {
+                Mockito.verify(sessionHandler, 
times(1)).isEndpointsDeprecated(eq(endpoints));
+                Mockito.verify(sessionHandler, 
times(2)).telemetry(eq(endpoints), eq(clientSession));
+            });
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testOnErrorWithEndpointsDeprecated() throws ClientException {
+        final Endpoints endpoints = fakeEndpoints();
+        final ClientSessionHandler sessionHandler = 
Mockito.mock(ClientSessionHandler.class);
+        final StreamObserver<TelemetryCommand> requestObserver = 
Mockito.mock(StreamObserver.class);
+        
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+            any(StreamObserver.class));
+        Mockito.doNothing().when(requestObserver).onCompleted();
+        final ClientSessionImpl clientSession = new 
ClientSessionImpl(sessionHandler, endpoints);
+        Mockito.doReturn("clientId").when(sessionHandler).clientId();
+        Mockito.doReturn(true).when(sessionHandler).isRunning();
+        Mockito.doReturn(SCHEDULER).when(sessionHandler).getScheduler();
+        
Mockito.doReturn(true).when(sessionHandler).isEndpointsDeprecated(endpoints);
+        final Exception e = new Exception();
+        clientSession.onError(e);
+        Mockito.verify(sessionHandler, times(1)).isRunning();
+        Mockito.verify(requestObserver, times(1)).onCompleted();
+        Mockito.verify(sessionHandler, times(1)).getScheduler();
+        
await().atMost(ClientSessionImpl.REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.plus(Durations.ONE_SECOND))
+            .untilAsserted(() -> {
+                Mockito.verify(sessionHandler, 
times(1)).isEndpointsDeprecated(eq(endpoints));
+                Mockito.verify(sessionHandler, 
times(1)).telemetry(eq(endpoints), eq(clientSession));
+            });
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testOnErrorWithSessionHandlerIsNotRunning() throws 
ClientException {
+        final Endpoints endpoints = fakeEndpoints();
+        final ClientSessionHandler sessionHandler = 
Mockito.mock(ClientSessionHandler.class);
+        final StreamObserver<TelemetryCommand> requestObserver = 
Mockito.mock(StreamObserver.class);
+        
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+            any(StreamObserver.class));
+        Mockito.doNothing().when(requestObserver).onCompleted();
+        final ClientSessionImpl clientSession = new 
ClientSessionImpl(sessionHandler, endpoints);
+        Mockito.doReturn("clientId").when(sessionHandler).clientId();
+        Mockito.doReturn(false).when(sessionHandler).isRunning();
+        final Exception e = new Exception();
+        clientSession.onError(e);
+        Mockito.verify(sessionHandler, times(1)).isRunning();
+        Mockito.verify(requestObserver, times(1)).onCompleted();
+        Mockito.verify(sessionHandler, never()).getScheduler();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testOnCompleted() throws ClientException {
+        final Endpoints endpoints = fakeEndpoints();
+        final ClientSessionHandler sessionHandler = 
Mockito.mock(ClientSessionHandler.class);
+        final StreamObserver<TelemetryCommand> requestObserver = 
Mockito.mock(StreamObserver.class);
+        
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+            any(StreamObserver.class));
+        Mockito.doNothing().when(requestObserver).onCompleted();
+        final ClientSessionImpl clientSession = new 
ClientSessionImpl(sessionHandler, endpoints);
+        Mockito.doReturn("clientId").when(sessionHandler).clientId();
+        Mockito.doReturn(true).when(sessionHandler).isRunning();
+        Mockito.doReturn(SCHEDULER).when(sessionHandler).getScheduler();
+        
Mockito.doReturn(false).when(sessionHandler).isEndpointsDeprecated(endpoints);
+        clientSession.onCompleted();
+        Mockito.verify(sessionHandler, times(1)).isRunning();
+        Mockito.verify(requestObserver, times(1)).onCompleted();
+        Mockito.verify(sessionHandler, times(1)).getScheduler();
+        
await().atMost(ClientSessionImpl.REQUEST_OBSERVER_RENEW_BACKOFF_DELAY.plus(Durations.ONE_SECOND))
+            .untilAsserted(() -> {
+                Mockito.verify(sessionHandler, 
times(1)).isEndpointsDeprecated(eq(endpoints));
+                Mockito.verify(sessionHandler, 
times(2)).telemetry(eq(endpoints), eq(clientSession));
+            });
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testOnCompletedWithSessionHandlerIsNotRunning() throws 
ClientException {
+        final Endpoints endpoints = fakeEndpoints();
+        final ClientSessionHandler sessionHandler = 
Mockito.mock(ClientSessionHandler.class);
+        final StreamObserver<TelemetryCommand> requestObserver = 
Mockito.mock(StreamObserver.class);
+        
Mockito.doReturn(requestObserver).when(sessionHandler).telemetry(any(Endpoints.class),
+            any(StreamObserver.class));
+        Mockito.doNothing().when(requestObserver).onCompleted();
+        final ClientSessionImpl clientSession = new 
ClientSessionImpl(sessionHandler, endpoints);
+        Mockito.doReturn("clientId").when(sessionHandler).clientId();
+        Mockito.doReturn(false).when(sessionHandler).isRunning();
+        clientSession.onCompleted();
+        Mockito.verify(sessionHandler, times(1)).isRunning();
+        Mockito.verify(requestObserver, times(1)).onCompleted();
+        Mockito.verify(sessionHandler, never()).getScheduler();
+    }
+}
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index e0da4a1..3b41f80 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -114,7 +114,8 @@ public class TestBase {
     protected static final long FAKE_OFFSET = 1;
 
     protected static final ScheduledExecutorService SCHEDULER =
-        new ScheduledThreadPoolExecutor(1, new 
ThreadFactoryImpl("TestScheduler"));
+        new 
ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), new 
ThreadFactoryImpl(
+            "TestScheduler"));
 
     protected static final ThreadPoolExecutor SINGLE_THREAD_POOL_EXECUTOR =
         new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,

Reply via email to