This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit 4c149a57b720ea0e3cb95a92dc3849cad16a87d4 Author: yukon <[email protected]> AuthorDate: Tue Jun 4 21:46:19 2019 +0800 Add tests for ClientChannelManager --- .../remoting/common/metrics/ChannelMetrics.java | 27 ---- .../rocketmq/remoting/config/RemotingConfig.java | 2 +- .../remoting/impl/netty/ClientChannelManager.java | 11 +- .../org/apache/rocketmq/remoting/BaseTest.java | 25 +++- .../impl/netty/ClientChannelManagerTest.java | 162 +++++++++++++++++++++ 5 files changed, 188 insertions(+), 39 deletions(-) diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/metrics/ChannelMetrics.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/metrics/ChannelMetrics.java deleted file mode 100755 index db959b7..0000000 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/metrics/ChannelMetrics.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.common.metrics; - -import io.netty.channel.group.ChannelGroup; - -public interface ChannelMetrics { - - Integer getChannelCount(); - - ChannelGroup getChannels(); -} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java index cc81a11..9fa79c2 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java @@ -38,7 +38,7 @@ public class RemotingConfig extends TcpSocketConfig { private int serviceThreadBlockQueueSize = 50000; private boolean clientNativeEpollEnable = false; private int clientWorkerThreads = 16 + Runtime.getRuntime().availableProcessors() * 2; - private int clientConnectionFutureAwaitTimeoutMillis = 30000; + private int clientConnectionFutureAwaitTimeoutMillis = 3000; private int clientAsyncCallbackExecutorThreads = 16 + Runtime.getRuntime().availableProcessors() * 2; private int clientOnewayInvokeSemaphore = 20480; diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java index 2f59d24..0b084a0 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java @@ -38,7 +38,7 @@ public class ClientChannelManager { protected static final Logger LOG = LoggerFactory.getLogger(ClientChannelManager.class); private static final long LOCK_TIMEOUT_MILLIS = 3000; - private final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<>(); + final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<>(); private final Lock lockChannelTables = new ReentrantLock(); private final Bootstrap clientBootstrap; private final RemotingConfig clientConfig; @@ -101,8 +101,7 @@ public class ClientChannelManager { } else { LOG.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); } - } catch (InterruptedException e) { - e.printStackTrace(); + } catch (InterruptedException ignore) { } if (cw != null) { @@ -125,9 +124,6 @@ public class ClientChannelManager { } void closeChannel(final String addr, final Channel channel) { - if (null == channel) - return; - final String addrRemote = null == addr ? extractRemoteAddress(channel) : addr; try { if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { @@ -171,9 +167,6 @@ public class ClientChannelManager { } void closeChannel(final Channel channel) { - if (null == channel) - return; - try { if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { try { diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java index 21c7b38..aab6dfb 100644 --- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java @@ -20,6 +20,7 @@ package org.apache.rocketmq.remoting; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.RandomStringUtils; import org.apache.rocketmq.remoting.api.command.RemotingCommand; @@ -28,14 +29,34 @@ import org.apache.rocketmq.remoting.external.ThreadUtils; import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl; public class BaseTest { - protected void runInThreads(Runnable runnable, int threadsNum) { + protected void runInThreads(final Runnable runnable, int threadsNum) { ExecutorService executor = Executors.newFixedThreadPool(threadsNum); for (int i = 0; i < threadsNum; i++) { - executor.submit(runnable); + executor.submit(new Runnable() { + @Override + public void run() { + runnable.run(); + } + }); } + ThreadUtils.shutdownGracefully(executor, 5, TimeUnit.SECONDS); } + protected void runInThreads(final Runnable runnable, int threadsNum, int timeoutMillis) throws InterruptedException { + final Semaphore semaphore = new Semaphore(0); + + runInThreads(new Runnable() { + @Override + public void run() { + runnable.run(); + semaphore.release(); + } + }, threadsNum); + + semaphore.tryAcquire(threadsNum, timeoutMillis, TimeUnit.MILLISECONDS); + } + protected void shouldNotReachHere() { throw new RuntimeException("shouldn't reach here"); } diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java new file mode 100644 index 0000000..16084d8 --- /dev/null +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelPromise; +import io.netty.channel.DefaultEventLoop; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.remoting.BaseTest; +import org.apache.rocketmq.remoting.config.RemotingConfig; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ClientChannelManagerTest extends BaseTest { + private final static String TARGET_ADDR = "127.0.0.1:8080"; + @Mock + private Bootstrap clientBootstrap; + + @Mock + private Channel channel; + + private ClientChannelManager channelManager; + + private ChannelPromise channelPromise; + + @Before + public void init() { + channelPromise = new DefaultChannelPromise(channel, new DefaultEventLoop()); + + when(channel.isActive()).thenReturn(true); + when(clientBootstrap.connect(any(SocketAddress.class))).thenReturn(channelPromise); + when(channel.close()).thenReturn(channelPromise); + when(channel.remoteAddress()).thenReturn(new InetSocketAddress(8080)); + + channelManager = new ClientChannelManager(clientBootstrap, new RemotingConfig()); + } + + @Test + public void clear() { + channelPromise.setSuccess(); + Channel targetChannel = channelManager.createIfAbsent(TARGET_ADDR); + + assertThat(targetChannel).isNotNull(); + assertThat(channelManager.channelTables.size()).isEqualTo(1); + + channelManager.clear(); + assertThat(channelManager.channelTables.size()).isEqualTo(0); + + } + + @Test + public void createIfAbsent_UseExistingConnection_Success() { + channelPromise.setSuccess(); + assertThat(channelManager.channelTables.size()).isEqualTo(0); + Channel targetChannel = channelManager.createIfAbsent(TARGET_ADDR); + assertThat(targetChannel).isEqualTo(channel); + + assertThat(channelManager.channelTables.size()).isEqualTo(1); + targetChannel = channelManager.createIfAbsent(TARGET_ADDR); + assertThat(targetChannel).isEqualTo(channel); + assertThat(channelManager.channelTables.size()).isEqualTo(1); + } + + @Test + public void createIfAbsent_CreateNewConnection_Success() { + channelPromise.setSuccess(); + Channel targetChannel = channelManager.createIfAbsent(TARGET_ADDR); + assertThat(targetChannel).isEqualTo(channel); + } + + @Test + public void createIfAbsent_Concurrent_Success() throws InterruptedException { + int concurrentNum = 3; + final boolean[] channelMismatch = {false}; + + runInThreads(new Runnable() { + @Override + public void run() { + try { + TimeUnit.MILLISECONDS.sleep(10); + channelPromise.setSuccess(); + } catch (InterruptedException ignore) { + } + } + }, 1); + runInThreads(new Runnable() { + @Override + public void run() { + Channel targetChannel = channelManager.createIfAbsent(TARGET_ADDR); + if (targetChannel != channel) { + channelMismatch[0] = true; + } + } + }, concurrentNum, 3000); + + assertThat(channelMismatch[0]).isFalse(); + assertThat(channelManager.channelTables.size()).isEqualTo(1); + } + + @Test + public void createIfAbsent_ClosedChannel_NullReturn() { + channelPromise.setSuccess(); + when(channel.isActive()).thenReturn(false); + Channel targetChannel = channelManager.createIfAbsent(TARGET_ADDR); + assertThat(targetChannel).isNull(); + } + + @Test + public void closeChannel_WithChannel_Success() { + channelPromise.setSuccess(); + Channel targetChannel = channelManager.createIfAbsent(TARGET_ADDR); + + channelManager.closeChannel(targetChannel); + assertThat(channelManager.channelTables.size()).isEqualTo(0); + } + + @Test + public void closeChannel_WithAddr_Success() { + channelPromise.setSuccess(); + Channel targetChannel = channelManager.createIfAbsent(TARGET_ADDR); + + channelManager.closeChannel(TARGET_ADDR, targetChannel); + assertThat(channelManager.channelTables.size()).isEqualTo(0); + } + + @Test + public void closeChannel_NonExistingChannel_Success() { + channelPromise.setSuccess(); + Channel targetChannel = channelManager.createIfAbsent(TARGET_ADDR); + + channelManager.closeChannel(mock(Channel.class)); + assertThat(channelManager.channelTables.size()).isEqualTo(1); + } +} \ No newline at end of file
