Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java?rev=683259&r1=683258&r2=683259&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java Wed Aug 6 06:25:27 2008 @@ -28,7 +28,8 @@ public class GroupMapTest extends TestCase { protected BrokerService broker; - protected Connection connection; + protected Connection connection1; + protected Connection connection2; protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; /** @@ -38,7 +39,7 @@ */ public void testAddMemberChangedListener() throws Exception { final AtomicInteger counter = new AtomicInteger(); - GroupMap map1 = new GroupMap(connection,"map1"); + GroupMap map1 = new GroupMap(connection1,"map1"); map1.addMemberChangedListener(new MemberChangedListener(){ public void memberStarted(Member member) { @@ -64,10 +65,7 @@ } } assertEquals(1, counter.get()); - ConnectionFactory factory = createConnectionFactory(); - Connection connection2 = factory.createConnection(); - connection2.start(); - GroupMap map2 = new GroupMap(connection,"map2"); + GroupMap map2 = new GroupMap(connection2,"map2"); map2.start(); synchronized(counter) { if (counter.get()<2) { @@ -78,12 +76,11 @@ map2.stop(); synchronized(counter) { if (counter.get()>=2) { - counter.wait(5000); + counter.wait(GroupMap.DEFAULT_HEART_BEAT_INTERVAL*3); } } assertEquals(1, counter.get()); map1.stop(); - connection2.close(); } /** @@ -92,48 +89,94 @@ * @throws Exception */ public void testAddMapChangedListener() throws Exception { - GroupMap map = new GroupMap(connection,"map"); - final AtomicBoolean called = new AtomicBoolean(); - 
map.addMapChangedListener(new MapChangedListener(){ - public void mapChanged(Member owner, Object key, Object oldValue, - Object newValue) { - synchronized(called) { - called.set(true); - called.notifyAll(); + final AtomicBoolean called1 = new AtomicBoolean(); + final AtomicBoolean called2 = new AtomicBoolean(); + + GroupMap map1 = new GroupMap(connection1,"map1"); + + map1.addMapChangedListener(new DefaultMapChangedListener() { + public void mapInsert(Member owner,Object Key, Object Value) { + synchronized(called1) { + called1.set(true); + called1.notifyAll(); } } - }); - map.start(); - map.put("test", "blob"); - synchronized(called) { - if (!called.get()) { - called.wait(5000); + map1.start(); + + GroupMap map2 = new GroupMap(connection2,"map2"); + + map2.addMapChangedListener(new DefaultMapChangedListener() { + public void mapInsert(Member owner,Object Key, Object Value) { + synchronized(called2) { + called2.set(true); + called2.notifyAll(); + } + } + }); + map2.start(); + + + map1.put("test", "blob"); + synchronized(called1) { + if (!called1.get()) { + called1.wait(5000); } } - assertTrue(called.get()); - map.stop(); + synchronized(called2) { + if (!called2.get()) { + called2.wait(5000); + } + } + assertTrue(called1.get()); + assertTrue(called2.get()); + map1.stop(); + map2.stop(); + } + + public void testGetWriteLock() throws Exception { + GroupMap map1 = new GroupMap(connection1, "map1"); + final AtomicBoolean called = new AtomicBoolean(); + map1.start(); + GroupMap map2 = new GroupMap(connection2, "map2"); + map2.setMinimumGroupSize(2); + map2.start(); + map2.put("test", "foo"); + try { + map1.put("test", "bah"); + fail("Should have thrown an exception!"); + } catch (GroupMapUpdateException e) { + } + map1.stop(); + map2.stop(); } + /** * Test method for {@link org.apache.activemq.group.GroupMap#clear()}. 
- * @throws Exception + * + * @throws Exception */ public void testClear() throws Exception { - GroupMap map1 = new GroupMap(connection,"map1"); + GroupMap map1 = new GroupMap(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); - map1.addMapChangedListener(new MapChangedListener(){ - public void mapChanged(Member owner, Object key, Object oldValue, - Object newValue) { + map1.addMapChangedListener(new DefaultMapChangedListener() { + public void mapInsert(Member owner,Object Key, Object Value) { synchronized(called) { called.set(true); called.notifyAll(); } } + public void mapRemove(Member owner, Object key, Object value,boolean expired) { + synchronized(called) { + called.set(true); + called.notifyAll(); + } + } }); map1.start(); - GroupMap map2 = new GroupMap(connection,"map2"); + GroupMap map2 = new GroupMap(connection2,"map2"); map2.start(); map2.put("test","foo"); synchronized(called) { @@ -159,20 +202,19 @@ * Test a new map is populated for existing values */ public void testMapUpdatedOnStart() throws Exception { - GroupMap map1 = new GroupMap(connection,"map1"); + GroupMap map1 = new GroupMap(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); map1.start(); map1.put("test", "foo"); - GroupMap map2 = new GroupMap(connection,"map2"); - map2.addMapChangedListener(new MapChangedListener(){ - public void mapChanged(Member owner, Object key, Object oldValue, - Object newValue) { + GroupMap map2 = new GroupMap(connection2,"map2"); + map2.addMapChangedListener(new DefaultMapChangedListener() { + public void mapInsert(Member owner,Object Key, Object Value) { synchronized(called) { called.set(true); called.notifyAll(); } - } + } }); map2.start(); @@ -190,20 +232,18 @@ } public void testContainsKey() throws Exception { - GroupMap map1 = new GroupMap(connection,"map1"); + GroupMap map1 = new GroupMap(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); - map1.addMapChangedListener(new MapChangedListener(){ - public 
void mapChanged(Member owner, Object key, Object oldValue, - Object newValue) { + map1.addMapChangedListener(new DefaultMapChangedListener() { + public void mapInsert(Member owner,Object Key, Object Value) { synchronized(called) { called.set(true); called.notifyAll(); } } - }); map1.start(); - GroupMap map2 = new GroupMap(connection,"map2"); + GroupMap map2 = new GroupMap(connection2,"map2"); map2.start(); map2.put("test","foo"); synchronized(called) { @@ -225,20 +265,18 @@ * @throws Exception */ public void testContainsValue() throws Exception { - GroupMap map1 = new GroupMap(connection,"map1"); + GroupMap map1 = new GroupMap(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); - map1.addMapChangedListener(new MapChangedListener(){ - public void mapChanged(Member owner, Object key, Object oldValue, - Object newValue) { + map1.addMapChangedListener(new DefaultMapChangedListener() { + public void mapInsert(Member owner,Object Key, Object Value) { synchronized(called) { called.set(true); called.notifyAll(); } } - }); map1.start(); - GroupMap map2 = new GroupMap(connection,"map2"); + GroupMap map2 = new GroupMap(connection2,"map2"); map2.start(); map2.put("test","foo"); synchronized(called) { @@ -265,20 +303,18 @@ * @throws Exception */ public void testGet() throws Exception { - GroupMap map1 = new GroupMap(connection,"map1"); + GroupMap map1 = new GroupMap(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); - map1.addMapChangedListener(new MapChangedListener(){ - public void mapChanged(Member owner, Object key, Object oldValue, - Object newValue) { + map1.addMapChangedListener(new DefaultMapChangedListener() { + public void mapInsert(Member owner,Object Key, Object Value) { synchronized(called) { called.set(true); called.notifyAll(); } } - }); map1.start(); - GroupMap map2 = new GroupMap(connection,"map2"); + GroupMap map2 = new GroupMap(connection2,"map2"); map2.start(); map2.put("test","foo"); synchronized(called) { @@ -299,20 
+335,25 @@ * {@link org.apache.activemq.group.GroupMap#remove(java.lang.Object)}. */ public void testRemove() throws Exception{ - GroupMap map1 = new GroupMap(connection,"map1"); + GroupMap map1 = new GroupMap(connection1,"map1"); final AtomicBoolean called = new AtomicBoolean(); - map1.addMapChangedListener(new MapChangedListener(){ - public void mapChanged(Member owner, Object key, Object oldValue, - Object newValue) { + map1.addMapChangedListener(new DefaultMapChangedListener() { + public void mapInsert(Member owner,Object Key, Object Value) { synchronized(called) { called.set(true); called.notifyAll(); } } + public void mapRemove(Member owner, Object key, Object value,boolean expired) { + synchronized(called) { + called.set(true); + called.notifyAll(); + } + } }); map1.start(); - GroupMap map2 = new GroupMap(connection,"map2"); + GroupMap map2 = new GroupMap(connection2,"map2"); map2.start(); map2.put("test","foo"); synchronized(called) { @@ -330,6 +371,53 @@ } } assertTrue(map1.isEmpty()); + + map1.stop(); + map2.stop(); + } + + public void testExpire() throws Exception{ + final AtomicBoolean called1 = new AtomicBoolean(); + final AtomicBoolean called2 = new AtomicBoolean(); + + GroupMap map1 = new GroupMap(connection1,"map1"); + map1.setTimeToLive(1000); + map1.addMapChangedListener(new DefaultMapChangedListener() { + public void mapRemove(Member owner, Object key, Object value,boolean expired) { + synchronized(called1) { + called1.set(expired); + called1.notifyAll(); + } + } + }); + map1.start(); + + GroupMap map2 = new GroupMap(connection2,"map2"); + + map2.addMapChangedListener(new DefaultMapChangedListener() { + public void mapRemove(Member owner, Object key, Object value,boolean expired) { + synchronized(called2) { + called2.set(expired); + called2.notifyAll(); + } + } + }); + map2.start(); + + + map1.put("test", "blob"); + synchronized(called1) { + if (!called1.get()) { + called1.wait(5000); + } + } + synchronized(called2) { + if 
(!called2.get()) { + called2.wait(5000); + } + } + assertTrue(called1.get()); + assertTrue(called2.get()); map1.stop(); map2.stop(); } @@ -339,14 +427,17 @@ broker = createBroker(); } ConnectionFactory factory = createConnectionFactory(); - connection = factory.createConnection(); - connection.start(); + connection1 = factory.createConnection(); + connection1.start(); + connection2 = factory.createConnection(); + connection2.start(); super.setUp(); } protected void tearDown() throws Exception { super.tearDown(); - connection.close(); + connection1.close(); + connection2.close(); if (broker != null) { broker.stop(); }
