Added:
activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMemberTest.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMemberTest.java?rev=685612&view=auto
==============================================================================
---
activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMemberTest.java
(added)
+++
activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMemberTest.java
Wed Aug 13 10:05:07 2008
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.groupmq;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.groupmq.Group;
+import org.apache.groupmq.Member;
+
+
+public class GroupMemberTest extends TestCase {
+ protected BrokerService broker;
+ protected String bindAddress =
ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+
+
+ public void testCoordinatorSelection() throws Exception{
+ Group group = new Group(null,"");
+ List<Member>list = new ArrayList<Member>();
+ final int number =10;
+ Member choosen = null;
+ for (int i =0;i< number;i++) {
+ Member m = new Member("group"+i);
+ m.setId(""+i);
+ if (number/2==i) {
+ m.setCoordinatorWeight(10);
+ choosen=m;
+ }
+ list.add(m);
+ }
+ Member c = group.selectCordinator(list);
+ assertEquals(c,choosen);
+ }
+ /**
+ * Test method for
+ * [EMAIL PROTECTED]
org.apache.activemq.group.Group#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}.
+ * @throws Exception
+ */
+ public void testGroup() throws Exception {
+
+ final int number = 10;
+ List<Group>groupMaps = new ArrayList<Group>();
+ ConnectionFactory factory = createConnectionFactory();
+ for (int i =0; i < number; i++) {
+ Connection connection = factory.createConnection();
+ Group map = new Group(connection,"map"+i);
+ map.setHeartBeatInterval(200);
+ map.setMinimumGroupSize(i+1);
+ map.start();
+ groupMaps.add(map);
+ }
+
+ int coordinatorNumber = 0;
+ for (Group map:groupMaps) {
+ if (map.isCoordinator()) {
+ coordinatorNumber++;
+ }
+ }
+ for(Group map:groupMaps) {
+ map.stop();
+ }
+
+ }
+
+public void testWeightedGroup() throws Exception {
+
+ final int number = 10;
+ List<Group>groupMaps = new ArrayList<Group>();
+ Group last = null;
+ ConnectionFactory factory = createConnectionFactory();
+ for (int i =0; i < number; i++) {
+ Connection connection = factory.createConnection();
+ Group map = new Group(connection,"map"+i);
+ if(i ==number/2) {
+ map.setCoordinatorWeight(10);
+ last=map;
+ }
+
+ map.setMinimumGroupSize(i+1);
+ map.start();
+ groupMaps.add(map);
+ }
+ Thread.sleep(2000);
+ int coordinator = 0;
+ Group groupCoordinator = null;
+ for (Group map:groupMaps) {
+ if (map.isCoordinator()) {
+ coordinator++;
+ groupCoordinator=map;
+ }
+ }
+
+
+ assertNotNull(groupCoordinator);
+ assertEquals(1,coordinator);
+ assertEquals(last.getName(),groupCoordinator.getName());
+
+ for(Group map:groupMaps) {
+ map.stop();
+ }
+ }
+
+
+
+ protected void setUp() throws Exception {
+ if (broker == null) {
+ broker = createBroker();
+ }
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+
+ protected ActiveMQConnectionFactory createConnectionFactory()throws
Exception {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
+ ActiveMQConnection.DEFAULT_BROKER_URL);
+ return cf;
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService answer = new BrokerService();
+ configureBroker(answer);
+ answer.start();
+ return answer;
+ }
+
+ protected void configureBroker(BrokerService answer) throws Exception {
+ answer.setPersistent(false);
+ answer.addConnector(bindAddress);
+ answer.setDeleteAllMessagesOnStartup(true);
+ }
+}
+
Propchange:
activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMemberTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMessageTest.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMessageTest.java?rev=685612&view=auto
==============================================================================
---
activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMessageTest.java
(added)
+++
activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMessageTest.java
Wed Aug 13 10:05:07 2008
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.groupmq;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.groupmq.Group;
+import org.apache.groupmq.GroupMessageListener;
+import org.apache.groupmq.Member;
+
+public class GroupMessageTest extends TestCase {
+ protected BrokerService broker;
+ protected String bindAddress =
ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+
+ public void testGroupBroadcast() throws Exception {
+ final int number = 10;
+ final AtomicInteger count = new AtomicInteger();
+ List<Group> groups = new ArrayList<Group>();
+ ConnectionFactory factory = createConnectionFactory();
+ for (int i = 0; i < number; i++) {
+ Connection connection = factory.createConnection();
+ Group group = new Group(connection, "group" + i);
+ group.setMinimumGroupSize(i+1);
+ group.start();
+ groups.add(group);
+ group.addGroupMessageListener(new GroupMessageListener() {
+ public void messageDelivered(Member sender, String replyId,
+ Object message) {
+ synchronized (count) {
+ if (count.incrementAndGet() == number) {
+ count.notifyAll();
+ }
+ }
+ }
+ });
+ }
+ groups.get(0).broadcastMessage("hello");
+ synchronized (count) {
+ if (count.get() < number) {
+ count.wait(5000);
+ }
+ }
+ assertEquals(number, count.get());
+ for (Group map : groups) {
+ map.stop();
+ }
+ }
+
+ public void testsendMessage() throws Exception {
+ final int number = 10;
+ final AtomicInteger count = new AtomicInteger();
+ List<Group> groups = new ArrayList<Group>();
+ ConnectionFactory factory = createConnectionFactory();
+ for (int i = 0; i < number; i++) {
+ Connection connection = factory.createConnection();
+ Group group = new Group(connection, "group" + i);
+ group.setMinimumGroupSize(i+1);
+ group.start();
+ groups.add(group);
+ group.addGroupMessageListener(new GroupMessageListener() {
+ public void messageDelivered(Member sender, String replyId,
+ Object message) {
+ synchronized (count) {
+ count.incrementAndGet();
+ count.notifyAll();
+ }
+ }
+ });
+ }
+ groups.get(0).sendMessage("hello");
+ synchronized (count) {
+ if (count.get() == 0) {
+ count.wait(5000);
+ }
+ }
+ // wait a while to check that only one got it
+ Thread.sleep(2000);
+ assertEquals(1, count.get());
+ for (Group map : groups) {
+ map.stop();
+ }
+ }
+
+ public void testSendToSingleMember() throws Exception {
+ ConnectionFactory factory = createConnectionFactory();
+ Connection connection1 = factory.createConnection();
+ Connection connection2 = factory.createConnection();
+ Group group1 = new Group(connection1, "group1");
+ final AtomicBoolean called = new AtomicBoolean();
+ group1.addGroupMessageListener(new GroupMessageListener() {
+ public void messageDelivered(Member sender, String replyId,
+ Object message) {
+ synchronized (called) {
+ called.set(true);
+ called.notifyAll();
+ }
+ }
+ });
+ group1.start();
+ Group group2 = new Group(connection2, "group2");
+ group2.setMinimumGroupSize(2);
+ group2.start();
+ Member member1 = group2.getMemberByName("group1");
+ group2.sendMessage(member1, "hello");
+ synchronized (called) {
+ if (!called.get()) {
+ called.wait(5000);
+ }
+ }
+ assertTrue(called.get());
+ group1.stop();
+ group2.stop();
+ }
+
+ public void testSendRequestReply() throws Exception {
+ ConnectionFactory factory = createConnectionFactory();
+ Connection connection1 = factory.createConnection();
+ Connection connection2 = factory.createConnection();
+ final int number = 1000;
+ final AtomicInteger requestCount = new AtomicInteger();
+ final AtomicInteger replyCount = new AtomicInteger();
+ final List<String> requests = new ArrayList<String>();
+ final List<String> replies = new ArrayList<String>();
+ for (int i = 0; i < number; i++) {
+ requests.add("request" + i);
+ replies.add("reply" + i);
+ }
+ final Group group1 = new Group(connection1, "group1");
+ final AtomicBoolean finished = new AtomicBoolean();
+ group1.addGroupMessageListener(new GroupMessageListener() {
+ public void messageDelivered(Member sender, String replyId,
+ Object message) {
+ if (!replies.isEmpty()) {
+ String reply = replies.remove(0);
+ try {
+ group1.sendMessageResponse(sender, replyId, reply);
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ });
+ group1.start();
+ final Group group2 = new Group(connection2, "group2");
+ group2.setMinimumGroupSize(2);
+ group2.addGroupMessageListener(new GroupMessageListener() {
+ public void messageDelivered(Member sender, String replyId,
+ Object message) {
+ if (!requests.isEmpty()) {
+ String request = requests.remove(0);
+ try {
+ group2.sendMessage(sender, request);
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ }else {
+ synchronized (finished) {
+ finished.set(true);
+ finished.notifyAll();
+ }
+ }
+ }
+ });
+ group2.start();
+ Member member1 = group2.getMemberByName("group1");
+ group2.sendMessage(member1, requests.remove(0));
+ synchronized (finished) {
+ if (!finished.get()) {
+ finished.wait(10000);
+ }
+ }
+ assertTrue(finished.get());
+ group1.stop();
+ group2.stop();
+ }
+
+ protected void setUp() throws Exception {
+ if (broker == null) {
+ broker = createBroker();
+ }
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+
+ protected ActiveMQConnectionFactory createConnectionFactory()
+ throws Exception {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
+ ActiveMQConnection.DEFAULT_BROKER_URL);
+ return cf;
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService answer = new BrokerService();
+ configureBroker(answer);
+ answer.start();
+ return answer;
+ }
+
+ protected void configureBroker(BrokerService answer) throws Exception {
+ answer.setPersistent(false);
+ answer.addConnector(bindAddress);
+ answer.setDeleteAllMessagesOnStartup(true);
+ }
+}
Propchange:
activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupMessageTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupStateTest.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupStateTest.java?rev=685612&view=auto
==============================================================================
---
activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupStateTest.java
(added)
+++
activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupStateTest.java
Wed Aug 13 10:05:07 2008
@@ -0,0 +1,553 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.groupmq;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.groupmq.DefaultMapChangedListener;
+import org.apache.groupmq.Group;
+import org.apache.groupmq.GroupUpdateException;
+import org.apache.groupmq.Member;
+import org.apache.groupmq.MemberChangedListener;
+
+
+public class GroupStateTest extends TestCase {
+ protected BrokerService broker;
+ protected Connection connection1;
+ protected Connection connection2;
+ protected String bindAddress =
ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+
+ /**
+ * Test method for
+ * [EMAIL PROTECTED]
org.apache.activemq.group.Group#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}.
+ * @throws Exception
+ */
+ public void testAddMemberChangedListener() throws Exception {
+ final AtomicInteger counter = new AtomicInteger();
+ Group map1 = new Group(connection1,"map1");
+ map1.addMemberChangedListener(new MemberChangedListener(){
+
+ public void memberStarted(Member member) {
+ synchronized(counter) {
+ counter.incrementAndGet();
+ counter.notifyAll();
+ }
+
+ }
+
+ public void memberStopped(Member member) {
+ synchronized(counter) {
+ counter.decrementAndGet();
+ counter.notifyAll();
+ }
+ }
+
+ });
+ map1.start();
+ synchronized(counter) {
+ if (counter.get()<1) {
+ counter.wait(5000);
+ }
+ }
+ assertEquals(1, counter.get());
+ Group map2 = new Group(connection2,"map2");
+ map2.start();
+ synchronized(counter) {
+ if (counter.get()<2) {
+ counter.wait(5000);
+ }
+ }
+ assertEquals(2, counter.get());
+ map2.stop();
+ synchronized(counter) {
+ if (counter.get()>=2) {
+ counter.wait(Group.DEFAULT_HEART_BEAT_INTERVAL*3);
+ }
+ }
+ assertEquals(1, counter.get());
+ map1.stop();
+ }
+
+ /**
+ * Test method for
+ * [EMAIL PROTECTED]
org.apache.activemq.group.Group#addMapChangedListener(org.apache.activemq.group.MapChangedListener)}.
+ * @throws Exception
+ */
+ public void testAddMapChangedListener() throws Exception {
+ final AtomicBoolean called1 = new AtomicBoolean();
+ final AtomicBoolean called2 = new AtomicBoolean();
+
+ Group map1 = new Group(connection1,"map1");
+
+ map1.addMapChangedListener(new DefaultMapChangedListener() {
+ public void mapInsert(Member owner,Object Key, Object Value) {
+ synchronized(called1) {
+ called1.set(true);
+ called1.notifyAll();
+ }
+ }
+ });
+ map1.start();
+
+ Group map2 = new Group(connection2,"map2");
+
+ map2.addMapChangedListener(new DefaultMapChangedListener() {
+ public void mapInsert(Member owner,Object Key, Object Value) {
+ synchronized(called2) {
+ called2.set(true);
+ called2.notifyAll();
+ }
+ }
+ });
+ map2.start();
+
+
+ map1.put("test", "blob");
+ synchronized(called1) {
+ if (!called1.get()) {
+ called1.wait(5000);
+ }
+ }
+ synchronized(called2) {
+ if (!called2.get()) {
+ called2.wait(5000);
+ }
+ }
+ assertTrue(called1.get());
+ assertTrue(called2.get());
+ map1.stop();
+ map2.stop();
+ }
+
+ public void testGetImplicitWriteLock() throws Exception {
+ Group map1 = new Group(connection1, "map1");
+ final AtomicBoolean called = new AtomicBoolean();
+ map1.start();
+ Group map2 = new Group(connection2, "map2");
+ map2.setAlwaysLock(true);
+ map2.setMinimumGroupSize(2);
+ map2.start();
+ map2.put("test", "foo");
+ try {
+ map1.put("test", "bah");
+ fail("Should have thrown an exception!");
+ } catch (GroupUpdateException e) {
+ }
+ map1.stop();
+ map2.stop();
+ }
+
+ public void testExpireImplicitWriteLock() throws Exception {
+ Group map1 = new Group(connection1, "map1");
+ final AtomicBoolean called = new AtomicBoolean();
+ map1.start();
+ Group map2 = new Group(connection2, "map2");
+ map2.setAlwaysLock(true);
+ map2.setLockTimeToLive(1000);
+ map2.setMinimumGroupSize(2);
+ map2.start();
+ map2.put("test", "foo");
+ try {
+ map1.put("test", "bah");
+ fail("Should have thrown an exception!");
+ } catch (GroupUpdateException e) {
+ }
+ Thread.sleep(2000);
+ map1.put("test", "bah");
+ map1.stop();
+ map2.stop();
+ }
+
+ public void XtestExpireImplicitLockOnExit() throws Exception {
+ Group map1 = new Group(connection1, "map1");
+ final AtomicBoolean called = new AtomicBoolean();
+ map1.start();
+ Group map2 = new Group(connection2, "map2");
+ map2.setAlwaysLock(true);
+ map2.setMinimumGroupSize(2);
+ map2.start();
+ map2.put("test", "foo");
+ try {
+ map1.put("test", "bah");
+ fail("Should have thrown an exception!");
+ } catch (GroupUpdateException e) {
+ }
+ map2.stop();
+ map1.put("test", "bah");
+ map1.stop();
+
+ }
+
+ public void testGetExplicitWriteLock() throws Exception {
+ Group map1 = new Group(connection1, "map1");
+ map1.setAlwaysLock(true);
+ final AtomicBoolean called = new AtomicBoolean();
+ map1.start();
+ Group map2 = new Group(connection2, "map2");
+ map2.setAlwaysLock(true);
+ map2.setMinimumGroupSize(2);
+ map2.start();
+ map2.put("test", "foo");
+ map2.lock("test");
+ try {
+ map1.put("test", "bah");
+ fail("Should have thrown an exception!");
+ } catch (GroupUpdateException e) {
+ }
+ map2.unlock("test");
+ map1.lock("test");
+ try {
+ map2.lock("test");
+ fail("Should have thrown an exception!");
+ } catch (GroupUpdateException e) {
+ }
+ map1.stop();
+ map2.stop();
+ }
+
+
+
+ /**
+ * Test method for [EMAIL PROTECTED]
org.apache.activemq.group.Group#clear()}.
+ *
+ * @throws Exception
+ */
+ public void testClear() throws Exception {
+ Group map1 = new Group(connection1,"map1");
+ final AtomicBoolean called = new AtomicBoolean();
+ map1.addMapChangedListener(new DefaultMapChangedListener() {
+ public void mapInsert(Member owner,Object Key, Object Value) {
+ synchronized(called) {
+ called.set(true);
+ called.notifyAll();
+ }
+ }
+
+ public void mapRemove(Member owner, Object key, Object
value,boolean expired) {
+ synchronized(called) {
+ called.set(true);
+ called.notifyAll();
+ }
+ }
+ });
+ map1.start();
+ Group map2 = new Group(connection2,"map2");
+ map2.start();
+ map2.put("test","foo");
+ synchronized(called) {
+ if (!called.get()) {
+ called.wait(5000);
+ }
+ }
+ assertTrue(called.get());
+ called.set(false);
+ assertTrue(map1.isEmpty()==false);
+ map2.clear();
+ synchronized(called) {
+ if (!called.get()) {
+ called.wait(5000);
+ }
+ }
+ assertTrue(map1.isEmpty());
+ map1.stop();
+ map2.stop();
+ }
+
+ /**
+ * Test a new map is populated for existing values
+ */
+ public void testMapUpdatedOnStart() throws Exception {
+ Group map1 = new Group(connection1,"map1");
+ final AtomicBoolean called = new AtomicBoolean();
+
+ map1.start();
+ map1.put("test", "foo");
+ Group map2 = new Group(connection2,"map2");
+ map2.addMapChangedListener(new DefaultMapChangedListener() {
+ public void mapInsert(Member owner,Object Key, Object Value) {
+ synchronized(called) {
+ called.set(true);
+ called.notifyAll();
+ }
+ }
+ });
+ map2.start();
+
+ synchronized(called) {
+ if (!called.get()) {
+ called.wait(5000);
+ }
+ }
+ assertTrue(called.get());
+ called.set(false);
+ assertTrue(map2.containsKey("test"));
+ assertTrue(map2.containsValue("foo"));
+ map1.stop();
+ map2.stop();
+ }
+
+ public void testContainsKey() throws Exception {
+ Group map1 = new Group(connection1,"map1");
+ final AtomicBoolean called = new AtomicBoolean();
+ map1.addMapChangedListener(new DefaultMapChangedListener() {
+ public void mapInsert(Member owner,Object Key, Object Value) {
+ synchronized(called) {
+ called.set(true);
+ called.notifyAll();
+ }
+ }
+ });
+ map1.start();
+ Group map2 = new Group(connection2,"map2");
+ map2.start();
+ map2.put("test","foo");
+ synchronized(called) {
+ if (!called.get()) {
+ called.wait(5000);
+ }
+ }
+ assertTrue(called.get());
+ called.set(false);
+ assertTrue(map1.containsKey("test"));
+ map1.stop();
+ map2.stop();
+ }
+
+
+ /**
+ * Test method for
+ * [EMAIL PROTECTED]
org.apache.activemq.group.Group#containsValue(java.lang.Object)}.
+ * @throws Exception
+ */
+ public void testContainsValue() throws Exception {
+ Group map1 = new Group(connection1,"map1");
+ final AtomicBoolean called = new AtomicBoolean();
+ map1.addMapChangedListener(new DefaultMapChangedListener() {
+ public void mapInsert(Member owner,Object Key, Object Value) {
+ synchronized(called) {
+ called.set(true);
+ called.notifyAll();
+ }
+ }
+ });
+ map1.start();
+ Group map2 = new Group(connection2,"map2");
+ map2.start();
+ map2.put("test","foo");
+ synchronized(called) {
+ if (!called.get()) {
+ called.wait(5000);
+ }
+ }
+ assertTrue(called.get());
+ called.set(false);
+ assertTrue(map1.containsValue("foo"));
+ map1.stop();
+ map2.stop();
+ }
+
+ /**
+ * Test method for [EMAIL PROTECTED]
org.apache.activemq.group.GroupMap#entrySet()}.
+ * @throws Exception
+ */
+
+
+ /**
+ * Test method for
+ * [EMAIL PROTECTED]
org.apache.activemq.group.Group#get(java.lang.Object)}.
+ * @throws Exception
+ */
+ public void testGet() throws Exception {
+ Group map1 = new Group(connection1,"map1");
+ final AtomicBoolean called = new AtomicBoolean();
+ map1.addMapChangedListener(new DefaultMapChangedListener() {
+ public void mapInsert(Member owner,Object Key, Object Value) {
+ synchronized(called) {
+ called.set(true);
+ called.notifyAll();
+ }
+ }
+ });
+ map1.start();
+ Group map2 = new Group(connection2,"map2");
+ map2.start();
+ map2.put("test","foo");
+ synchronized(called) {
+ if (!called.get()) {
+ called.wait(5000);
+ }
+ }
+ assertTrue(called.get());
+ assertTrue(map1.get("test").equals("foo"));
+ map1.stop();
+ map2.stop();
+ }
+
+ public void testPut() throws Exception {
+ Group map1 = new Group(connection1,"map1");
+ map1.start();
+ Group map2 = new Group(connection2,"map2");
+ map2.setMinimumGroupSize(2);
+ map2.start();
+ Object value = map1.put("foo", "blob");
+ assertNull(value);
+ value = map1.put("foo", "blah");
+ assertEquals(value, "blob");
+ map1.stop();
+ map2.stop();
+ }
+
+
+
+ /**
+ * Test method for
+ * [EMAIL PROTECTED]
org.apache.activemq.group.Group#remove(java.lang.Object)}.
+ */
+ public void testRemove() throws Exception{
+ Group map1 = new Group(connection1,"map1");
+ final AtomicBoolean called = new AtomicBoolean();
+ map1.addMapChangedListener(new DefaultMapChangedListener() {
+ public void mapInsert(Member owner,Object Key, Object Value) {
+ synchronized(called) {
+ called.set(true);
+ called.notifyAll();
+ }
+ }
+
+ public void mapRemove(Member owner, Object key, Object
value,boolean expired) {
+ synchronized(called) {
+ called.set(true);
+ called.notifyAll();
+ }
+ }
+ });
+ map1.start();
+ Group map2 = new Group(connection2,"map2");
+ map2.start();
+ map2.put("test","foo");
+ synchronized(called) {
+ if (!called.get()) {
+ called.wait(5000);
+ }
+ }
+ assertTrue(called.get());
+ called.set(false);
+ assertTrue(map1.isEmpty()==false);
+ map2.remove("test");
+ synchronized(called) {
+ if (!called.get()) {
+ called.wait(5000);
+ }
+ }
+ assertTrue(map1.isEmpty());
+
+ map1.stop();
+ map2.stop();
+ }
+
+ public void testExpire() throws Exception{
+ final AtomicBoolean called1 = new AtomicBoolean();
+ final AtomicBoolean called2 = new AtomicBoolean();
+
+ Group map1 = new Group(connection1,"map1");
+ map1.setTimeToLive(1000);
+ map1.addMapChangedListener(new DefaultMapChangedListener() {
+ public void mapRemove(Member owner, Object key, Object
value,boolean expired) {
+ synchronized(called1) {
+ called1.set(expired);
+ called1.notifyAll();
+ }
+ }
+ });
+ map1.start();
+
+ Group map2 = new Group(connection2,"map2");
+
+ map2.addMapChangedListener(new DefaultMapChangedListener() {
+ public void mapRemove(Member owner, Object key, Object
value,boolean expired) {
+ synchronized(called2) {
+ called2.set(expired);
+ called2.notifyAll();
+ }
+ }
+ });
+ map2.start();
+
+
+ map1.put("test", "blob");
+ synchronized(called1) {
+ if (!called1.get()) {
+ called1.wait(5000);
+ }
+ }
+ synchronized(called2) {
+ if (!called2.get()) {
+ called2.wait(5000);
+ }
+ }
+ assertTrue(called1.get());
+ assertTrue(called2.get());
+ map1.stop();
+ map2.stop();
+ }
+
+ protected void setUp() throws Exception {
+ if (broker == null) {
+ broker = createBroker();
+ }
+ ConnectionFactory factory = createConnectionFactory();
+ connection1 = factory.createConnection();
+ connection1.start();
+ connection2 = factory.createConnection();
+ connection2.start();
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ connection1.close();
+ connection2.close();
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+
+ protected ActiveMQConnectionFactory createConnectionFactory()throws
Exception {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
+ ActiveMQConnection.DEFAULT_BROKER_URL);
+ return cf;
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService answer = new BrokerService();
+ configureBroker(answer);
+ answer.start();
+ return answer;
+ }
+
+ protected void configureBroker(BrokerService answer) throws Exception {
+ answer.setPersistent(false);
+ answer.addConnector(bindAddress);
+ answer.setDeleteAllMessagesOnStartup(true);
+ }
+}
Propchange:
activemq/sandbox/groupmq/src/test/java/org/apache/groupmq/GroupStateTest.java
------------------------------------------------------------------------------
svn:eol-style = native