This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git

commit ea053bfbdee5431f7ff7b48adbe9f9e0aa6970c4
Author: yukon <[email protected]>
AuthorDate: Thu Jun 6 16:41:28 2019 +0800

    Finish the NettyRemotingAbstractTest
---
 .../api/exception/SemaphoreExhaustedException.java |  44 ++++
 .../remoting/impl/netty/NettyRemotingAbstract.java |  14 +-
 .../org/apache/rocketmq/remoting/BaseTest.java     |  46 +++-
 .../impl/netty/NettyRemotingAbstractTest.java      | 261 ++++++++++++++++++++-
 4 files changed, 347 insertions(+), 18 deletions(-)

diff --git 
a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/SemaphoreExhaustedException.java
 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/SemaphoreExhaustedException.java
new file mode 100644
index 0000000..a4e6d16
--- /dev/null
+++ 
b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/SemaphoreExhaustedException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.api.exception;
+
+public class SemaphoreExhaustedException extends RemoteRuntimeException {
+    private static final long serialVersionUID = 6280428909532427263L;
+
+    /**
+     * Constructor for SemaphoreExhaustedException with the specified detail 
message.
+     *
+     * @param msg the detail message
+     */
+    public SemaphoreExhaustedException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Constructor for SemaphoreExhaustedException with the specified detail 
message
+     * and nested exception.
+     *
+     * @param msg the detail message
+     * @param cause the root cause (usually from using an underlying
+     * remoting API such as RMI)
+     */
+    public SemaphoreExhaustedException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+
+}
\ No newline at end of file
diff --git 
a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
index de4fc81..62d0d0c 100644
--- 
a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
+++ 
b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java
@@ -45,6 +45,7 @@ import org.apache.rocketmq.remoting.api.command.TrafficType;
 import org.apache.rocketmq.remoting.api.exception.RemoteAccessException;
 import org.apache.rocketmq.remoting.api.exception.RemoteRuntimeException;
 import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
+import org.apache.rocketmq.remoting.api.exception.SemaphoreExhaustedException;
 import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
 import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup;
 import org.apache.rocketmq.remoting.api.interceptor.RequestContext;
@@ -372,12 +373,7 @@ public abstract class NettyRemotingAbstract implements 
RemotingService {
 
         this.interceptorGroup.beforeRequest(new 
RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request));
 
-        RemotingCommand responseCommand = this.invoke0(remoteAddr, channel, 
request, timeoutMillis);
-
-        this.interceptorGroup.afterResponseReceived(new 
ResponseContext(RemotingEndPoint.REQUEST,
-            RemotingUtil.extractRemoteAddress(channel), request, 
responseCommand));
-
-        return responseCommand;
+        return this.invoke0(remoteAddr, channel, request, timeoutMillis);
     }
 
     private RemotingCommand invoke0(final String remoteAddr, final Channel 
channel, final RemotingCommand request,
@@ -432,14 +428,14 @@ public abstract class NettyRemotingAbstract implements 
RemotingService {
     }
 
     public void invokeAsyncWithInterceptor(final Channel channel, final 
RemotingCommand request,
-        final AsyncHandler invokeCallback, long timeoutMillis) {
+        final AsyncHandler asyncHandler, long timeoutMillis) {
         request.trafficType(TrafficType.REQUEST_ASYNC);
 
         final String remoteAddr = RemotingUtil.extractRemoteAddress(channel);
 
         this.interceptorGroup.beforeRequest(new 
RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request));
 
-        this.invokeAsync0(remoteAddr, channel, request, invokeCallback, 
timeoutMillis);
+        this.invokeAsync0(remoteAddr, channel, request, asyncHandler, 
timeoutMillis);
     }
 
     private void invokeAsync0(final String remoteAddr, final Channel channel, 
final RemotingCommand request,
@@ -476,7 +472,7 @@ public abstract class NettyRemotingAbstract implements 
RemotingService {
             }
         } else {
             String info = String.format("No available async semaphore to issue 
the request request %s", request.toString());
-            requestFail(new ResponseFuture(request.requestID(), timeoutMillis, 
asyncHandler, null), new RemoteAccessException(info));
+            requestFail(new ResponseFuture(request.requestID(), timeoutMillis, 
asyncHandler, null), new SemaphoreExhaustedException(info));
             LOG.error(info);
         }
     }
