Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java?rev=684047&r1=684046&r2=684047&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java Fri Aug 8 11:48:06 2008 @@ -34,12 +34,12 @@ /** * Test method for - * [EMAIL PROTECTED] org.apache.activemq.group.GroupMap#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}. + * [EMAIL PROTECTED] org.apache.activemq.group.Group#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}. * @throws Exception */ public void testAddMemberChangedListener() throws Exception { final AtomicInteger counter = new AtomicInteger(); - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); map1.addMemberChangedListener(new MemberChangedListener(){ public void memberStarted(Member member) { @@ -65,7 +65,7 @@ } } assertEquals(1, counter.get()); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.start(); synchronized(counter) { if (counter.get()<2) { @@ -76,7 +76,7 @@ map2.stop(); synchronized(counter) { if (counter.get()>=2) { - counter.wait(GroupMap.DEFAULT_HEART_BEAT_INTERVAL*3); + counter.wait(Group.DEFAULT_HEART_BEAT_INTERVAL*3); } } assertEquals(1, counter.get()); @@ -85,14 +85,14 @@ /** * Test method for - * [EMAIL PROTECTED] org.apache.activemq.group.GroupMap#addMapChangedListener(org.apache.activemq.group.MapChangedListener)}. + * [EMAIL PROTECTED] org.apache.activemq.group.Group#addMapChangedListener(org.apache.activemq.group.MapChangedListener)}. * @throws Exception */ public void testAddMapChangedListener() throws Exception { final AtomicBoolean called1 = new AtomicBoolean(); final AtomicBoolean called2 = new AtomicBoolean(); - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); map1.addMapChangedListener(new DefaultMapChangedListener() { public void mapInsert(Member owner,Object Key, Object Value) { @@ -104,7 +104,7 @@ }); map1.start(); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.addMapChangedListener(new DefaultMapChangedListener() { public void mapInsert(Member owner,Object Key, Object Value) { @@ -133,12 +133,32 @@ map1.stop(); map2.stop(); } + + public void testGetImplicitWriteLock() throws Exception { + Group map1 = new Group(connection1, "map1"); + final AtomicBoolean called = new AtomicBoolean(); + map1.start(); + Group map2 = new Group(connection2, "map2"); + map2.setAlwaysLock(true); + map2.setMinimumGroupSize(2); + map2.start(); + map2.put("test", "foo"); + try { + map1.put("test", "bah"); + fail("Should have thrown an exception!"); + } catch (GroupMapUpdateException e) { + } + map1.stop(); + map2.stop(); + } - public void testGetWriteLock() throws Exception { - GroupMap map1 = new GroupMap(connection1, "map1"); + public void testExpireImplicitWriteLock() throws Exception { + Group map1 = new Group(connection1, "map1"); final AtomicBoolean called = new AtomicBoolean(); map1.start(); - GroupMap map2 = new GroupMap(connection2, "map2"); + Group map2 = new Group(connection2, "map2"); + map2.setAlwaysLock(true); + map2.setLockTimeToLive(1000); map2.setMinimumGroupSize(2); map2.start(); map2.put("test", "foo"); @@ -147,18 +167,68 @@ fail("Should have thrown an exception!"); } catch (GroupMapUpdateException e) { } + Thread.sleep(2000); + map1.put("test", "bah"); map1.stop(); map2.stop(); } - + + public void testExpireImplicitLockOnExit() throws Exception { + Group map1 = new Group(connection1, "map1"); + final AtomicBoolean called = new AtomicBoolean(); + map1.start(); + Group map2 = new Group(connection2, "map2"); + map2.setAlwaysLock(true); + map2.setMinimumGroupSize(2); + map2.start(); + map2.put("test", "foo"); + try { + map1.put("test", "bah"); + fail("Should have thrown an exception!"); + } catch (GroupMapUpdateException e) { + } + map2.stop(); + map1.put("test", "bah"); + map1.stop(); + + } + + public void testGetExplicitWriteLock() throws Exception { + Group map1 = new Group(connection1, "map1"); + map1.setAlwaysLock(true); + final AtomicBoolean called = new AtomicBoolean(); + map1.start(); + Group map2 = new Group(connection2, "map2"); + map2.setAlwaysLock(true); + map2.setMinimumGroupSize(2); + map2.start(); + map2.put("test", "foo"); + map2.lock("test"); + try { + map1.put("test", "bah"); + fail("Should have thrown an exception!"); + } catch (GroupMapUpdateException e) { + } + map2.unlock("test"); + map1.lock("test"); + try { + map2.lock("test"); + fail("Should have thrown an exception!"); + } catch (GroupMapUpdateException e) { + } + map1.stop(); + map2.stop(); + } + + /** - * Test method for [EMAIL PROTECTED] org.apache.activemq.group.GroupMap#clear()}. + * Test method for [EMAIL PROTECTED] org.apache.activemq.group.Group#clear()}. * * @throws Exception */ public void testClear() throws Exception { - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); map1.addMapChangedListener(new DefaultMapChangedListener() { public void mapInsert(Member owner,Object Key, Object Value) { @@ -176,7 +246,7 @@ } }); map1.start(); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.start(); map2.put("test","foo"); synchronized(called) { @@ -202,12 +272,12 @@ * Test a new map is populated for existing values */ public void testMapUpdatedOnStart() throws Exception { - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); map1.start(); map1.put("test", "foo"); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.addMapChangedListener(new DefaultMapChangedListener() { public void mapInsert(Member owner,Object Key, Object Value) { synchronized(called) { @@ -230,9 +300,9 @@ map1.stop(); map2.stop(); } - + public void testContainsKey() throws Exception { - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); map1.addMapChangedListener(new DefaultMapChangedListener() { public void mapInsert(Member owner,Object Key, Object Value) { @@ -243,7 +313,7 @@ } }); map1.start(); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.start(); map2.put("test","foo"); synchronized(called) { @@ -261,11 +331,11 @@ /** * Test method for - * [EMAIL PROTECTED] org.apache.activemq.group.GroupMap#containsValue(java.lang.Object)}. + * [EMAIL PROTECTED] org.apache.activemq.group.Group#containsValue(java.lang.Object)}. * @throws Exception */ public void testContainsValue() throws Exception { - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); map1.addMapChangedListener(new DefaultMapChangedListener() { public void mapInsert(Member owner,Object Key, Object Value) { @@ -276,7 +346,7 @@ } }); map1.start(); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.start(); map2.put("test","foo"); synchronized(called) { @@ -299,11 +369,11 @@ /** * Test method for - * [EMAIL PROTECTED] org.apache.activemq.group.GroupMap#get(java.lang.Object)}. + * [EMAIL PROTECTED] org.apache.activemq.group.Group#get(java.lang.Object)}. * @throws Exception */ public void testGet() throws Exception { - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); map1.addMapChangedListener(new DefaultMapChangedListener() { public void mapInsert(Member owner,Object Key, Object Value) { @@ -314,7 +384,7 @@ } }); map1.start(); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.start(); map2.put("test","foo"); synchronized(called) { @@ -327,15 +397,29 @@ map1.stop(); map2.stop(); } + + public void testPut() throws Exception { + Group map1 = new Group(connection1,"map1"); + map1.start(); + Group map2 = new Group(connection2,"map2"); + map2.setMinimumGroupSize(2); + map2.start(); + Object value = map1.put("foo", "blob"); + assertNull(value); + value = map1.put("foo", "blah"); + assertEquals(value, "blob"); + map1.stop(); + map2.stop(); + } /** * Test method for - * [EMAIL PROTECTED] org.apache.activemq.group.GroupMap#remove(java.lang.Object)}. + * [EMAIL PROTECTED] org.apache.activemq.group.Group#remove(java.lang.Object)}. */ public void testRemove() throws Exception{ - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); map1.addMapChangedListener(new DefaultMapChangedListener() { public void mapInsert(Member owner,Object Key, Object Value) { @@ -353,7 +437,7 @@ } }); map1.start(); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.start(); map2.put("test","foo"); synchronized(called) { @@ -380,7 +464,7 @@ final AtomicBoolean called1 = new AtomicBoolean(); final AtomicBoolean called2 = new AtomicBoolean(); - GroupMap map1 = new GroupMap(connection1,"map1"); + Group map1 = new Group(connection1,"map1"); map1.setTimeToLive(1000); map1.addMapChangedListener(new DefaultMapChangedListener() { public void mapRemove(Member owner, Object key, Object value,boolean expired) { @@ -392,7 +476,7 @@ }); map1.start(); - GroupMap map2 = new GroupMap(connection2,"map2"); + Group map2 = new Group(connection2,"map2"); map2.addMapChangedListener(new DefaultMapChangedListener() { public void mapRemove(Member owner, Object key, Object value,boolean expired) {
Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMemberTest.java (from r683259, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java) URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMemberTest.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMemberTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java&r1=683259&r2=684047&rev=684047&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMemberTest.java Fri Aug 8 11:48:06 2008 @@ -26,48 +26,102 @@ import org.apache.activemq.broker.BrokerService; -public class GroupMapMemberTest extends TestCase { +public class GroupMemberTest extends TestCase { protected BrokerService broker; protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; + + public void testCoordinatorSelection() throws Exception{ + Group group = new Group(null,""); + List<Member>list = new ArrayList<Member>(); + final int number =10; + Member choosen = null; + for (int i =0;i< number;i++) { + Member m = new Member("group"+i); + m.setId(""+i); + if (number/2==i) { + m.setCoordinatorWeight(10); + choosen=m; + } + list.add(m); + } + Member c = group.selectCordinator(list); + assertEquals(c,choosen); + } /** * Test method for - * [EMAIL PROTECTED] org.apache.activemq.group.GroupMap#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}. + * [EMAIL PROTECTED] org.apache.activemq.group.Group#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}. * @throws Exception */ public void testGroup() throws Exception { - int number = 20; + final int number = 10; + List<Connection>connections = new ArrayList<Connection>(); + List<Group>groupMaps = new ArrayList<Group>(); + ConnectionFactory factory = createConnectionFactory(); + for (int i =0; i < number; i++) { + Connection connection = factory.createConnection(); + connection.start(); + connections.add(connection); + Group map = new Group(connection,"map"+i); + map.setHeartBeatInterval(200); + if(i ==number-1) { + map.setMinimumGroupSize(number); + } + map.start(); + groupMaps.add(map); + } + + int coordinatorNumber = 0; + for (Group map:groupMaps) { + if (map.isCoordinator()) { + coordinatorNumber++; + } + } + for(Group map:groupMaps) { + map.stop(); + } + for (Connection connection:connections) { + connection.stop(); + } + + } + +public void XtestWeightedGroup() throws Exception { + + final int number = 10; List<Connection>connections = new ArrayList<Connection>(); - List<GroupMap>groupMaps = new ArrayList<GroupMap>(); + List<Group>groupMaps = new ArrayList<Group>(); + Group last = null; ConnectionFactory factory = createConnectionFactory(); for (int i =0; i < number; i++) { Connection connection = factory.createConnection(); connection.start(); connections.add(connection); - GroupMap map = new GroupMap(connection,"map"+i); - map.setHeartBeatInterval(20000); + Group map = new Group(connection,"map"+i); + map.setHeartBeatInterval(200); if(i ==number-1) { map.setMinimumGroupSize(number); + map.setCoordinatorWeight(10); + last=map; } map.start(); groupMaps.add(map); } int coordinator = 0; - for (GroupMap map:groupMaps) { + Group groupCoordinator = null; + for (Group map:groupMaps) { if (map.isCoordinator()) { coordinator++; + groupCoordinator=map; } } + assertNotNull(groupCoordinator); + assertEquals(groupCoordinator, last); assertEquals(1,coordinator); - groupMaps.get(0).put("key", "value"); - Thread.sleep(2000); - for (GroupMap map:groupMaps) { - assertTrue(map.get("key").equals("value")); - } - for(GroupMap map:groupMaps) { + for(Group map:groupMaps) { map.stop(); } for (Connection connection:connections) { Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java?rev=684047&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java Fri Aug 8 11:48:06 2008 @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.group; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; + +public class GroupMessageTest extends TestCase { + protected BrokerService broker; + protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; + + public void testGroupBroadcast() throws Exception { + final int number = 10; + final AtomicInteger count = new AtomicInteger(); + List<Connection> connections = new ArrayList<Connection>(); + List<Group> groups = new ArrayList<Group>(); + ConnectionFactory factory = createConnectionFactory(); + for (int i = 0; i < number; i++) { + Connection connection = factory.createConnection(); + connection.start(); + connections.add(connection); + Group group = new Group(connection, "group" + i); + group.setHeartBeatInterval(20000); + if (i == number - 1) { + group.setMinimumGroupSize(number); + } + group.start(); + groups.add(group); + group.addGroupMessageListener(new GroupMessageListener() { + public void messageDelivered(Member sender, String replyId, + Object message) { + synchronized (count) { + if (count.incrementAndGet() == number) { + count.notifyAll(); + } + } + } + }); + } + groups.get(0).broadcastMessage("hello"); + synchronized (count) { + if (count.get() < number) { + count.wait(5000); + } + } + assertEquals(number, count.get()); + for (Group map : groups) { + map.stop(); + } + for (Connection connection : connections) { + connection.stop(); + } + } + + public void testsendMessage() throws Exception { + final int number = 10; + final AtomicInteger count = new AtomicInteger(); + List<Connection> connections = new ArrayList<Connection>(); + List<Group> groups = new ArrayList<Group>(); + ConnectionFactory factory = createConnectionFactory(); + for (int i = 0; i < number; i++) { + Connection connection = factory.createConnection(); + connection.start(); + connections.add(connection); + Group group = new Group(connection, "group" + i); + group.setHeartBeatInterval(20000); + if (i == number - 1) { + group.setMinimumGroupSize(number); + } + group.start(); + groups.add(group); + group.addGroupMessageListener(new GroupMessageListener() { + public void messageDelivered(Member sender, String replyId, + Object message) { + synchronized (count) { + count.incrementAndGet(); + count.notifyAll(); + } + } + }); + } + groups.get(0).sendMessage("hello"); + synchronized (count) { + if (count.get() == 0) { + count.wait(5000); + } + } + // wait a while to check that only one got it + Thread.sleep(2000); + assertEquals(1, count.get()); + for (Group map : groups) { + map.stop(); + } + for (Connection connection : connections) { + connection.stop(); + } + } + + public void testSendToSingleMember() throws Exception { + ConnectionFactory factory = createConnectionFactory(); + Connection connection1 = factory.createConnection(); + Connection connection2 = factory.createConnection(); + connection1.start(); + connection2.start(); + Group group1 = new Group(connection1, "group1"); + final AtomicBoolean called = new AtomicBoolean(); + group1.addGroupMessageListener(new GroupMessageListener() { + public void messageDelivered(Member sender, String replyId, + Object message) { + synchronized (called) { + called.set(true); + called.notifyAll(); + } + } + }); + group1.start(); + Group group2 = new Group(connection2, "group2"); + group2.setMinimumGroupSize(2); + group2.start(); + Member member1 = group2.getMemberByName("group1"); + group2.sendMessage(member1, "hello"); + synchronized (called) { + if (!called.get()) { + called.wait(5000); + } + } + assertTrue(called.get()); + group1.stop(); + group2.stop(); + connection1.close(); + connection2.close(); + } + + public void testSendRequestReply() throws Exception { + ConnectionFactory factory = createConnectionFactory(); + Connection connection1 = factory.createConnection(); + Connection connection2 = factory.createConnection(); + connection1.start(); + connection2.start(); + final int number = 1000; + final AtomicInteger requestCount = new AtomicInteger(); + final AtomicInteger replyCount = new AtomicInteger(); + final List<String> requests = new ArrayList<String>(); + final List<String> replies = new ArrayList<String>(); + for (int i = 0; i < number; i++) { + requests.add("request" + i); + replies.add("reply" + i); + } + final Group group1 = new Group(connection1, "group1"); + final AtomicBoolean finished = new AtomicBoolean(); + group1.addGroupMessageListener(new GroupMessageListener() { + public void messageDelivered(Member sender, String replyId, + Object message) { + if (!replies.isEmpty()) { + String reply = replies.remove(0); + try { + group1.sendMessageResponse(sender, replyId, reply); + } catch (JMSException e) { + e.printStackTrace(); + } + } + } + }); + group1.start(); + final Group group2 = new Group(connection2, "group2"); + group2.setMinimumGroupSize(2); + group2.addGroupMessageListener(new GroupMessageListener() { + public void messageDelivered(Member sender, String replyId, + Object message) { + if (!requests.isEmpty()) { + String request = requests.remove(0); + try { + group2.sendMessage(sender, request); + } catch (JMSException e) { + e.printStackTrace(); + } + }else { + synchronized (finished) { + finished.set(true); + finished.notifyAll(); + } + } + } + }); + group2.start(); + Member member1 = group2.getMemberByName("group1"); + group2.sendMessage(member1, requests.remove(0)); + synchronized (finished) { + if (!finished.get()) { + finished.wait(10000); + } + } + assertTrue(finished.get()); + group1.stop(); + group2.stop(); + connection1.close(); + connection2.close(); + } + + protected void setUp() throws Exception { + if (broker == null) { + broker = createBroker(); + } + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + if (broker != null) { + broker.stop(); + } + } + + protected ActiveMQConnectionFactory createConnectionFactory() + throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory( + ActiveMQConnection.DEFAULT_BROKER_URL); + return cf; + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + configureBroker(answer); + answer.start(); + return answer; + } + + protected void configureBroker(BrokerService answer) throws Exception { + answer.setPersistent(false); + answer.addConnector(bindAddress); + answer.setDeleteAllMessagesOnStartup(true); + } +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java ------------------------------------------------------------------------------ svn:eol-style = native
