This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new ca42bb6 Add more unit test for ClientImpl (#191)
ca42bb6 is described below
commit ca42bb67175f48396fe3af96bc60f2c7fa0de34d
Author: Aaron Ai <[email protected]>
AuthorDate: Sat Aug 27 19:31:36 2022 +0800
Add more unit test for ClientImpl (#191)
---
.../rocketmq/client/java/impl/ClientImplTest.java | 147 +++++++++++++++++++++
.../client/java/impl/ClientManagerImplTest.java | 5 +-
2 files changed, 151 insertions(+), 1 deletion(-)
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientImplTest.java
new file mode 100644
index 0000000..324166a
--- /dev/null
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientImplTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import apache.rocketmq.v2.Broker;
+import apache.rocketmq.v2.HeartbeatRequest;
+import apache.rocketmq.v2.MessageQueue;
+import apache.rocketmq.v2.MessageType;
+import apache.rocketmq.v2.NotifyClientTerminationRequest;
+import apache.rocketmq.v2.Permission;
+import apache.rocketmq.v2.PrintThreadStackTraceCommand;
+import apache.rocketmq.v2.Resource;
+import apache.rocketmq.v2.TelemetryCommand;
+import apache.rocketmq.v2.VerifyMessageCommand;
+import io.grpc.stub.StreamObserver;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.route.TopicRouteData;
+import org.apache.rocketmq.client.java.tool.TestBase;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for the telemetry-related behaviour of {@link ClientImpl}.
+ *
+ * <p>Every test works on a Mockito spy of a minimal {@link ClientImpl} subclass and stubs
+ * {@code telemetry(Endpoints, StreamObserver)} so that it hands back a mocked request observer
+ * instead of opening a real stream.
+ */
+@SuppressWarnings("unchecked")
+public class ClientImplTest extends TestBase {
+    /** Client configuration shared by all tests; it only sets the fake endpoints from {@link TestBase}. */
+    private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+        .setEndpoints(FAKE_ENDPOINTS).build();
+
+    /**
+     * Creates a spied {@link ClientImpl} with an empty topic set.
+     *
+     * <p>The anonymous subclass overrides {@code getSettings}, {@code wrapNotifyClientTerminationRequest}
+     * and {@code wrapHeartbeatRequest} to return {@code null}; the tests below never rely on their results.
+     *
+     * @return a Mockito spy wrapping the stub client, so individual methods can be stubbed and verified.
+     */
+    private ClientImpl createClient() {
+        return Mockito.spy(new ClientImpl(clientConfiguration, new HashSet<>()) {
+            @Override
+            public Settings getSettings() {
+                return null;
+            }
+
+            @Override
+            public NotifyClientTerminationRequest wrapNotifyClientTerminationRequest() {
+                return null;
+            }
+
+            @Override
+            public HeartbeatRequest wrapHeartbeatRequest() {
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Sending a telemetry command must obtain the request observer exactly once and push exactly that
+     * command onto it.
+     */
+    @Test
+    public void testTelemetry() throws ClientException {
+        final Endpoints endpoints = fakeEndpoints();
+        TelemetryCommand command = TelemetryCommand.newBuilder().build();
+        final StreamObserver<TelemetryCommand> observer =
+            (StreamObserver<TelemetryCommand>) Mockito.mock(StreamObserver.class);
+        final ClientImpl client = createClient();
+        doReturn(observer).when(client).telemetry(any(Endpoints.class), any(StreamObserver.class));
+        doNothing().when(observer).onNext(any(TelemetryCommand.class));
+        client.telemetry(endpoints, command);
+        verify(client, times(1)).telemetry(any(Endpoints.class), any(StreamObserver.class));
+        verify(observer, times(1)).onNext(eq(command));
+    }
+
+    /**
+     * Handling a print-thread-stack-trace command must result in one telemetry command being written
+     * to the request observer.
+     */
+    @Test
+    public void testOnPrintThreadStackTraceCommand() throws ClientException {
+        PrintThreadStackTraceCommand command = PrintThreadStackTraceCommand.newBuilder().build();
+        final Endpoints endpoints = fakeEndpoints();
+        final StreamObserver<TelemetryCommand> observer =
+            (StreamObserver<TelemetryCommand>) Mockito.mock(StreamObserver.class);
+        final ClientImpl client = createClient();
+        doReturn(observer).when(client).telemetry(any(Endpoints.class), any(StreamObserver.class));
+        doNothing().when(observer).onNext(any(TelemetryCommand.class));
+        client.onPrintThreadStackTraceCommand(endpoints, command);
+        // The verifications are polled for up to one second rather than checked immediately;
+        // presumably the reply is produced asynchronously - confirm against ClientImpl.
+        await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> {
+            verify(client, times(1)).telemetry(any(Endpoints.class), any(StreamObserver.class));
+            verify(observer, times(1)).onNext(any(TelemetryCommand.class));
+        });
+    }
+
+    /**
+     * Handling a verify-message command must result in one telemetry command being written to the
+     * request observer.
+     */
+    @Test
+    public void testOnVerifyMessageCommand() throws ClientException {
+        VerifyMessageCommand command = VerifyMessageCommand.newBuilder().build();
+        final Endpoints endpoints = fakeEndpoints();
+        final StreamObserver<TelemetryCommand> observer =
+            (StreamObserver<TelemetryCommand>) Mockito.mock(StreamObserver.class);
+        final ClientImpl client = createClient();
+        doReturn(observer).when(client).telemetry(any(Endpoints.class), any(StreamObserver.class));
+        doNothing().when(observer).onNext(any(TelemetryCommand.class));
+        client.onVerifyMessageCommand(endpoints, command);
+        verify(client, times(1)).telemetry(any(Endpoints.class), any(StreamObserver.class));
+        verify(observer, times(1)).onNext(any(TelemetryCommand.class));
+    }
+
+    /**
+     * {@code onTopicRouteDataFetched} is expected to fail for the stubbed client, but only after it has
+     * obtained the request observer once and written one telemetry command to it.
+     */
+    @Test
+    public void testOnTopicRouteDataFetchedFailure() throws ClientException {
+        String topic = FAKE_TOPIC_0;
+        List<MessageQueue> messageQueueList = new ArrayList<>();
+        final apache.rocketmq.v2.Endpoints pbEndpoints = fakePbEndpoints0();
+        MessageQueue mq = MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(topic))
+            .setPermission(Permission.READ_WRITE)
+            .addAcceptMessageTypes(MessageType.NORMAL)
+            .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(pbEndpoints))
+            .setId(0).build();
+        messageQueueList.add(mq);
+        final TopicRouteData topicRouteData = new TopicRouteData(messageQueueList);
+        final StreamObserver<TelemetryCommand> observer =
+            (StreamObserver<TelemetryCommand>) Mockito.mock(StreamObserver.class);
+        final ClientImpl client = createClient();
+        doReturn(observer).when(client).telemetry(any(Endpoints.class), any(StreamObserver.class));
+        doNothing().when(observer).onNext(any(TelemetryCommand.class));
+        TelemetryCommand settingsCommand = TelemetryCommand.newBuilder().build();
+        doReturn(settingsCommand).when(client).settingsCommand();
+        // Record the outcome instead of calling fail() inside the try block: fail() throws an
+        // AssertionError, which a catch (Throwable) handler would silently swallow, letting the
+        // test pass even when no failure occurred.
+        boolean thrown = false;
+        try {
+            client.onTopicRouteDataFetched(topic, topicRouteData);
+        } catch (Throwable t) {
+            thrown = true;
+        }
+        if (!thrown) {
+            fail("onTopicRouteDataFetched was expected to throw");
+        }
+        verify(client, times(1)).telemetry(any(Endpoints.class), any(StreamObserver.class));
+        verify(observer, times(1)).onNext(any(TelemetryCommand.class));
+    }
+}
\ No newline at end of file
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
index cb348ab..46d8233 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
@@ -27,6 +27,7 @@ import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.SendMessageRequest;
+import io.grpc.Metadata;
import java.time.Duration;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.apache.rocketmq.client.java.tool.TestBase;
@@ -39,8 +40,10 @@ public class ClientManagerImplTest extends TestBase {
private static ClientManagerImpl CLIENT_MANAGER;
@BeforeClass
- public static void setUp() {
+ public static void setUp() throws Exception {
Client client = Mockito.mock(Client.class);
+ final Metadata metadata = new Metadata();
+ Mockito.doReturn(metadata).when(client).sign();
final ClientId clientId = new ClientId();
Mockito.doReturn(clientId).when(client).getClientId();
CLIENT_MANAGER = new ClientManagerImpl(client);