Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexThreadingTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexThreadingTestSupport.java?rev=636615&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexThreadingTestSupport.java (added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/IndexThreadingTestSupport.java Wed Mar 12 20:16:19 2008
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.index;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.broker.router.index.api.DataIndex;
+import org.apache.activemq.broker.router.index.api.IndexEntry;
+import org.apache.activemq.broker.router.index.api.ReferenceIndex;
+
+/**
+ * Threading stress tests for the index API: publisher threads add entries
+ * concurrently while the test thread removes them.
+ */
+abstract public class IndexThreadingTestSupport extends IndexTestSupport {
+
+    /** Set to true to ask the publisher threads to leave their loop. */
+    AtomicBoolean stopping = new AtomicBoolean();
+    /** Number of references published since the test thread last reset it. */
+    AtomicInteger counter = new AtomicInteger();
+
+    /**
+     * Thread that keeps adding a message to the data index and a reference to
+     * the reference index until {@link #stopping} is set.
+     */
+    class Publisher extends Thread {
+        private final ReferenceIndex refStore;
+        private DataIndex dataIndex;
+
+        /** The failure that ended this thread, or null if there was none. */
+        AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+        /** Most recent entry whose reference was added by this thread. */
+        AtomicReference<IndexEntry> lastIndexEntry = new AtomicReference<IndexEntry>();
+        private final int id;
+
+        public Publisher(ReferenceIndex refIndex, int id) {
+            super("Publisher: "+id);
+            this.refStore = refIndex;
+            this.id = id;
+            this.dataIndex = refIndex.getDataIndex();
+        }
+
+        @Override
+        public void run() {
+            try {
+                while ( !stopping.get() ) {
+                    addRecord();
+                }
+            } catch (Throwable e) {
+                // Recorded here; the test thread checks it after join().
+                e.printStackTrace();
+                error.set(e);
+            }
+        }
+
+        private void addRecord() throws Exception {
+            IndexEntry ie;
+            // Lock the shared test instance, not this Publisher: nextId is shared
+            // by all publishers, so a per-thread lock would not protect it.
+            synchronized(IndexThreadingTestSupport.this) {
+                long i = getNextId();
+                ie = dataIndex.addMessage(i, createLocation(id, (int) i));
+            }
+            assertNotNull(ie);
+            refStore.addReference(ie);
+            lastIndexEntry.set(ie);
+            counter.incrementAndGet();
+        }
+
+    }
+
+
+    long nextId;
+
+    /**
+     * Hands out the next message id. Not thread safe by itself: publishers
+     * call it while holding the lock on this test instance.
+     */
+    private long getNextId() {
+        return nextId++;
+    }
+
+    /**
+     * Runs two publishers against one reference index for five seconds while
+     * this thread removes the published entries, then verifies that neither
+     * publisher failed.
+     *
+     * @throws Exception
+     */
+    public void testReferenceStoreCRD() throws Exception {
+
+        String storeName = getName();
+        String refName = getName() + "-ref";
+        DataIndex dataIndex = dataIndexManager.addStore(storeName);
+        ReferenceIndex refIndex = dataIndex.addStore(refName);
+
+        // Start 2 publishers...
+        Publisher p1 = new Publisher(refIndex, 1);
+        p1.start();
+        Publisher p2 = new Publisher(refIndex, 2);
+        p2.start();
+
+        long processed=0;
+        try {
+            int testDuration = 1000*5;
+            long start = System.currentTimeMillis();
+            while( true ) {
+                if( System.currentTimeMillis()-start > testDuration ) {
+                    break;
+                }
+                int published = counter.getAndSet(0);
+                if( published > 0 ) {
+                    List<IndexEntry> loaded = refIndex.remove(null, null, published);
+                    assertEquals(published, loaded.size() );
+                    Thread.sleep(10);
+                    processed+=published;
+                } else {
+                    Thread.sleep(100);
+                }
+            }
+        } finally {
+            stopping.set(true);
+            p1.join();
+            p2.join();
+            System.out.println("Processed: "+processed+" messages.");
+        }
+
+        // A publisher that died early would otherwise go unnoticed.
+        assertNull("Publisher 1 failed: " + p1.error.get(), p1.error.get());
+        assertNull("Publisher 2 failed: " + p2.error.get(), p2.error.get());
+    }
+
+}
Copied: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaBasicIndexTest.java (from r635708, activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaIndexTest.java) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaBasicIndexTest.java?p2=activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaBasicIndexTest.java&p1=activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaIndexTest.java&r1=635708&r2=636615&rev=636615&view=diff ============================================================================== --- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaIndexTest.java (original) +++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaBasicIndexTest.java Wed Mar 12 20:16:19 2008 @@ -18,11 +18,11 @@ import java.io.File; -import org.apache.activemq.broker.router.index.IndexTestSupport; +import org.apache.activemq.broker.router.index.IndexBasicTestSupport; import org.apache.activemq.broker.router.index.api.DataIndexManager; import org.apache.activemq.broker.router.store.journal.JournalDataStoreManagerFactory; -public class JpaIndexTest extends IndexTestSupport { +public class JpaBasicIndexTest extends IndexBasicTestSupport { @Override protected DataIndexManager createDataIndexManager() throws Exception { Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaThreadingIndexTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaThreadingIndexTest.java?rev=636615&view=auto ============================================================================== --- 
activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaThreadingIndexTest.java (added) +++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/index/jpa/JpaThreadingIndexTest.java Wed Mar 12 20:16:19 2008 @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.router.index.jpa; + +import java.io.File; + +import org.apache.activemq.broker.router.index.IndexThreadingTestSupport; +import org.apache.activemq.broker.router.index.api.DataIndexManager; +import org.apache.activemq.broker.router.store.journal.JournalDataStoreManagerFactory; + +public class JpaThreadingIndexTest extends IndexThreadingTestSupport { + + @Override + protected DataIndexManager createDataIndexManager() throws Exception { + JournalDataStoreManagerFactory factory = new JournalDataStoreManagerFactory(); + factory.setDataDirectory(new File("target/data/" + getName())); +// factory.getEntityManagerProperties().put("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE"); + return factory.createDataIndexManager(); + } + +} Modified: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/performance/QueueTests.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/performance/QueueTests.java?rev=636615&r1=636614&r2=636615&view=diff ============================================================================== --- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/performance/QueueTests.java (original) +++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/performance/QueueTests.java Wed Mar 12 20:16:19 2008 @@ -74,9 +74,9 @@ private static final long SAMPLE_DELAY = Integer.parseInt(System.getProperty("SAMPLE_DELAY", "" + 1000 * 5)); private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "10000")); private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000 * 1)); - private static final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT", "1")); - private static final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT", "1")); - private static final int 
persistent = DeliveryMode.NON_PERSISTENT; + private static final int PRODUCER_COUNT = Integer.parseInt(System.getProperty("PRODUCER_COUNT", "10")); + private static final int CONSUMER_COUNT = Integer.parseInt(System.getProperty("CONSUMER_COUNT", "10")); + private static final int persistent = DeliveryMode.PERSISTENT; public ActiveMQDestination destination;
Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreBasicTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreBasicTestSupport.java?rev=636615&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreBasicTestSupport.java (added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreBasicTestSupport.java Wed Mar 12 20:16:19 2008
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.broker.router.store.api.ReferenceStore;
+
+/**
+ * Basic single-threaded tests of the store API: create/retrieve/delete,
+ * store lifecycle and store properties, each verified across a restart.
+ */
+abstract public class StoreBasicTestSupport extends StoreTestSupport {
+
+    /**
+     * Verify that the create retrieve and delete operations work properly
+     * against a DataStore.
+     *
+     * @throws Exception
+     */
+    public void testDataStoreCRD() throws Exception {
+        String storeName = getName();
+        DataStore dataStore = dataStoreManager.addStore(storeName);
+        dataStore.setDestination(destination);
+
+        int count = 10;
+        CacheEntry ce[] = new CacheEntry[count];
+        for (int i = 0; i < count; i++) {
+            ce[i] = dataStore.addMessage(Long.valueOf(i), createTextMessage("Hiram's test: " + i), null);
+            assertNotNull(ce[i]);
+            assertNotNull(ce[i].getStore());
+            assertNotNull(ce[i].getId());
+            assertNotNull(ce[i].getMessage());
+        }
+
+        restartDataStoreManager();
+        dataStore = dataStoreManager.getStore(storeName);
+        dataStore.setDestination(destination);
+
+        assertEquals(count, dataStore.size());
+
+        List<CacheEntry> loaded = dataStore.load(null, null, count * 2);
+        assertEquals(count, loaded.size());
+
+        // Verify that all the entries were loaded in the order inserted.
+        int i = 0;
+        for (CacheEntry entry : loaded) {
+            assertNotNull(entry);
+            assertNotNull(entry.getStore());
+            assertEquals(ce[i], entry);
+            i++;
+        }
+
+        // Let's delete every other record (the even positions).
+        int deleteCount = 0;
+        i = 0;
+        for (CacheEntry entry : loaded) {
+            if (i % 2 == 0) {
+                dataStore.remove(entry.getId(), null);
+                deleteCount++;
+            }
+            i++;
+        }
+
+        // Restart and verify that the right records were removed.
+        restartDataStoreManager();
+        dataStore = dataStoreManager.getStore(storeName);
+        dataStore.setDestination(destination);
+
+        assertEquals(count - deleteCount, dataStore.size());
+
+        loaded = dataStore.load(null, null, count * 2);
+        assertEquals(count - deleteCount, loaded.size());
+
+        Iterator<CacheEntry> iterator = loaded.iterator();
+        // Even positions were deleted, so the survivors are the odd entries of
+        // ce, in order. Test j here: i is left at count by the loop above.
+        for (int j = 0; j < count; j++) {
+            if (!(j % 2 == 0)) {
+                CacheEntry entry = iterator.next();
+                assertEquals(ce[j], entry);
+            }
+        }
+
+        // Verify that removing a store wipes out its data.
+        dataStoreManager.removeStore(dataStore);
+        dataStore = dataStoreManager.addStore(storeName);
+        dataStore.setDestination(destination);
+        assertEquals(0, dataStore.size());
+
+    }
+
+    /**
+     * Verify that the create retrieve and delete operations work properly
+     * against a ReferenceStore.
+     *
+     * @throws Exception
+     */
+    public void testReferenceStoreCRD() throws Exception {
+        String storeName = getName();
+        String refName = getName() + "-ref";
+        DataStore dataStore = dataStoreManager.addStore(storeName);
+        dataStore.setDestination(destination);
+        ReferenceStore refStore = dataStore.addStore(refName);
+
+        int count = 10;
+        CacheEntry ce[] = new CacheEntry[count];
+        for (int i = 0; i < count; i++) {
+            ce[i] = dataStore.addMessage(Long.valueOf(i), createTextMessage("Hiram's test: " + i), null);
+            assertNotNull(ce[i]);
+            assertNotNull(ce[i].getStore());
+            assertNotNull(ce[i].getId());
+            assertNotNull(ce[i].getMessage());
+        }
+
+        for (int i = 0; i < count; i++) {
+            refStore.addReference(ce[i]);
+        }
+
+        restartDataStoreManager();
+
+        dataStore = dataStoreManager.getStore(storeName);
+        dataStore.setDestination(destination);
+        refStore = dataStore.getStore(refName);
+
+        assertEquals(count, refStore.size());
+
+        List<CacheEntry> loaded = refStore.load(null, null, count * 2);
+        assertEquals(count, loaded.size());
+
+        // Verify that all the entries were loaded in the order inserted.
+        int i = 0;
+        for (CacheEntry entry : loaded) {
+            assertNotNull(entry);
+            assertNotNull(entry.getStore());
+            assertEquals(ce[i], entry);
+            i++;
+        }
+
+        // Let's delete every other record (the even positions).
+        int deleteCount = 0;
+        i = 0;
+        for (CacheEntry entry : loaded) {
+            if (i % 2 == 0) {
+                refStore.remove(entry.getId(), null);
+                deleteCount++;
+            }
+            i++;
+        }
+
+        // Restart and verify that the right records were removed.
+        restartDataStoreManager();
+        dataStore = dataStoreManager.getStore(storeName);
+        dataStore.setDestination(destination);
+        refStore = dataStore.getStore(refName);
+
+        assertEquals(count - deleteCount, refStore.size());
+
+        loaded = refStore.load(null, null, count * 2);
+        assertEquals(count - deleteCount, loaded.size());
+
+        Iterator<CacheEntry> iterator = loaded.iterator();
+        // Even positions were deleted, so the survivors are the odd entries of
+        // ce, in order. Test j here: i is left at count by the loop above.
+        for (int j = 0; j < count; j++) {
+            if (!(j % 2 == 0)) {
+                CacheEntry entry = iterator.next();
+                assertEquals(ce[j], entry);
+            }
+        }
+
+        // Verify that removing a store wipes out its data.
+        dataStore.removeStore(refStore);
+        refStore = dataStore.addStore(refName);
+        assertEquals(0, refStore.size());
+
+    }
+
+    /**
+     * Verify that adding and removing a DataStore is visible through the
+     * manager and is persisted across restarts.
+     *
+     * @throws Exception
+     */
+    public void testCreateDestroyDataStore() throws Exception {
+        String storeName = getName();
+
+        assertNull(dataStoreManager.getStore(storeName));
+        List<DataStore> storees = dataStoreManager.getStores();
+        assertTrue(storees.isEmpty());
+
+        DataStore dataStore = dataStoreManager.addStore(storeName);
+        assertNotNull(dataStore);
+        dataStore.setDestination(destination);
+
+        assertSame(dataStore, dataStoreManager.getStore(storeName));
+        storees = dataStoreManager.getStores();
+        assertEquals(1, storees.size());
+        assertTrue(storees.contains(dataStore));
+
+        // Verify that the data store create was persisted between restart.
+        restartDataStoreManager();
+
+        dataStore = dataStoreManager.getStore(storeName);
+        assertNotNull(dataStore);
+        assertEquals(storeName, dataStore.getName());
+        storees = dataStoreManager.getStores();
+        assertEquals(1, storees.size());
+        assertTrue(storees.contains(dataStore));
+
+        // Verify that the data store remove was persisted between restart.
+        dataStoreManager.removeStore(dataStore);
+        restartDataStoreManager();
+
+        dataStore = dataStoreManager.getStore(storeName);
+        assertNull(dataStore);
+        storees = dataStoreManager.getStores();
+        assertTrue(storees.isEmpty());
+    }
+
+    /**
+     * Verify that adding and removing a ReferenceStore is visible through its
+     * parent DataStore and is persisted across restarts.
+     *
+     * @throws Exception
+     */
+    public void testCreateDestroyReferenceStore() throws Exception {
+
+        String dataName = getName();
+        String refName = getName() + "-ref";
+        DataStore manager = dataStoreManager.addStore(dataName);
+        manager.setDestination(destination);
+
+        assertNull(manager.getStore(refName));
+        List<ReferenceStore> storees = manager.getStores();
+        assertTrue(storees.isEmpty());
+
+        ReferenceStore dataStore = manager.addStore(refName);
+        assertNotNull(dataStore);
+
+        assertSame(dataStore, manager.getStore(refName));
+        storees = manager.getStores();
+        assertEquals(1, storees.size());
+        assertTrue(storees.contains(dataStore));
+
+        // Verify that the data store create was persisted between restart.
+        restartDataStoreManager();
+        manager = dataStoreManager.getStore(dataName);
+
+        dataStore = manager.getStore(refName);
+        assertNotNull(dataStore);
+        assertEquals(refName, dataStore.getName());
+        storees = manager.getStores();
+        assertEquals(1, storees.size());
+        assertTrue(storees.contains(dataStore));
+
+        // Verify that the data store remove was persisted between restart.
+        manager.removeStore(dataStore);
+        restartDataStoreManager();
+        manager = dataStoreManager.getStore(dataName);
+
+        dataStore = manager.getStore(refName);
+        assertNull(dataStore);
+        storees = manager.getStores();
+        assertTrue(storees.isEmpty());
+
+    }
+
+    /**
+     * Verify that store properties can be stored without any issues.
+     *
+     * @throws Exception
+     */
+    public void testStoreProperties() throws Exception {
+
+        String storeName = getName();
+
+        // Make a relatively large property object..
+        Map<String, String> properties = new HashMap<String, String>();
+        for (int i = 0; i < 1000; i++) {
+            properties.put("key" + i, "value" + i);
+        }
+
+        DataStore dataStore = dataStoreManager.addStore(storeName);
+        dataStore.setDestination(destination);
+        dataStore.setProperties(properties);
+        assertEquals(properties, dataStore.getProperties());
+
+        // Restart and verify that the properties were preserved.
+        restartDataStoreManager();
+
+        dataStore = dataStoreManager.getStore(storeName);
+        assertEquals(properties, dataStore.getProperties());
+
+        dataStore.setProperties(null);
+
+        String refName = storeName + "-ref";
+        ReferenceStore refStore = dataStore.addStore(refName);
+        refStore.setProperties(properties);
+
+        restartDataStoreManager();
+
+        dataStore = dataStoreManager.getStore(storeName);
+        assertNull(dataStore.getProperties());
+        refStore = dataStore.getStore(refName);
+
+        assertEquals(properties, refStore.getProperties());
+
+    }
+}
Modified: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreTestSupport.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreTestSupport.java?rev=636615&r1=636614&r2=636615&view=diff ============================================================================== --- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreTestSupport.java (original) +++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreTestSupport.java Wed Mar 12 20:16:19 2008 @@ -16,10 +16,7 @@ */ package org.apache.activemq.broker.router.store; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; import javax.jms.JMSException; import
javax.jms.MessageNotWriteableException; @@ -31,7 +28,6 @@ import org.apache.activemq.broker.router.store.api.CacheEntry; import org.apache.activemq.broker.router.store.api.DataStore; import org.apache.activemq.broker.router.store.api.DataStoreManager; -import org.apache.activemq.broker.router.store.api.ReferenceStore; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ConnectionId; @@ -43,12 +39,12 @@ abstract public class StoreTestSupport extends TestCase { - DataStoreManager dataStoreManager; - private ProducerId producerId; - private int msgIdGenerator; - private ActiveMQQueue destinationName; + protected DataStoreManager dataStoreManager; + protected ProducerId producerId; + protected int msgIdGenerator; + protected ActiveMQQueue destinationName; - private Destination destination = new StubDestination() { + protected Destination destination = new StubDestination() { SystemUsage systemUsage = new SystemUsage(); @Override @@ -85,287 +81,7 @@ dataStoreManager = null; } - /** - * Verify that the create retrieve and delete operations work properly - * against a DataStore. 
- * - * @throws Exception - */ - public void testDataStoreCRD() throws Exception { - String storeName = getName(); - DataStore dataStore = dataStoreManager.addStore(storeName); - dataStore.setDestination(destination); - - int count = 10; - CacheEntry ce[] = new CacheEntry[count]; - for (int i = 0; i < count; i++) { - ce[i] = dataStore.addMessage(new Long(i), createTextMessage("Hiram's test: " + i), null); - assertNotNull(ce[i]); - assertNotNull(ce[i].getStore()); - assertNotNull(ce[i].getId()); - assertNotNull(ce[i].getMessage()); - } - - restartDataStoreManager(); - dataStore = dataStoreManager.getStore(storeName); - dataStore.setDestination(destination); - - assertEquals(count, dataStore.size()); - - List<CacheEntry> loaded = dataStore.load(null, null, count * 2); - assertEquals(count, loaded.size()); - - // Verify that all the entries were loaded in the order inserted. - int i = 0; - for (CacheEntry entry : loaded) { - assertNotNull(entry); - assertNotNull(entry.getStore()); - assertEquals(ce[i], entry); - i++; - } - - // Lets delete every other record. - int deleteCount = 0; - i = 0; - for (CacheEntry entry : loaded) { - if (i % 2 == 0) { - dataStore.remove(entry.getId(), null); - deleteCount++; - } - i++; - } - - // Restart and verify that the right records were removed. - restartDataStoreManager(); - dataStore = dataStoreManager.getStore(storeName); - dataStore.setDestination(destination); - - assertEquals(count - deleteCount, dataStore.size()); - - loaded = dataStore.load(null, null, count * 2); - assertEquals(count - deleteCount, loaded.size()); - - Iterator<CacheEntry> iterator = loaded.iterator(); - for (int j = 0; j < count; j++) { - if (!(i % 2 == 0)) { - CacheEntry entry = iterator.next(); - assertEquals(ce[j], entry); - } - } - - // Verify that removing a store wipes out his data. 
- dataStoreManager.removeStore(dataStore); - dataStore = dataStoreManager.addStore(storeName); - dataStore.setDestination(destination); - assertEquals(0, dataStore.size()); - - } - - /** - * Verify that the create retrieve and delete operations work properly - * against a ReferenceStore. - * - * @throws Exception - */ - public void testReferenceStoreCRD() throws Exception { - String storeName = getName(); - String refName = getName() + "-ref"; - DataStore dataStore = dataStoreManager.addStore(storeName); - dataStore.setDestination(destination); - ReferenceStore refStore = dataStore.addStore(refName); - - int count = 10; - CacheEntry ce[] = new CacheEntry[count]; - for (int i = 0; i < count; i++) { - ce[i] = dataStore.addMessage(new Long(i), createTextMessage("Hiram's test: " + i), null); - assertNotNull(ce[i]); - assertNotNull(ce[i].getStore()); - assertNotNull(ce[i].getId()); - assertNotNull(ce[i].getMessage()); - } - - for (int i = 0; i < count; i++) { - refStore.addReference(ce[i]); - } - - restartDataStoreManager(); - - dataStore = dataStoreManager.getStore(storeName); - dataStore.setDestination(destination); - refStore = dataStore.getStore(refName); - - assertEquals(count, refStore.size()); - - List<CacheEntry> loaded = refStore.load(null, null, count * 2); - assertEquals(count, loaded.size()); - - // Verify that all the entries were loaded in the order inserted. - int i = 0; - for (CacheEntry entry : loaded) { - assertNotNull(entry); - assertNotNull(entry.getStore()); - assertEquals(ce[i], entry); - i++; - } - - // Lets delete every other record. - int deleteCount = 0; - i = 0; - for (CacheEntry entry : loaded) { - if (i % 2 == 0) { - refStore.remove(entry.getId(), null); - deleteCount++; - } - i++; - } - - // Restart and verify that the right records were removed. 
- restartDataStoreManager(); - dataStore = dataStoreManager.getStore(storeName); - dataStore.setDestination(destination); - refStore = dataStore.getStore(refName); - - assertEquals(count - deleteCount, refStore.size()); - - loaded = refStore.load(null, null, count * 2); - assertEquals(count - deleteCount, loaded.size()); - - Iterator<CacheEntry> iterator = loaded.iterator(); - for (int j = 0; j < count; j++) { - if (!(i % 2 == 0)) { - CacheEntry entry = iterator.next(); - assertEquals(ce[j], entry); - } - } - - // Verify that removing a store wipes out his data. - dataStore.removeStore(refStore); - refStore = dataStore.addStore(refName); - assertEquals(0, refStore.size()); - - } - - public void testCreateDestroyDataStore() throws Exception { - String storeName = getName(); - - assertNull(dataStoreManager.getStore(storeName)); - List<DataStore> storees = dataStoreManager.getStores(); - assertTrue(storees.isEmpty()); - - DataStore dataStore = dataStoreManager.addStore(storeName); - assertNotNull(dataStore); - dataStore.setDestination(destination); - - assertSame(dataStore, dataStoreManager.getStore(storeName)); - storees = dataStoreManager.getStores(); - assertEquals(1, storees.size()); - assertTrue(storees.contains(dataStore)); - - // Verify that the data store create was persisted between restart. - restartDataStoreManager(); - - dataStore = dataStoreManager.getStore(storeName); - assertNotNull(dataStore); - assertEquals(storeName, dataStore.getName()); - storees = dataStoreManager.getStores(); - assertEquals(1, storees.size()); - assertTrue(storees.contains(dataStore)); - - // Verify that the data store remove was persisted between restart. 
- dataStoreManager.removeStore(dataStore); - restartDataStoreManager(); - - dataStore = dataStoreManager.getStore(storeName); - assertNull(dataStore); - storees = dataStoreManager.getStores(); - assertTrue(storees.isEmpty()); - } - - public void testCreateDestroyReferenceStore() throws Exception { - - String dataName = getName(); - String refName = getName() + "-ref"; - DataStore manager = dataStoreManager.addStore(dataName); - manager.setDestination(destination); - - assertNull(manager.getStore(refName)); - List<ReferenceStore> storees = manager.getStores(); - assertTrue(storees.isEmpty()); - - ReferenceStore dataStore = manager.addStore(refName); - assertNotNull(dataStore); - - assertSame(dataStore, manager.getStore(refName)); - storees = manager.getStores(); - assertEquals(1, storees.size()); - assertTrue(storees.contains(dataStore)); - - // Verify that the data store create was persisted between restart. - restartDataStoreManager(); - manager = dataStoreManager.getStore(dataName); - - dataStore = manager.getStore(refName); - assertNotNull(dataStore); - assertEquals(refName, dataStore.getName()); - storees = manager.getStores(); - assertEquals(1, storees.size()); - assertTrue(storees.contains(dataStore)); - - // Verify that the data store remove was persisted between restart. - manager.removeStore(dataStore); - restartDataStoreManager(); - manager = dataStoreManager.getStore(dataName); - - dataStore = manager.getStore(refName); - assertNull(dataStore); - storees = manager.getStores(); - assertTrue(storees.isEmpty()); - - } - - /** - * Verify that store properties can be stored without any issues. - * - * @throws Exception - */ - public void testStoreProperties() throws Exception { - - String storeName = getName(); - - // Make a relatively large property object.. 
- Map<String, String> properties = new HashMap<String, String>(); - for (int i = 0; i < 1000; i++) { - properties.put("key" + i, "value" + i); - } - - DataStore dataStore = dataStoreManager.addStore(storeName); - dataStore.setDestination(destination); - dataStore.setProperties(properties); - assertEquals(properties, dataStore.getProperties()); - - // Restart and verify the the properties were preserved. - restartDataStoreManager(); - - dataStore = dataStoreManager.getStore(storeName); - assertEquals(properties, dataStore.getProperties()); - - dataStore.setProperties(null); - - String refName = storeName + "-ref"; - ReferenceStore refStore = dataStore.addStore(refName); - refStore.setProperties(properties); - - restartDataStoreManager(); - - dataStore = dataStoreManager.getStore(storeName); - assertNull(dataStore.getProperties()); - refStore = dataStore.getStore(refName); - - assertEquals(properties, refStore.getProperties()); - - } - - private Message createTextMessage(String text) throws MessageNotWriteableException { + protected Message createTextMessage(String text) throws MessageNotWriteableException { ActiveMQTextMessage message = new ActiveMQTextMessage(); message.setMessageId(new MessageId(producerId, ++msgIdGenerator)); message.setDestination(destinationName);
Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreThreadingTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreThreadingTestSupport.java?rev=636615&view=auto
==============================================================================
--- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreThreadingTestSupport.java (added)
+++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/StoreThreadingTestSupport.java Wed Mar 12 20:16:19 2008
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.router.store;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.broker.router.store.api.CacheEntry;
+import org.apache.activemq.broker.router.store.api.DataStore;
+import org.apache.activemq.broker.router.store.api.ReferenceStore;
+
+/**
+ * These are more complex tests that stress the Store API to see if the implementation
+ * is thread safe.
+ *
+ * @author chirino
+ */
+abstract public class StoreThreadingTestSupport extends StoreTestSupport {
+
+    /** Set to true to ask the publisher threads to leave their loop. */
+    AtomicBoolean stopping = new AtomicBoolean();
+    /** Number of references published since the test thread last reset it. */
+    AtomicInteger counter = new AtomicInteger();
+
+    /**
+     * Thread that keeps adding a message to the data store and a reference to
+     * the reference store until {@link #stopping} is set.
+     */
+    class Publisher extends Thread {
+        private final ReferenceStore refStore;
+        private DataStore dataStore;
+
+        /** The failure that ended this thread, or null if there was none. */
+        AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+        /** Most recent entry whose reference was added by this thread. */
+        AtomicReference<CacheEntry> lastCacheEntry = new AtomicReference<CacheEntry>();
+        private final int id;
+
+        public Publisher(ReferenceStore refStore, int id) {
+            super("Publisher: "+id);
+            this.refStore = refStore;
+            this.id = id;
+            this.dataStore = refStore.getDataStore();
+        }
+
+        @Override
+        public void run() {
+            try {
+                while ( !stopping.get() ) {
+                    CacheEntry ce;
+                    // Lock the shared test instance, not this Publisher: nextId and
+                    // the msgIdGenerator bumped by createTextMessage() are shared by
+                    // all publishers, so a per-thread lock would not protect them.
+                    synchronized(StoreThreadingTestSupport.this) {
+                        long i = getNextId();
+                        ce = dataStore.addMessage(Long.valueOf(i), createTextMessage("Publisher: "+id+", message: " + i), null);
+                    }
+                    assertNotNull(ce);
+                    assertNotNull(ce.getStore());
+                    assertNotNull(ce.getId());
+                    assertNotNull(ce.getMessage());
+                    refStore.addReference(ce);
+                    lastCacheEntry.set(ce);
+                    counter.incrementAndGet();
+                }
+            } catch (Throwable e) {
+                // Recorded here; the test thread checks it after join().
+                e.printStackTrace();
+                error.set(e);
+            }
+        }
+
+    }
+
+
+    long nextId;
+
+    /**
+     * Hands out the next message id. Not thread safe by itself: publishers
+     * call it while holding the lock on this test instance.
+     */
+    private long getNextId() {
+        return nextId++;
+    }
+
+    /**
+     * Runs two publishers against one reference store for five seconds while
+     * this thread removes the published entries, then verifies that neither
+     * publisher failed.
+     *
+     * @throws Exception
+     */
+    public void testReferenceStoreCRD() throws Exception {
+        String storeName = getName();
+        String refName = getName() + "-ref";
+        DataStore dataStore = dataStoreManager.addStore(storeName);
+        dataStore.setDestination(destination);
+        ReferenceStore refStore = dataStore.addStore(refName);
+
+        // Start 2 publishers...
+        Publisher p1 = new Publisher(refStore, 1);
+        p1.start();
+        Publisher p2 = new Publisher(refStore, 2);
+        p2.start();
+
+        long processed=0;
+        try {
+            int testDuration = 1000*5;
+            long start = System.currentTimeMillis();
+            while( true ) {
+                if( System.currentTimeMillis()-start > testDuration ) {
+                    break;
+                }
+                int published = counter.getAndSet(0);
+                if( published > 0 ) {
+                    List<CacheEntry> loaded = refStore.remove(null, null, published);
+                    assertEquals(published, loaded.size() );
+                    Thread.sleep(10);
+                    processed+=published;
+                } else {
+                    Thread.sleep(100);
+                }
+            }
+        } finally {
+            stopping.set(true);
+            p1.join();
+            p2.join();
+            System.out.println("Processed: "+processed+" messages.");
+        }
+
+        // A publisher that died early would otherwise go unnoticed.
+        assertNull("Publisher 1 failed: " + p1.error.get(), p1.error.get());
+        assertNull("Publisher 2 failed: " + p2.error.get(), p2.error.get());
+    }
+
+}
Copied: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalBasicStoreTest.java (from r635708, activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalStoreTest.java) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalBasicStoreTest.java?p2=activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalBasicStoreTest.java&p1=activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalStoreTest.java&r1=635708&r2=636615&rev=636615&view=diff ============================================================================== --- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalStoreTest.java (original) +++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalBasicStoreTest.java Wed Mar 12 20:16:19 2008 @@ -18,10 +18,10 @@ import java.io.File; -import org.apache.activemq.broker.router.store.StoreTestSupport; +import org.apache.activemq.broker.router.store.StoreBasicTestSupport; import
org.apache.activemq.broker.router.store.api.DataStoreManager; -public class JournalStoreTest extends StoreTestSupport { +public class JournalBasicStoreTest extends StoreBasicTestSupport { @Override protected DataStoreManager createDataStoreManager() throws Exception { Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalThreadingStoreTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalThreadingStoreTest.java?rev=636615&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalThreadingStoreTest.java (added) +++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/journal/JournalThreadingStoreTest.java Wed Mar 12 20:16:19 2008 @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.broker.router.store.journal; + +import java.io.File; + +import org.apache.activemq.broker.router.store.StoreThreadingTestSupport; +import org.apache.activemq.broker.router.store.api.DataStoreManager; + +public class JournalThreadingStoreTest extends StoreThreadingTestSupport { + + @Override + protected DataStoreManager createDataStoreManager() throws Exception { + JournalDataStoreManagerFactory factory = new JournalDataStoreManagerFactory(); + factory.setDataDirectory(new File("target/data/" + getName())); +// factory.getEntityManagerProperties().put("openjpa.Log", "DefaultLevel=WARN,SQL=TRACE"); + return factory.createJournalDataStoreManager(); + } + +} Copied: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreBasicTest.java (from r635708, activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreTest.java) URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreBasicTest.java?p2=activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreBasicTest.java&p1=activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreTest.java&r1=635708&r2=636615&rev=636615&view=diff ============================================================================== --- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreTest.java (original) +++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreBasicTest.java Wed Mar 12 20:16:19 2008 @@ -16,13 +16,13 @@ */ package org.apache.activemq.broker.router.store.memory; -import org.apache.activemq.broker.router.store.StoreTestSupport; +import org.apache.activemq.broker.router.store.StoreBasicTestSupport; import 
org.apache.activemq.broker.router.store.api.DataStoreManager; /** * @author chirino */ -public class MemoryStoreTest extends StoreTestSupport { +public class MemoryStoreBasicTest extends StoreBasicTestSupport { @Override protected DataStoreManager createDataStoreManager() throws Exception { Added: activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreThreadingTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreThreadingTest.java?rev=636615&view=auto ============================================================================== --- activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreThreadingTest.java (added) +++ activemq/sandbox/activemq-router/src/test/java/org/apache/activemq/broker/router/store/memory/MemoryStoreThreadingTest.java Wed Mar 12 20:16:19 2008 @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.activemq.broker.router.store.memory;
+
+import org.apache.activemq.broker.router.store.StoreThreadingTestSupport;
+import org.apache.activemq.broker.router.store.api.DataStoreManager;
+
+/**
+ * Runs the tests inherited from StoreThreadingTestSupport against a
+ * MemoryDataStoreManager.
+ *
+ * @author chirino
+ */
+public class MemoryStoreThreadingTest extends StoreThreadingTestSupport {
+
+    /** Supplies a freshly constructed MemoryDataStoreManager. */
+    @Override
+    protected DataStoreManager createDataStoreManager() throws Exception {
+        return new MemoryDataStoreManager();
+    }
+
+    /** Deliberately a no-op. */
+    @Override
+    protected void restartDataStoreManager() throws Exception {
+        // Sorry we don't really support restarts..
+    }
+
+}