diff --git 
a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
 
b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
index 6a3112b..47fd4bb 100644
--- 
a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
+++ 
b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.remoting;
 
+import java.io.PrintStream;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -28,8 +29,26 @@ import org.apache.rocketmq.remoting.api.command.TrafficType;
 import org.apache.rocketmq.remoting.external.ThreadUtils;
 import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
 import org.assertj.core.api.Fail;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+
+@RunWith(MockitoJUnitRunner.class)
 public class BaseTest {
+    @Mock
+    protected PrintStream fakeOut;
+
+    @Test
+    public void emptyTest() {
+
+    }
+
     protected void scheduleInThreads(final Runnable runnable, int 
periodMillis) {
         final ScheduledExecutorService executor = 
ThreadUtils.newSingleThreadScheduledExecutor("UnitTests", true);
         executor.scheduleAtFixedRate(runnable, 0, periodMillis, 
TimeUnit.MILLISECONDS);
@@ -93,8 +112,33 @@ public class BaseTest {
         return new ObjectFuture<>(permits, timeoutMillis);
     }
 
+    protected ObjectFuture<String> retrieveStringFromLog(final String 
targetString) {
+        final ObjectFuture<String> objectFuture = newObjectFuture(1, 3000);
+        final PrintStream originStream = System.err;
+        System.setErr(fakeOut);
+
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws 
Throwable {
+                Object[] arguments = invocation.getArguments();
+
+                String str = arguments[0].toString();
+
+                if (str.contains(targetString)) {
+                    System.setErr(originStream);
+                    objectFuture.putObject(str);
+                    objectFuture.release();
+                }
+
+                return null;
+            }
+        }).when(fakeOut).println(any(String.class));
+
+        return objectFuture;
+    }
+
     protected class ObjectFuture<T> {
-        private T object;
+        volatile private T object;
         private Semaphore semaphore;
         private int permits;
         private int timeoutMillis;
diff --git 
a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstractTest.java
 
b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstractTest.java
index 8303ae9..c9332b4 100644
--- 
a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstractTest.java
+++ 
b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstractTest.java
@@ -25,18 +25,26 @@ import io.netty.channel.DefaultChannelPromise;
 import io.netty.channel.DefaultEventLoop;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.remoting.BaseTest;
 import org.apache.rocketmq.remoting.api.AsyncHandler;
+import org.apache.rocketmq.remoting.api.RemotingEndPoint;
 import org.apache.rocketmq.remoting.api.RequestProcessor;
 import org.apache.rocketmq.remoting.api.channel.ChannelEventListener;
 import org.apache.rocketmq.remoting.api.channel.RemotingChannel;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
 import org.apache.rocketmq.remoting.api.exception.RemoteAccessException;
 import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException;
-import org.apache.rocketmq.remoting.config.RemotingClientConfig;
+import org.apache.rocketmq.remoting.api.exception.SemaphoreExhaustedException;
+import org.apache.rocketmq.remoting.api.interceptor.Interceptor;
+import org.apache.rocketmq.remoting.api.interceptor.RequestContext;
+import org.apache.rocketmq.remoting.api.interceptor.ResponseContext;
+import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.apache.rocketmq.remoting.external.ThreadUtils;
 import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl;
+import org.apache.rocketmq.remoting.impl.command.RemotingSysResponseCode;
 import org.apache.rocketmq.remoting.impl.netty.handler.Decoder;
 import org.apache.rocketmq.remoting.impl.netty.handler.Encoder;
 import org.junit.After;
@@ -49,6 +57,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -65,11 +74,28 @@ public class NettyRemotingAbstractTest extends BaseTest {
 
     private RemotingCommand remotingRequest;
 
+    private RemotingCommand remotingRequestAsync;
+
+    private RemotingCommand remotingRequestOneway;
+
     private short requestCode = 123;
+    private int semaphoreNum = 5;
 
     @Before
     public void setUp() {
-        remotingAbstract = new NettyRemotingAbstract(new 
RemotingClientConfig()) {
+        RemotingConfig remotingConfig = new RemotingConfig() {
+            @Override
+            public int getOnewayInvokeSemaphore() {
+                return semaphoreNum;
+            }
+
+            @Override
+            public int getAsyncInvokeSemaphore() {
+                return semaphoreNum;
+            }
+        };
+
+        remotingAbstract = new NettyRemotingAbstract(remotingConfig) {
         };
 
         clientChannel = new EmbeddedChannel(new Encoder(), new Decoder(), new 
SimpleChannelInboundHandler<RemotingCommand>() {
@@ -92,6 +118,14 @@ public class NettyRemotingAbstractTest extends BaseTest {
         remotingRequest.cmdCode(requestCode);
         remotingRequest.payload("Ping".getBytes());
 
+        remotingRequestAsync = 
remotingAbstract.commandFactory().createRequest();
+        remotingRequestAsync.cmdCode(requestCode);
+        remotingRequestAsync.payload("Ping".getBytes());
+
+        remotingRequestOneway = 
remotingAbstract.commandFactory().createRequest();
+        remotingRequestOneway.cmdCode(requestCode);
+        remotingRequestOneway.payload("Ping".getBytes());
+
         // Simulate the tcp stack
         scheduleInThreads(new Runnable() {
             @Override
@@ -289,6 +323,57 @@ public class NettyRemotingAbstractTest extends BaseTest {
     }
 
     @Test
+    public void invokeAsyncWithInterceptor_SemaphoreExhausted() {
+        registerTimeoutProcessor(1000);
+
+        final ObjectFuture<Throwable> objectFuture = newObjectFuture(1, 10);
+
+        for (int i = 0; i < semaphoreNum; i++) {
+            remotingAbstract.invokeAsyncWithInterceptor(clientChannel, 
remotingRequest, null, 100);
+        }
+
+        remotingAbstract.invokeAsyncWithInterceptor(clientChannel, 
remotingRequest, new AsyncHandler() {
+            @Override
+            public void onFailure(final RemotingCommand request, final 
Throwable cause) {
+                objectFuture.putObject(cause);
+                objectFuture.release();
+            }
+
+            @Override
+            public void onSuccess(final RemotingCommand response) {
+
+            }
+        }, 100);
+
+        
assertThat(objectFuture.getObject()).isInstanceOf(SemaphoreExhaustedException.class);
+    }
+
+    @Test
+    public void invokeAsyncWithInterceptor_AccessException() {
+        ChannelPromise channelPromise = new 
DefaultChannelPromise(mockedClientChannel, new DefaultEventLoop());
+
+        
when(mockedClientChannel.writeAndFlush(any(Object.class))).thenReturn(channelPromise);
+        channelPromise.setFailure(new UnitTestException());
+
+        final ObjectFuture<Throwable> objectFuture = newObjectFuture(1, 10);
+
+        remotingAbstract.invokeAsyncWithInterceptor(mockedClientChannel, 
remotingRequest, new AsyncHandler() {
+            @Override
+            public void onFailure(final RemotingCommand request, final 
Throwable cause) {
+                objectFuture.putObject(cause);
+                objectFuture.release();
+            }
+
+            @Override
+            public void onSuccess(final RemotingCommand response) {
+
+            }
+        }, 10);
+
+        
assertThat(objectFuture.getObject().getCause()).isInstanceOf(UnitTestException.class);
+    }
+
+    @Test
     public void invokeOnewayWithInterceptor_Success() {
         ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 10);
         registerOnewayProcessor(objectFuture);
@@ -300,27 +385,187 @@ public class NettyRemotingAbstractTest extends BaseTest {
     }
 
     @Test
-    public void registerInterceptor() {
-    }
+    public void invokeOnewayWithInterceptor_AccessException() {
+        ChannelPromise channelPromise = new 
DefaultChannelPromise(mockedClientChannel, new DefaultEventLoop());
 
-    @Test
-    public void registerRequestProcessor() {
+        
when(mockedClientChannel.writeAndFlush(any(Object.class))).thenReturn(channelPromise);
+        channelPromise.setFailure(new UnitTestException());
+
+        String expectedLog = "Send request command to channel null failed !";
+
+        ObjectFuture<String> objectFuture = retrieveStringFromLog(expectedLog);
+
+        remotingAbstract.invokeOnewayWithInterceptor(mockedClientChannel, 
remotingRequest);
+
+        assertThat(objectFuture.getObject()).contains(expectedLog);
     }
 
     @Test
-    public void registerRequestProcessor1() {
+    public void invokeOnewayWithInterceptor_SemaphoreExhausted() {
+        final ChannelPromise channelPromise = new 
DefaultChannelPromise(mockedClientChannel, new DefaultEventLoop());
+
+        
when(mockedClientChannel.writeAndFlush(any(Object.class))).thenReturn(channelPromise);
+
+        for (int i = 0; i < semaphoreNum; i++) {
+            remotingAbstract.invokeOnewayWithInterceptor(mockedClientChannel, 
remotingRequest);
+        }
+
+        runInThreads(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    TimeUnit.SECONDS.sleep(1);
+                } catch (InterruptedException ignore) {
+                }
+                channelPromise.setSuccess();
+            }
+        }, 1);
+        String expectedLog = "No available oneway semaphore to issue the 
request";
+        ObjectFuture<String> objectFuture = retrieveStringFromLog(expectedLog);
+
+        remotingAbstract.invokeOnewayWithInterceptor(mockedClientChannel, 
remotingRequest);
+        assertThat(objectFuture.getObject()).contains(expectedLog);
     }
 
     @Test
     public void unregisterRequestProcessor() {
+        registerNormalProcessor();
+
+        RemotingCommand response = 
remotingAbstract.invokeWithInterceptor(clientChannel, remotingRequest, 3000);
+        assertThat(new String(response.payload())).isEqualTo("Pong");
+
+        remotingAbstract.unregisterRequestProcessor(requestCode);
+
+        response = remotingAbstract.invokeWithInterceptor(clientChannel, 
remotingRequest, 3000);
+        
assertThat(response.opCode()).isEqualTo(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED);
     }
 
     @Test
     public void processor() {
+        registerNormalProcessor();
+        RemotingCommand response = 
remotingAbstract.processor(requestCode).getLeft().processRequest(mock(RemotingChannel.class),
 remotingRequest);
+        assertThat(new String(response.payload())).isEqualTo("Pong");
+
+        assertThat(remotingAbstract.processor((short) (requestCode + 
1))).isNull();
+    }
+
+    @Test
+    public void registerRequestProcessor_SpecificExecutor() {
+        ExecutorService executor = 
ThreadUtils.newSingleThreadExecutor("CustomThread", true);
+
+        remotingAbstract.registerRequestProcessor(requestCode, new 
RequestProcessor() {
+            @Override
+            public RemotingCommand processRequest(final RemotingChannel 
channel, final RemotingCommand request) {
+                RemotingCommand response = 
remotingAbstract.commandFactory().createResponse(request);
+                response.payload(Thread.currentThread().getName().getBytes());
+                return response;
+            }
+        }, executor);
+
+        RemotingCommand response = 
remotingAbstract.invokeWithInterceptor(clientChannel, remotingRequest, 3000);
+        assertThat(new String(response.payload())).startsWith("CustomThread");
+    }
+
+    @Test
+    public void registerChannelEventListener_ExceptionThrown() {
+        registerNormalProcessor();
+
+        String expectedLog = "Exception thrown when dispatching channel event";
+
+        ObjectFuture<String> objectFuture = retrieveStringFromLog(expectedLog);
+
+        remotingAbstract.registerChannelEventListener(new 
ChannelEventListener() {
+            @Override
+            public void onChannelConnect(final RemotingChannel channel) {
+                throw new UnitTestException();
+            }
+
+            @Override
+            public void onChannelClose(final RemotingChannel channel) {
+
+            }
+
+            @Override
+            public void onChannelException(final RemotingChannel channel, 
final Throwable cause) {
+
+            }
+
+            @Override
+            public void onChannelIdle(final RemotingChannel channel) {
+
+            }
+        });
+
+        remotingAbstract.channelEventExecutor.start();
+        remotingAbstract.putNettyEvent(new 
NettyChannelEvent(NettyChannelEventType.CONNECT, clientChannel));
+
+        assertThat(objectFuture.getObject()).contains(expectedLog);
     }
 
     @Test
-    public void registerChannelEventListener() {
+    public void registerInterceptor_NoModification() {
+        registerNormalProcessor();
+        final ObjectFuture<RemotingCommand> localRequest = newObjectFuture(1, 
100);
+        final ObjectFuture<RemotingCommand> localResponse = newObjectFuture(1, 
100);
+
+        final ObjectFuture<RemotingCommand> remoteRequest = newObjectFuture(1, 
100);
+        final ObjectFuture<RemotingCommand> remoteResponse = 
newObjectFuture(1, 100);
+
+        remotingAbstract.registerInterceptor(new Interceptor() {
+            @Override
+            public void beforeRequest(final RequestContext context) {
+                if (context.getRemotingEndPoint() == RemotingEndPoint.REQUEST) 
{
+                    localRequest.putObject(context.getRequest());
+                    localRequest.release();
+                }
+
+                if (context.getRemotingEndPoint() == 
RemotingEndPoint.RESPONSE) {
+                    remoteRequest.putObject(context.getRequest());
+                    remoteRequest.release();
+                }
+            }
+
+            @Override
+            public void afterResponseReceived(final ResponseContext context) {
+                if (context.getRemotingEndPoint() == RemotingEndPoint.REQUEST) 
{
+                    localResponse.putObject(context.getResponse());
+                    localResponse.release();
+                }
+
+                if (context.getRemotingEndPoint() == 
RemotingEndPoint.RESPONSE) {
+                    remoteResponse.putObject(context.getResponse());
+                    remoteResponse.release();
+                }
+            }
+        });
+
+        RemotingCommand response = 
remotingAbstract.invokeWithInterceptor(clientChannel, remotingRequest, 3000);
+
+        assertThat(new String(response.payload())).startsWith("Pong");
+
+        assertThat(localRequest.getObject()).isEqualTo(remotingRequest);
+        assertThat(localResponse.getObject()).isEqualTo(response);
+
+        assertThat(remoteRequest.getObject()).isEqualTo(remotingRequest);
+        assertThat(remoteResponse.getObject()).isEqualTo(response);
+
+        remotingAbstract.invokeAsyncWithInterceptor(clientChannel, 
remotingRequestAsync, null,3000);
+
+        assertThat(localRequest.getObject()).isEqualTo(remotingRequestAsync);
+
+
+        
assertThat(localResponse.getObject().requestID()).isEqualTo(remotingRequestAsync.requestID());
+
+        assertThat(remoteRequest.getObject()).isEqualTo(remotingRequestAsync);
+        
assertThat(remoteResponse.getObject().requestID()).isEqualTo(remotingRequestAsync.requestID());
+
+        remotingAbstract.invokeOnewayWithInterceptor(clientChannel, 
remotingRequestOneway);
+
+        assertThat(localRequest.getObject()).isEqualTo(remotingRequestOneway);
+
+        assertThat(remoteRequest.getObject()).isEqualTo(remotingRequestOneway);
+        
assertThat(remoteResponse.getObject().requestID()).isEqualTo(remotingRequestOneway.requestID());
+
     }
 
     private void registerTimeoutProcessor(final int timeoutMillis) {

Reply via email to