This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit 1c342ef02d0ddd7742456c491d09ddd0d29982db Author: yukon <[email protected]> AuthorDate: Thu Jun 6 21:05:32 2019 +0800 Add unit tests for remote connection --- .../rocketmq/remoting/config/RemotingConfig.java | 19 ++ .../remoting/impl/netty/NettyRemotingClient.java | 11 +- .../remoting/impl/netty/NettyRemotingServer.java | 10 +- .../org/apache/rocketmq/remoting/BaseTest.java | 18 +- .../impl/netty/EpollRemoteConnectionTest.java | 237 +++++++++++++++++++++ .../impl/netty/NettyRemoteConnectionTest.java | 231 ++++++++++++++++++++ .../impl/netty/NettyRemotingClientTest.java | 144 +++++++++++++ 7 files changed, 664 insertions(+), 6 deletions(-) diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java index d6f636b..a64c29e 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java @@ -34,6 +34,9 @@ public abstract class RemotingConfig extends TcpSocketConfig { private int publicExecutorThreads = 4; + private int remotingShutdownQuietPeriodMillis = 2000; + private int remotingShutdownTimeoutMillis = 15000; + public abstract int getOnewayInvokeSemaphore(); public abstract int getAsyncInvokeSemaphore(); @@ -93,4 +96,20 @@ public abstract class RemotingConfig extends TcpSocketConfig { public void setPublicExecutorThreads(final int publicExecutorThreads) { this.publicExecutorThreads = publicExecutorThreads; } + + public int getRemotingShutdownQuietPeriodMillis() { + return remotingShutdownQuietPeriodMillis; + } + + public void setRemotingShutdownQuietPeriodMillis(final int remotingShutdownQuietPeriodMillis) { + this.remotingShutdownQuietPeriodMillis = remotingShutdownQuietPeriodMillis; + } + + public int getRemotingShutdownTimeoutMillis() { + return remotingShutdownTimeoutMillis; + } + + public void setRemotingShutdownTimeoutMillis(final int remotingShutdownTimeoutMillis) { + this.remotingShutdownTimeoutMillis = remotingShutdownTimeoutMillis; + } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java index c146813..9f0fd1c 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java @@ -37,6 +37,7 @@ import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; import java.net.SocketAddress; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.AsyncHandler; import org.apache.rocketmq.remoting.api.RemotingClient; import org.apache.rocketmq.remoting.api.command.RemotingCommand; @@ -105,9 +106,11 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti try { clientChannelManager.clear(); - this.ioGroup.shutdownGracefully(); + this.ioGroup.shutdownGracefully(clientConfig.getRemotingShutdownQuietPeriodMillis(), + clientConfig.getRemotingShutdownTimeoutMillis(), TimeUnit.MILLISECONDS).sync(); - this.workerGroup.shutdownGracefully(); + this.workerGroup.shutdownGracefully(clientConfig.getRemotingShutdownQuietPeriodMillis(), + clientConfig.getRemotingShutdownTimeoutMillis(), TimeUnit.MILLISECONDS).sync(); } catch (Exception e) { LOG.warn("RemotingClient stopped error !", e); } @@ -183,6 +186,10 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } + public void setClientChannelManager(final ClientChannelManager clientChannelManager) { + this.clientChannelManager = clientChannelManager; + } + private class ClientConnectionHandler extends ChannelDuplexHandler { @Override diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java index 0967208..e83cd1d 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java @@ -39,6 +39,7 @@ import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.AsyncHandler; import org.apache.rocketmq.remoting.api.RemotingServer; import org.apache.rocketmq.remoting.api.channel.RemotingChannel; @@ -119,11 +120,14 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti @Override public void stop() { try { - this.bossGroup.shutdownGracefully().syncUninterruptibly(); + this.bossGroup.shutdownGracefully(serverConfig.getRemotingShutdownQuietPeriodMillis(), + serverConfig.getRemotingShutdownTimeoutMillis(), TimeUnit.MILLISECONDS).sync(); - this.ioGroup.shutdownGracefully().syncUninterruptibly(); + this.ioGroup.shutdownGracefully(serverConfig.getRemotingShutdownQuietPeriodMillis(), + serverConfig.getRemotingShutdownTimeoutMillis(), TimeUnit.MILLISECONDS).sync(); - this.workerGroup.shutdownGracefully().syncUninterruptibly(); + this.workerGroup.shutdownGracefully(serverConfig.getRemotingShutdownQuietPeriodMillis(), + serverConfig.getRemotingShutdownTimeoutMillis(), TimeUnit.MILLISECONDS).sync(); } catch (Exception e) { LOG.warn("RemotingServer stopped error !", e); } diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java index 47fd4bb..14e8c21 100644 --- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java @@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.RandomStringUtils; import org.apache.rocketmq.remoting.api.command.RemotingCommand; import org.apache.rocketmq.remoting.api.command.TrafficType; +import org.apache.rocketmq.remoting.config.RemotingClientConfig; +import org.apache.rocketmq.remoting.config.RemotingServerConfig; import org.apache.rocketmq.remoting.external.ThreadUtils; import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl; import org.assertj.core.api.Fail; @@ -82,7 +84,7 @@ public class BaseTest { } protected void shouldNotReachHere() { - throw new RuntimeException("shouldn't reach here"); + Fail.fail("Shouldn't reach here"); } protected RemotingCommand randomRemotingCommand() { @@ -137,6 +139,20 @@ public class BaseTest { return objectFuture; } + protected RemotingClientConfig clientConfig() { + RemotingClientConfig clientConfig = new RemotingClientConfig(); + clientConfig.setRemotingShutdownQuietPeriodMillis(0); + clientConfig.setRemotingShutdownTimeoutMillis(10); + return clientConfig; + } + + protected RemotingServerConfig serverConfig() { + RemotingServerConfig serverConfig = new RemotingServerConfig(); + serverConfig.setRemotingShutdownQuietPeriodMillis(0); + serverConfig.setRemotingShutdownTimeoutMillis(10); + return serverConfig; + } + protected class ObjectFuture<T> { volatile private T object; private Semaphore semaphore; diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/EpollRemoteConnectionTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/EpollRemoteConnectionTest.java new file mode 100644 index 0000000..cdf1d18 --- /dev/null +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/EpollRemoteConnectionTest.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.netty; + +import org.apache.rocketmq.remoting.BaseTest; +import org.apache.rocketmq.remoting.api.AsyncHandler; +import org.apache.rocketmq.remoting.api.RemotingClient; +import org.apache.rocketmq.remoting.api.RemotingServer; +import org.apache.rocketmq.remoting.api.RequestProcessor; +import org.apache.rocketmq.remoting.api.channel.RemotingChannel; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.remoting.api.interceptor.Interceptor; +import org.apache.rocketmq.remoting.api.interceptor.RequestContext; +import org.apache.rocketmq.remoting.api.interceptor.ResponseContext; +import org.apache.rocketmq.remoting.config.RemotingClientConfig; +import org.apache.rocketmq.remoting.config.RemotingServerConfig; +import org.apache.rocketmq.remoting.internal.JvmUtils; +import org.junit.AfterClass; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class EpollRemoteConnectionTest extends BaseTest { + private static RemotingServer remotingServer; + private static RemotingClient remotingClient; + + private static RemotingServer remotingEpollServer; + private static RemotingClient remotingEpollClient; + + private static short requestCode = 123; + private RemotingCommand request; + + private static String remoteAddr; + private static String remoteEpollAddr; + + + @Before + public void enableOnLinux() throws Exception { + Assume.assumeTrue(JvmUtils.isLinux()); + } + + @BeforeClass + public static void setUp() throws Exception { + RemotingClientConfig clientConfig = new RemotingClientConfig(); + clientConfig.setRemotingShutdownQuietPeriodMillis(0); + clientConfig.setRemotingShutdownTimeoutMillis(10); + + RemotingServerConfig serverConfig = new RemotingServerConfig(); + serverConfig.setRemotingShutdownQuietPeriodMillis(0); + serverConfig.setRemotingShutdownTimeoutMillis(10); + + RemotingClientConfig epollClientConfig = new RemotingClientConfig(); + epollClientConfig.setClientNativeEpollEnable(true); + epollClientConfig.setRemotingShutdownQuietPeriodMillis(0); + epollClientConfig.setRemotingShutdownTimeoutMillis(10); + + RemotingServerConfig epollServerConfig = new RemotingServerConfig(); + epollServerConfig.setServerNativeEpollEnable(true); + epollServerConfig.setServerListenPort(9999); + epollServerConfig.setRemotingShutdownQuietPeriodMillis(0); + epollServerConfig.setRemotingShutdownTimeoutMillis(10); + + remotingClient = new NettyRemotingClient(clientConfig); + remotingServer = new NettyRemotingServer(serverConfig); + + remotingEpollServer = new NettyRemotingServer(epollServerConfig); + remotingEpollClient = new NettyRemotingClient(epollClientConfig); + + remotingServer.registerRequestProcessor(requestCode, new RequestProcessor() { + @Override + public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) { + RemotingCommand response = remotingServer.commandFactory().createResponse(request); + response.payload("Pong".getBytes()); + return response; + } + }); + + remotingEpollServer.registerRequestProcessor(requestCode, new RequestProcessor() { + @Override + public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) { + RemotingCommand response = remotingServer.commandFactory().createResponse(request); + response.payload("Pong".getBytes()); + return response; + } + }); + + remotingServer.start(); + remotingClient.start(); + remotingEpollClient.start(); + remotingEpollServer.start(); + + remoteAddr = "127.0.0.1:" + remotingServer.localListenPort(); + remoteEpollAddr = "127.0.0.1:" + remotingEpollServer.localListenPort(); + } + + @AfterClass + public static void tearDown() throws Exception { + remotingClient.stop(); + remotingServer.stop(); + remotingEpollServer.stop(); + remotingEpollClient.stop(); + } + + public RemotingCommand requestCommand() { + request = remotingClient.commandFactory().createRequest(); + request.cmdCode(requestCode); + return request; + } + + @Test + public void invokeToServer_Success() { + // Client to epoll server + RemotingCommand rsp = remotingClient.invoke(remoteEpollAddr, requestCommand(), 3000); + assertThat(new String(rsp.payload())).isEqualTo("Pong"); + + // Epoll client to server + rsp = remotingEpollClient.invoke(remoteAddr, requestCommand(), 3000); + assertThat(new String(rsp.payload())).isEqualTo("Pong"); + + // Epoll client to epoll server + rsp = remotingEpollClient.invoke(remoteEpollAddr, requestCommand(), 3000); + assertThat(new String(rsp.payload())).isEqualTo("Pong"); + } + + @Test + public void invokeAsyncToServer_Success() { + // Client to epoll server + + final ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 1000); + + remotingClient.invokeAsync(remoteEpollAddr, requestCommand(), new AsyncHandler() { + @Override + public void onFailure(final RemotingCommand request, final Throwable cause) { + + } + + @Override + public void onSuccess(final RemotingCommand response) { + objectFuture.putObject(response); + objectFuture.release(); + } + }, 3000); + + assertThat(new String(objectFuture.getObject().payload())).isEqualTo("Pong"); + + // Epoll client to server + remotingEpollClient.invokeAsync(remoteAddr, requestCommand(), new AsyncHandler() { + @Override + public void onFailure(final RemotingCommand request, final Throwable cause) { + + } + + @Override + public void onSuccess(final RemotingCommand response) { + objectFuture.putObject(response); + objectFuture.release(); + } + }, 3000); + + assertThat(new String(objectFuture.getObject().payload())).isEqualTo("Pong"); + + // Epoll client to epoll server + remotingEpollClient.invokeAsync(remoteEpollAddr, requestCommand(), new AsyncHandler() { + @Override + public void onFailure(final RemotingCommand request, final Throwable cause) { + + } + + @Override + public void onSuccess(final RemotingCommand response) { + objectFuture.putObject(response); + objectFuture.release(); + } + }, 3000); + + assertThat(new String(objectFuture.getObject().payload())).isEqualTo("Pong"); + } + + @Test + public void invokeOnewayToServer_Success() { + final ObjectFuture<RemotingCommand> requestFuture = newObjectFuture(1, 1000); + final ObjectFuture<RemotingCommand> responseFuture = newObjectFuture(1, 1000); + + Interceptor interceptor = new Interceptor() { + @Override + public void beforeRequest(final RequestContext context) { + requestFuture.putObject(context.getRequest()); + requestFuture.release(); + } + + @Override + public void afterResponseReceived(final ResponseContext context) { + responseFuture.putObject(context.getResponse()); + responseFuture.release(); + } + }; + + remotingServer.registerInterceptor(interceptor); + remotingEpollServer.registerInterceptor(interceptor); + + // Client to epoll server + remotingClient.invokeOneWay(remoteEpollAddr, request); + + assertThat(requestFuture.getObject()).isEqualTo(request); + assertThat(new String(responseFuture.getObject().payload())).isEqualTo("Pong"); + + // Epoll client to server + remotingEpollClient.invokeOneWay(remoteAddr, request); + + assertThat(requestFuture.getObject()).isEqualTo(request); + assertThat(new String(responseFuture.getObject().payload())).isEqualTo("Pong"); + + // Epoll client to epoll server + remotingEpollClient.invokeOneWay(remoteEpollAddr, request); + + assertThat(requestFuture.getObject()).isEqualTo(request); + assertThat(new String(responseFuture.getObject().payload())).isEqualTo("Pong"); + } +} diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemoteConnectionTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemoteConnectionTest.java new file mode 100644 index 0000000..d12a5a3 --- /dev/null +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemoteConnectionTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.netty; + +import org.apache.rocketmq.remoting.BaseTest; +import org.apache.rocketmq.remoting.api.AsyncHandler; +import org.apache.rocketmq.remoting.api.RemotingClient; +import org.apache.rocketmq.remoting.api.RemotingServer; +import org.apache.rocketmq.remoting.api.RequestProcessor; +import org.apache.rocketmq.remoting.api.channel.ChannelEventListener; +import org.apache.rocketmq.remoting.api.channel.RemotingChannel; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.remoting.api.interceptor.Interceptor; +import org.apache.rocketmq.remoting.api.interceptor.RequestContext; +import org.apache.rocketmq.remoting.api.interceptor.ResponseContext; +import org.apache.rocketmq.remoting.config.RemotingClientConfig; +import org.apache.rocketmq.remoting.config.RemotingServerConfig; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class NettyRemoteConnectionTest extends BaseTest { + private static RemotingServer remotingServer; + private static RemotingClient remotingClient; + + private static short requestCode = 123; + private RemotingCommand request; + + private static String remoteAddr; + private static RemotingChannel channelInServer; + + @BeforeClass + public static void setUp() throws Exception { + RemotingClientConfig clientConfig = new RemotingClientConfig(); + clientConfig.setRemotingShutdownQuietPeriodMillis(0); + clientConfig.setRemotingShutdownTimeoutMillis(10); + + RemotingServerConfig serverConfig = new RemotingServerConfig(); + serverConfig.setRemotingShutdownQuietPeriodMillis(0); + serverConfig.setRemotingShutdownTimeoutMillis(10); + + remotingClient = new NettyRemotingClient(clientConfig); + remotingServer = new NettyRemotingServer(serverConfig); + + remotingServer.registerRequestProcessor(requestCode, new RequestProcessor() { + @Override + public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) { + RemotingCommand response = remotingServer.commandFactory().createResponse(request); + response.payload("Pong".getBytes()); + return response; + } + }); + + remotingClient.registerRequestProcessor(requestCode, new RequestProcessor() { + @Override + public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) { + RemotingCommand response = remotingServer.commandFactory().createResponse(request); + response.payload("ClientPong".getBytes()); + return response; + } + }); + + remotingServer.registerChannelEventListener(new ChannelEventListener() { + @Override + public void onChannelConnect(final RemotingChannel channel) { + channelInServer = channel; + } + + @Override + public void onChannelClose(final RemotingChannel channel) { + channelInServer = null; + } + + @Override + public void onChannelException(final RemotingChannel channel, final Throwable cause) { + channelInServer = null; + } + + @Override + public void onChannelIdle(final RemotingChannel channel) { + channelInServer = null; + } + }); + + remotingServer.start(); + remotingClient.start(); + + remoteAddr = "127.0.0.1:" + remotingServer.localListenPort(); + } + + @AfterClass + public static void tearDown() throws Exception { + remotingClient.stop(); + remotingServer.stop(); + } + + @Before + public void setUp0() throws Exception { + request = remotingClient.commandFactory().createRequest(); + request.cmdCode(requestCode); + + if (channelInServer == null) { + RemotingCommand rsp = remotingClient.invoke(remoteAddr, request, 3000); + assertThat(new String(rsp.payload())).isEqualTo("Pong"); + + // Refresh the command + request = remotingClient.commandFactory().createRequest(); + request.cmdCode(requestCode); + } + } + + @Test + public void invokeToServer_Success() { + RemotingCommand rsp = remotingClient.invoke(remoteAddr, request, 3000); + assertThat(new String(rsp.payload())).isEqualTo("Pong"); + } + + @Test + public void invokeAsyncToServer_Success() { + final ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 1000); + + remotingClient.invokeAsync(remoteAddr, request, new AsyncHandler() { + @Override + public void onFailure(final RemotingCommand request, final Throwable cause) { + + } + + @Override + public void onSuccess(final RemotingCommand response) { + objectFuture.putObject(response); + objectFuture.release(); + } + }, 3000); + + assertThat(new String(objectFuture.getObject().payload())).isEqualTo("Pong"); + } + + @Test + public void invokeOnewayToServer_Success() { + final ObjectFuture<RemotingCommand> requestFuture = newObjectFuture(1, 1000); + final ObjectFuture<RemotingCommand> responseFuture = newObjectFuture(1, 1000); + + remotingServer.registerInterceptor(new Interceptor() { + @Override + public void beforeRequest(final RequestContext context) { + requestFuture.putObject(context.getRequest()); + requestFuture.release(); + } + + @Override + public void afterResponseReceived(final ResponseContext context) { + responseFuture.putObject(context.getResponse()); + responseFuture.release(); + } + }); + + remotingClient.invokeOneWay(remoteAddr, request); + + assertThat(requestFuture.getObject()).isEqualTo(request); + assertThat(new String(responseFuture.getObject().payload())).isEqualTo("Pong"); + } + + @Test + public void invokeToClient_Success() { + RemotingCommand rsp = remotingServer.invoke(channelInServer, request, 3000); + assertThat(new String(rsp.payload())).isEqualTo("ClientPong"); + } + + @Test + public void invokeAsyncToClient_Success() { + final ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 1000); + + remotingServer.invokeAsync(channelInServer, request, new AsyncHandler() { + @Override + public void onFailure(final RemotingCommand request, final Throwable cause) { + + } + + @Override + public void onSuccess(final RemotingCommand response) { + objectFuture.putObject(response); + objectFuture.release(); + } + }, 3000); + + assertThat(new String(objectFuture.getObject().payload())).isEqualTo("ClientPong"); + } + + @Test + public void invokeOnewayToClient_Success() { + final ObjectFuture<RemotingCommand> requestFuture = newObjectFuture(1, 1000); + final ObjectFuture<RemotingCommand> responseFuture = newObjectFuture(1, 1000); + + remotingClient.registerInterceptor(new Interceptor() { + @Override + public void beforeRequest(final RequestContext context) { + requestFuture.putObject(context.getRequest()); + requestFuture.release(); + } + + @Override + public void afterResponseReceived(final ResponseContext context) { + responseFuture.putObject(context.getResponse()); + responseFuture.release(); + } + }); + + remotingServer.invokeOneWay(channelInServer, request); + + assertThat(requestFuture.getObject()).isEqualTo(request); + assertThat(new String(responseFuture.getObject().payload())).isEqualTo("ClientPong"); + } +} diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClientTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClientTest.java new file mode 100644 index 0000000..aee21fb --- /dev/null +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClientTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.netty; + +import io.netty.channel.Channel; +import org.apache.rocketmq.remoting.BaseTest; +import org.apache.rocketmq.remoting.api.AsyncHandler; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.remoting.api.exception.RemoteConnectFailureException; +import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class NettyRemotingClientTest extends BaseTest { + @Spy + private NettyRemotingClient remotingClient = new NettyRemotingClient(clientConfig()); + + @Mock + private ClientChannelManager channelManager; + + @Mock + private Channel mockedChannel; + + @Before + public void setUp() { + remotingClient.start(); + remotingClient.setClientChannelManager(channelManager); + when(channelManager.createIfAbsent(any(String.class))).thenReturn(mockedChannel); + when(mockedChannel.isActive()).thenReturn(true); + } + + @After + public void tearDown() { + remotingClient.stop(); + } + + @Test + public void invoke_Success() { + RemotingCommand request = remotingClient.commandFactory().createRequest(); + final RemotingCommand response = remotingClient.commandFactory().createResponse(request); + + doAnswer(new Answer() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + return response; + } + }).when(remotingClient).invokeWithInterceptor(mockedChannel, request, 3000); + + RemotingCommand returnedResp = remotingClient.invoke("127.0.0.1:10911", request, 3000); + + assertThat(response).isEqualTo(returnedResp); + } + + @Test + public void invoke_ConnectFailureException() { + RemotingCommand request = remotingClient.commandFactory().createRequest(); + when(mockedChannel.isActive()).thenReturn(false); + + try { + RemotingCommand returnedResp = remotingClient.invoke("127.0.0.1:10911", request, 3000); + failBecauseExceptionWasNotThrown(RemoteConnectFailureException.class); + } catch (Exception e) { + assertThat(e).isInstanceOf(RemoteConnectFailureException.class); + } + } + + @Test + public void invoke_TimeoutException() { + RemotingCommand request = remotingClient.commandFactory().createRequest(); + + doThrow(new RemoteTimeoutException("Timeout exception occurred")).when(remotingClient).invokeWithInterceptor(mockedChannel, request, 3000); + + try { + RemotingCommand returnedResp = remotingClient.invoke("127.0.0.1:10911", request, 3000); + failBecauseExceptionWasNotThrown(RemoteTimeoutException.class); + } catch (Exception e) { + assertThat(e).isInstanceOf(RemoteTimeoutException.class); + } + } + + @Test + public void invokeAsync_Success() { + RemotingCommand request = remotingClient.commandFactory().createRequest(); + final RemotingCommand response = remotingClient.commandFactory().createResponse(request); + + final ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 100); + + final AsyncHandler asyncHandler = new AsyncHandler() { + @Override + public void onFailure(final RemotingCommand request, final Throwable cause) { + + } + + @Override + public void onSuccess(final RemotingCommand response) { + objectFuture.putObject(response); + objectFuture.release(); + } + }; + + doAnswer(new Answer() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + asyncHandler.onSuccess(response); + return null; + } + }).when(remotingClient).invokeAsyncWithInterceptor(mockedChannel, request, asyncHandler,3000); + + remotingClient.invokeAsync("127.0.0.1:10911", request, asyncHandler, 3000); + + assertThat(objectFuture.getObject()).isEqualTo(response); + + } +} \ No newline at end of file
