Repository: activemq-artemis Updated Branches: refs/heads/master 7f60ff20a -> 8f3157a5b
ARTEMIS-281 - fix reference counting for jgroups channels https://issues.apache.org/jira/browse/ARTEMIS-281 Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/290cb65b Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/290cb65b Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/290cb65b Branch: refs/heads/master Commit: 290cb65b1733092344fe08b1c10e91a1a20daa30 Parents: 7f60ff2 Author: Andy Taylor <[email protected]> Authored: Tue Oct 27 08:12:29 2015 +0000 Committer: Andy Taylor <[email protected]> Committed: Tue Oct 27 10:10:52 2015 +0000 ---------------------------------------------------------------------- .../api/core/JGroupsBroadcastEndpoint.java | 23 +++-- .../core/JGroupsChannelBroadcastEndpoint.java | 4 +- .../broadcast/JGroupsBroadcastTest.java | 90 ++++++++++++++++++++ 3 files changed, 110 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/290cb65b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java index df66450..b75531e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java @@ -110,15 +110,16 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint { channel.removeReceiver(receiver); clientOpened = false; } - internalCloseChannel(); + internalCloseChannel(channel); } /** * Closes the channel used in this 
JGroups Broadcast. * Can be overridden by implementations that use an externally managed channel. + * @param channel */ - protected synchronized void internalCloseChannel() { - channel.close(); + protected synchronized void internalCloseChannel(JChannelWrapper channel) { + channel.close(true); } /** @@ -161,10 +162,15 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint { this.channel = channel; } - public synchronized void close() { + public synchronized void close(boolean closeWrappedChannel) { refCount--; if (refCount == 0) { - JChannelManager.closeChannel(this.channelName, channel); + if (closeWrappedChannel) { + JChannelManager.closeChannel(this.channelName, channel); + } + else { + JChannelManager.removeChannel(this.channelName); + } } } @@ -246,5 +252,12 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint { throw new IllegalStateException("Did not find channel " + channelName); } } + + public static void removeChannel(String channelName) { + JChannelWrapper wrapper = channels.remove(channelName); + if (wrapper == null) { + throw new IllegalStateException("Did not find channel " + channelName); + } + } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/290cb65b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java index 9793c78..4fbb24c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsChannelBroadcastEndpoint.java @@ -38,7 +38,7 @@ public class JGroupsChannelBroadcastEndpoint extends 
JGroupsBroadcastEndpoint { } @Override - protected synchronized void internalCloseChannel() { - // no-op, this version takes an externally managed channel. + protected synchronized void internalCloseChannel(JChannelWrapper channel) { + channel.close(false); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/290cb65b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java new file mode 100644 index 0000000..83b8f28 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.integration.broadcast; + +import org.apache.activemq.artemis.api.core.BroadcastEndpoint; +import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory; +import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory; +import org.jgroups.JChannel; +import org.jgroups.conf.PlainConfigurator; +import org.junit.Assert; +import org.junit.Test; + +public class JGroupsBroadcastTest { + + private final String jgroupsConfigString = "UDP(oob_thread_pool.max_threads=300;" + "bind_addr=127.0.0.1;oob_thread_pool.keep_alive_time=1000;" + "max_bundle_size=31k;mcast_send_buf_size=640000;" + "internal_thread_pool.keep_alive_time=60000;" + "internal_thread_pool.rejection_policy=discard;" + "mcast_recv_buf_size=25000000;bind_port=55200;" + "internal_thread_pool.queue_max_size=100;" + "mcast_port=45688;thread_pool.min_threads=20;" + "oob_thread_pool.rejection_policy=discard;" + "thread_pool.max_threads=300;enable_diagnostics=false;" + "thread_pool.enabled=true;internal_thread_pool.queue_enabled=true;" + "ucast_recv_buf_size=20000000;ucast_send_buf_size=640000;" + "internal_thread_pool.enabled=true;oob_thread_pool.enabled=true;" + "ip_ttl=2;thread_pool.rejection_policy=discard;thread_pool.keep_alive_time=5000;" + "internal_thread_pool.max_threads=10;thread_pool.queue_enabled=true;" + "mcast_addr=230.0.0.4;singleton_name=udp;max_bundle_timeout=30;" + "oob_thread_pool.queue_enabled=false;internal_thread_pool.min_threads=1;" + "bundler_type=old;oob_thread_pool.min_threads=20;" + "thread_pool.queue_max_size=1000):PING(num_initial_members=3;" + "timeout=2000):MERGE3(min_interval=20000;max_interval=100000)" + ":FD_SOCK(bind_addr=127.0.0.1;start_port=54200):FD_ALL(interval=3000;" + "timeout=15000):VERIFY_SUSPECT(bind_addr=127.0.0.1;" + "timeout=1500):pbcast.NAKACK2(max_msg_batch_size=100;" + "xmit_table_msgs_per_row=10000;xmit_table_max_compaction_time=10000;" +
"xmit_table_num_rows=100;xmit_interval=1000):UNICAST3(xmit_table_msgs_per_row=10000;" + "xmit_table_max_compaction_time=10000;xmit_table_num_rows=20)" + ":pbcast.STABLE(desired_avg_gossip=50000;max_bytes=400000;" + "stability_delay=1000):pbcast.GMS(print_local_addr=true;" + "view_bundling=true;join_timeout=3000;view_ack_collection_timeout=5000;" + "resume_task_timeout=7500):UFC(max_credits=1m;min_threshold=0.40)" + ":MFC(max_credits=1m;min_threshold=0.40):FRAG2(frag_size=30k)" + ":RSVP(resend_interval=500;ack_on_delivery=false;timeout=60000)"; + + + + + @Test + public void testRefCount() throws Exception { + try { + + PlainConfigurator configurator = new PlainConfigurator(jgroupsConfigString); + JChannel channel = new JChannel(configurator); + + String channelName1 = "channel1"; + + BroadcastEndpointFactory jgroupsBroadcastCfg1 = new ChannelBroadcastEndpointFactory(channel, channelName1); + + BroadcastEndpoint channelEndpoint1 = jgroupsBroadcastCfg1.createBroadcastEndpoint(); + + BroadcastEndpoint channelEndpoint2 = jgroupsBroadcastCfg1.createBroadcastEndpoint(); + + BroadcastEndpoint channelEndpoint3 = jgroupsBroadcastCfg1.createBroadcastEndpoint(); + + channelEndpoint1.close(true); + + Assert.assertTrue(channel.isOpen()); + + channelEndpoint2.close(true); + + Assert.assertTrue(channel.isOpen()); + + channelEndpoint3.close(true); + + Assert.assertTrue(channel.isOpen()); + + channel.close(); + + //after we close the last endpoint reference counting will close the channel so once we create a new one the + // channel wrapper is recreated + try { + channelEndpoint2.openClient(); + Assert.fail("this should be closed"); + } + catch (Exception e) { + } + + JChannel newChannel = new JChannel(configurator); + + jgroupsBroadcastCfg1 = new ChannelBroadcastEndpointFactory(newChannel, channelName1); + + channelEndpoint1 = jgroupsBroadcastCfg1.createBroadcastEndpoint(); + + channelEndpoint1.openClient(); + + + } + catch (Exception e) { + e.printStackTrace(); + throw e; + } + }
+ +}
