This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit ea053bfbdee5431f7ff7b48adbe9f9e0aa6970c4 Author: yukon <[email protected]> AuthorDate: Thu Jun 6 16:41:28 2019 +0800 Finish the NettyRemotingAbstractTest --- .../api/exception/SemaphoreExhaustedException.java | 44 ++++ .../remoting/impl/netty/NettyRemotingAbstract.java | 14 +- .../org/apache/rocketmq/remoting/BaseTest.java | 46 +++- .../impl/netty/NettyRemotingAbstractTest.java | 261 ++++++++++++++++++++- 4 files changed, 347 insertions(+), 18 deletions(-) diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/SemaphoreExhaustedException.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/SemaphoreExhaustedException.java new file mode 100644 index 0000000..a4e6d16 --- /dev/null +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/SemaphoreExhaustedException.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.api.exception; + +public class SemaphoreExhaustedException extends RemoteRuntimeException { + private static final long serialVersionUID = 6280428909532427263L; + + /** + * Constructor for SemaphoreExhaustedException with the specified detail message. + * + * @param msg the detail message + */ + public SemaphoreExhaustedException(String msg) { + super(msg); + } + + /** + * Constructor for SemaphoreExhaustedException with the specified detail message + * and nested exception. + * + * @param msg the detail message + * @param cause the root cause (usually from using an underlying + * remoting API such as RMI) + */ + public SemaphoreExhaustedException(String msg, Throwable cause) { + super(msg, cause); + } + +} \ No newline at end of file diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index de4fc81..62d0d0c 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -45,6 +45,7 @@ import org.apache.rocketmq.remoting.api.command.TrafficType; import org.apache.rocketmq.remoting.api.exception.RemoteAccessException; import org.apache.rocketmq.remoting.api.exception.RemoteRuntimeException; import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException; +import org.apache.rocketmq.remoting.api.exception.SemaphoreExhaustedException; import org.apache.rocketmq.remoting.api.interceptor.Interceptor; import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.api.interceptor.RequestContext; @@ -372,12 +373,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request)); - RemotingCommand responseCommand = this.invoke0(remoteAddr, channel, request, timeoutMillis); - - this.interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST, - RemotingUtil.extractRemoteAddress(channel), request, responseCommand)); - - return responseCommand; + return this.invoke0(remoteAddr, channel, request, timeoutMillis); } private RemotingCommand invoke0(final String remoteAddr, final Channel channel, final RemotingCommand request, @@ -432,14 +428,14 @@ public abstract class NettyRemotingAbstract implements RemotingService { } public void invokeAsyncWithInterceptor(final Channel channel, final RemotingCommand request, - final AsyncHandler invokeCallback, long timeoutMillis) { + final AsyncHandler asyncHandler, long timeoutMillis) { request.trafficType(TrafficType.REQUEST_ASYNC); final String remoteAddr = RemotingUtil.extractRemoteAddress(channel); this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request)); - this.invokeAsync0(remoteAddr, channel, request, invokeCallback, timeoutMillis); + this.invokeAsync0(remoteAddr, channel, request, asyncHandler, timeoutMillis); } private void invokeAsync0(final String remoteAddr, final Channel channel, final RemotingCommand request, @@ -476,7 +472,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { } } else { String info = String.format("No available async semaphore to issue the request request %s", request.toString()); - requestFail(new ResponseFuture(request.requestID(), timeoutMillis, asyncHandler, null), new RemoteAccessException(info)); + requestFail(new ResponseFuture(request.requestID(), timeoutMillis, asyncHandler, null), new SemaphoreExhaustedException(info)); LOG.error(info); } } diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java index 6a3112b..47fd4bb 100644 --- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.remoting; +import java.io.PrintStream; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -28,8 +29,26 @@ import org.apache.rocketmq.remoting.api.command.TrafficType; import org.apache.rocketmq.remoting.external.ThreadUtils; import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl; import org.assertj.core.api.Fail; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; + +@RunWith(MockitoJUnitRunner.class) public class BaseTest { + @Mock + protected PrintStream fakeOut; + + @Test + public void emptyTest() { + + } + protected void scheduleInThreads(final Runnable runnable, int periodMillis) { final ScheduledExecutorService executor = ThreadUtils.newSingleThreadScheduledExecutor("UnitTests", true); executor.scheduleAtFixedRate(runnable, 0, periodMillis, TimeUnit.MILLISECONDS); @@ -93,8 +112,33 @@ public class BaseTest { return new ObjectFuture<>(permits, timeoutMillis); } + protected ObjectFuture<String> retrieveStringFromLog(final String targetString) { + final ObjectFuture<String> objectFuture = newObjectFuture(1, 3000); + final PrintStream originStream = System.err; + System.setErr(fakeOut); + + doAnswer(new Answer() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + Object[] arguments = invocation.getArguments(); + + String str = arguments[0].toString(); + + if (str.contains(targetString)) { + System.setErr(originStream); + objectFuture.putObject(str); + objectFuture.release(); + } + + return null; + } + }).when(fakeOut).println(any(String.class)); + + return objectFuture; + } + protected class ObjectFuture<T> { - private T object; + volatile private T object; private Semaphore semaphore; private int permits; private int timeoutMillis; diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstractTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstractTest.java index 8303ae9..c9332b4 100644 --- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstractTest.java +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstractTest.java @@ -25,18 +25,26 @@ import io.netty.channel.DefaultChannelPromise; import io.netty.channel.DefaultEventLoop; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.embedded.EmbeddedChannel; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.BaseTest; import org.apache.rocketmq.remoting.api.AsyncHandler; +import org.apache.rocketmq.remoting.api.RemotingEndPoint; import org.apache.rocketmq.remoting.api.RequestProcessor; import org.apache.rocketmq.remoting.api.channel.ChannelEventListener; import org.apache.rocketmq.remoting.api.channel.RemotingChannel; import org.apache.rocketmq.remoting.api.command.RemotingCommand; import org.apache.rocketmq.remoting.api.exception.RemoteAccessException; import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException; -import org.apache.rocketmq.remoting.config.RemotingClientConfig; +import org.apache.rocketmq.remoting.api.exception.SemaphoreExhaustedException; +import org.apache.rocketmq.remoting.api.interceptor.Interceptor; +import org.apache.rocketmq.remoting.api.interceptor.RequestContext; +import org.apache.rocketmq.remoting.api.interceptor.ResponseContext; +import org.apache.rocketmq.remoting.config.RemotingConfig; +import org.apache.rocketmq.remoting.external.ThreadUtils; import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl; +import org.apache.rocketmq.remoting.impl.command.RemotingSysResponseCode; import org.apache.rocketmq.remoting.impl.netty.handler.Decoder; import org.apache.rocketmq.remoting.impl.netty.handler.Encoder; import org.junit.After; @@ -49,6 +57,7 @@ import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -65,11 +74,28 @@ public class NettyRemotingAbstractTest extends BaseTest { private RemotingCommand remotingRequest; + private RemotingCommand remotingRequestAsync; + + private RemotingCommand remotingRequestOneway; + private short requestCode = 123; + private int semaphoreNum = 5; @Before public void setUp() { - remotingAbstract = new NettyRemotingAbstract(new RemotingClientConfig()) { + RemotingConfig remotingConfig = new RemotingConfig() { + @Override + public int getOnewayInvokeSemaphore() { + return semaphoreNum; + } + + @Override + public int getAsyncInvokeSemaphore() { + return semaphoreNum; + } + }; + + remotingAbstract = new NettyRemotingAbstract(remotingConfig) { }; clientChannel = new EmbeddedChannel(new Encoder(), new Decoder(), new SimpleChannelInboundHandler<RemotingCommand>() { @@ -92,6 +118,14 @@ public class NettyRemotingAbstractTest extends BaseTest { remotingRequest.cmdCode(requestCode); remotingRequest.payload("Ping".getBytes()); + remotingRequestAsync = remotingAbstract.commandFactory().createRequest(); + remotingRequestAsync.cmdCode(requestCode); + remotingRequestAsync.payload("Ping".getBytes()); + + remotingRequestOneway = remotingAbstract.commandFactory().createRequest(); + remotingRequestOneway.cmdCode(requestCode); + remotingRequestOneway.payload("Ping".getBytes()); + // Simulate the tcp stack scheduleInThreads(new Runnable() { @Override @@ -289,6 +323,57 @@ public class NettyRemotingAbstractTest extends BaseTest { } @Test + public void invokeAsyncWithInterceptor_SemaphoreExhausted() { + registerTimeoutProcessor(1000); + + final ObjectFuture<Throwable> objectFuture = newObjectFuture(1, 10); + + for (int i = 0; i < semaphoreNum; i++) { + remotingAbstract.invokeAsyncWithInterceptor(clientChannel, remotingRequest, null, 100); + } + + remotingAbstract.invokeAsyncWithInterceptor(clientChannel, remotingRequest, new AsyncHandler() { + @Override + public void onFailure(final RemotingCommand request, final Throwable cause) { + objectFuture.putObject(cause); + objectFuture.release(); + } + + @Override + public void onSuccess(final RemotingCommand response) { + + } + }, 100); + + assertThat(objectFuture.getObject()).isInstanceOf(SemaphoreExhaustedException.class); + } + + @Test + public void invokeAsyncWithInterceptor_AccessException() { + ChannelPromise channelPromise = new DefaultChannelPromise(mockedClientChannel, new DefaultEventLoop()); + + when(mockedClientChannel.writeAndFlush(any(Object.class))).thenReturn(channelPromise); + channelPromise.setFailure(new UnitTestException()); + + final ObjectFuture<Throwable> objectFuture = newObjectFuture(1, 10); + + remotingAbstract.invokeAsyncWithInterceptor(mockedClientChannel, remotingRequest, new AsyncHandler() { + @Override + public void onFailure(final RemotingCommand request, final Throwable cause) { + objectFuture.putObject(cause); + objectFuture.release(); + } + + @Override + public void onSuccess(final RemotingCommand response) { + + } + }, 10); + + assertThat(objectFuture.getObject().getCause()).isInstanceOf(UnitTestException.class); + } + + @Test public void invokeOnewayWithInterceptor_Success() { ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 10); registerOnewayProcessor(objectFuture); @@ -300,27 +385,187 @@ public class NettyRemotingAbstractTest extends BaseTest { } @Test - public void registerInterceptor() { - } + public void invokeOnewayWithInterceptor_AccessException() { + ChannelPromise channelPromise = new DefaultChannelPromise(mockedClientChannel, new DefaultEventLoop()); - @Test - public void registerRequestProcessor() { + when(mockedClientChannel.writeAndFlush(any(Object.class))).thenReturn(channelPromise); + channelPromise.setFailure(new UnitTestException()); + + String expectedLog = "Send request command to channel null failed !"; + + ObjectFuture<String> objectFuture = retrieveStringFromLog(expectedLog); + + remotingAbstract.invokeOnewayWithInterceptor(mockedClientChannel, remotingRequest); + + assertThat(objectFuture.getObject()).contains(expectedLog); } @Test - public void registerRequestProcessor1() { + public void invokeOnewayWithInterceptor_SemaphoreExhausted() { + final ChannelPromise channelPromise = new DefaultChannelPromise(mockedClientChannel, new DefaultEventLoop()); + + when(mockedClientChannel.writeAndFlush(any(Object.class))).thenReturn(channelPromise); + + for (int i = 0; i < semaphoreNum; i++) { + remotingAbstract.invokeOnewayWithInterceptor(mockedClientChannel, remotingRequest); + } + + runInThreads(new Runnable() { + @Override + public void run() { + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException ignore) { + } + channelPromise.setSuccess(); + } + }, 1); + String expectedLog = "No available oneway semaphore to issue the request"; + ObjectFuture<String> objectFuture = retrieveStringFromLog(expectedLog); + + remotingAbstract.invokeOnewayWithInterceptor(mockedClientChannel, remotingRequest); + assertThat(objectFuture.getObject()).contains(expectedLog); } @Test public void unregisterRequestProcessor() { + registerNormalProcessor(); + + RemotingCommand response = remotingAbstract.invokeWithInterceptor(clientChannel, remotingRequest, 3000); + assertThat(new String(response.payload())).isEqualTo("Pong"); + + remotingAbstract.unregisterRequestProcessor(requestCode); + + response = remotingAbstract.invokeWithInterceptor(clientChannel, remotingRequest, 3000); + assertThat(response.opCode()).isEqualTo(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED); } @Test public void processor() { + registerNormalProcessor(); + RemotingCommand response = remotingAbstract.processor(requestCode).getLeft().processRequest(mock(RemotingChannel.class), remotingRequest); + assertThat(new String(response.payload())).isEqualTo("Pong"); + + assertThat(remotingAbstract.processor((short) (requestCode + 1))).isNull(); + } + + @Test + public void registerRequestProcessor_SpecificExecutor() { + ExecutorService executor = ThreadUtils.newSingleThreadExecutor("CustomThread", true); + + remotingAbstract.registerRequestProcessor(requestCode, new RequestProcessor() { + @Override + public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) { + RemotingCommand response = remotingAbstract.commandFactory().createResponse(request); + response.payload(Thread.currentThread().getName().getBytes()); + return response; + } + }, executor); + + RemotingCommand response = remotingAbstract.invokeWithInterceptor(clientChannel, remotingRequest, 3000); + assertThat(new String(response.payload())).startsWith("CustomThread"); + } + + @Test + public void registerChannelEventListener_ExceptionThrown() { + registerNormalProcessor(); + + String expectedLog = "Exception thrown when dispatching channel event"; + + ObjectFuture<String> objectFuture = retrieveStringFromLog(expectedLog); + + remotingAbstract.registerChannelEventListener(new ChannelEventListener() { + @Override + public void onChannelConnect(final RemotingChannel channel) { + throw new UnitTestException(); + } + + @Override + public void onChannelClose(final RemotingChannel channel) { + + } + + @Override + public void onChannelException(final RemotingChannel channel, final Throwable cause) { + + } + + @Override + public void onChannelIdle(final RemotingChannel channel) { + + } + }); + + remotingAbstract.channelEventExecutor.start(); + remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CONNECT, clientChannel)); + + assertThat(objectFuture.getObject()).contains(expectedLog); } @Test - public void registerChannelEventListener() { + public void registerInterceptor_NoModification() { + registerNormalProcessor(); + final ObjectFuture<RemotingCommand> localRequest = newObjectFuture(1, 100); + final ObjectFuture<RemotingCommand> localResponse = newObjectFuture(1, 100); + + final ObjectFuture<RemotingCommand> remoteRequest = newObjectFuture(1, 100); + final ObjectFuture<RemotingCommand> remoteResponse = newObjectFuture(1, 100); + + remotingAbstract.registerInterceptor(new Interceptor() { + @Override + public void beforeRequest(final RequestContext context) { + if (context.getRemotingEndPoint() == RemotingEndPoint.REQUEST) { + localRequest.putObject(context.getRequest()); + localRequest.release(); + } + + if (context.getRemotingEndPoint() == RemotingEndPoint.RESPONSE) { + remoteRequest.putObject(context.getRequest()); + remoteRequest.release(); + } + } + + @Override + public void afterResponseReceived(final ResponseContext context) { + if (context.getRemotingEndPoint() == RemotingEndPoint.REQUEST) { + localResponse.putObject(context.getResponse()); + localResponse.release(); + } + + if (context.getRemotingEndPoint() == RemotingEndPoint.RESPONSE) { + remoteResponse.putObject(context.getResponse()); + remoteResponse.release(); + } + } + }); + + RemotingCommand response = remotingAbstract.invokeWithInterceptor(clientChannel, remotingRequest, 3000); + + assertThat(new String(response.payload())).startsWith("Pong"); + + assertThat(localRequest.getObject()).isEqualTo(remotingRequest); + assertThat(localResponse.getObject()).isEqualTo(response); + + assertThat(remoteRequest.getObject()).isEqualTo(remotingRequest); + assertThat(remoteResponse.getObject()).isEqualTo(response); + + remotingAbstract.invokeAsyncWithInterceptor(clientChannel, remotingRequestAsync, null,3000); + + assertThat(localRequest.getObject()).isEqualTo(remotingRequestAsync); + + + assertThat(localResponse.getObject().requestID()).isEqualTo(remotingRequestAsync.requestID()); + + assertThat(remoteRequest.getObject()).isEqualTo(remotingRequestAsync); + assertThat(remoteResponse.getObject().requestID()).isEqualTo(remotingRequestAsync.requestID()); + + remotingAbstract.invokeOnewayWithInterceptor(clientChannel, remotingRequestOneway); + + assertThat(localRequest.getObject()).isEqualTo(remotingRequestOneway); + + assertThat(remoteRequest.getObject()).isEqualTo(remotingRequestOneway); + assertThat(remoteResponse.getObject().requestID()).isEqualTo(remotingRequestOneway.requestID()); + } private void registerTimeoutProcessor(final int timeoutMillis) {
