Author: gtully Date: Thu Feb 5 10:06:39 2009 New Revision: 741061 URL: http://svn.apache.org/viewvc?rev=741061&view=rev Log: store-specific tests for setBatch http://issues.apache.org/activemq/browse/AMQ-2020
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java (with props) activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java (with props) activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java (with props) activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java (with props) Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=741061&r1=741060&r2=741061&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Thu Feb 5 10:06:39 2009 @@ -158,10 +158,10 @@ lastCachedId = node.getMessageId(); } else { if 
(cacheEnabled) { + cacheEnabled=false; // sync with store on disabling the cache setBatch(lastCachedId); } - cacheEnabled=false; } size++; } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=741061&r1=741060&r2=741061&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Thu Feb 5 10:06:39 2009 @@ -249,4 +249,9 @@ } + @Override + public void setBatch(MessageId messageId) { + lastMessageId.set(messageId.getBrokerSequenceId()); + } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=741061&r1=741060&r2=741061&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java Thu Feb 5 10:06:39 2009 @@ -720,7 +720,7 @@ s = c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement()); s.setMaxRows(maxReturned * 2); s.setString(1, destination.getQualifiedName()); - s.setLong(2, nextSeq - maxReturned); + s.setLong(2, nextSeq); rs = s.executeQuery(); int count = 0; if (this.statements.isUseExternalMessageReferences()) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java?rev=741061&r1=741060&r2=741061&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java Thu Feb 5 10:06:39 2009 @@ -411,4 +411,10 @@ } + @Override + public void setBatch(MessageId messageId) { + peristenceAdapter.checkpoint(true, true); + longTermStore.setBatch(messageId); + } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java?rev=741061&r1=741060&r2=741061&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java Thu Feb 5 10:06:39 2009 @@ -175,4 +175,10 @@ public boolean isSupportForCursors() { return true; } + + @Override + public void setBatch(MessageId messageId) { + batchEntry = messageContainer.getEntry(messageId); + } + } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=741061&r1=741060&r2=741061&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original) +++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Thu Feb 5 10:06:39 2009 @@ -243,6 +243,11 @@ cursorPos=0; } + + @Override + public void setBatch(MessageId messageId) { + } + public void setMemoryUsage(MemoryUsage memoeyUSage) { } public void start() throws Exception { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java?rev=741061&r1=741060&r2=741061&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java Thu Feb 5 10:06:39 2009 @@ -150,4 +150,10 @@ public void resetBatching() { lastBatchId = null; } + + @Override + public void setBatch(MessageId messageId) { + lastBatchId = messageId; + } + } Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java?rev=741061&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java Thu Feb 5 10:06:39 2009 @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.region.cursors; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; + +/** + * @author gtully + * @see https://issues.apache.org/activemq/browse/AMQ-2020 + **/ +public class StoreQueueCursorJDBCNoDuplicateTest extends StoreQueueCursorNoDuplicateTest { + + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + PersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter(); + broker.setPersistenceAdapter(persistenceAdapter); + return broker; + } + +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java?rev=741061&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java Thu Feb 5 10:06:39 2009 @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.broker.region.cursors; + +import java.io.File; + +import org.apache.activeio.journal.active.JournalImpl; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.journal.JournalPersistenceAdapter; +import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; + +/** + * @author gtully + * @see https://issues.apache.org/activemq/browse/AMQ-2020 + **/ +public class StoreQueueCursorJournalNoDuplicateTest extends StoreQueueCursorNoDuplicateTest { + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + + File dataFileDir = new File("target/activemq-data/StoreQueueCursorJournalNoDuplicateTest"); + File journalDir = new File(dataFileDir, "journal").getCanonicalFile(); + JournalImpl journal = new JournalImpl(journalDir, 3, 1024 * 1024 * 20); + + KahaPersistenceAdapter kahaAdaptor = new KahaPersistenceAdapter(); + kahaAdaptor.setDirectory(dataFileDir); + JournalPersistenceAdapter journalAdaptor = new JournalPersistenceAdapter(journal, kahaAdaptor, broker.getTaskRunnerFactory()); + journalAdaptor.setMaxCheckpointWorkers(1); + + broker.setPersistenceAdapter(journalAdaptor); + return broker; + } +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJournalNoDuplicateTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java?rev=741061&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java Thu Feb 5 10:06:39 2009 @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.broker.region.cursors; + +import org.apache.activemq.broker.BrokerService; + +/** + * @author gtully + * @see https://issues.apache.org/activemq/browse/AMQ-2020 + **/ +public class StoreQueueCursorMemoryNoDuplicateTest extends StoreQueueCursorNoDuplicateTest { + + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + broker.setPersistent(false); + return broker; + } +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java?rev=741061&view=auto ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java (added) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java Thu Feb 5 10:06:39 2009 @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.region.cursors; + +import junit.framework.TestCase; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.usage.SystemUsage; + +/** + * @author gtully + * @see https://issues.apache.org/activemq/browse/AMQ-2020 + **/ +public class StoreQueueCursorNoDuplicateTest extends TestCase { + ActiveMQQueue destination = new ActiveMQQueue("queue-" + + StoreQueueCursorNoDuplicateTest.class.getSimpleName()); + BrokerService brokerService; + + final static String mesageIdRoot = "11111:22222:"; + final int messageBytesSize = 1024; + final String text = new String(new byte[messageBytesSize]); + + protected int count = 6; + + public void setUp() throws Exception { + brokerService = createBroker(); + brokerService.setUseJmx(false); + brokerService.deleteAllMessages(); + brokerService.start(); + } + + protected 
BrokerService createBroker() throws Exception { + return new BrokerService(); + } + + public void tearDown() throws Exception { + brokerService.stop(); + } + + public void testNoDuplicateAfterCacheFullAndReadPast() throws Exception { + final PersistenceAdapter persistenceAdapter = brokerService + .getPersistenceAdapter(); + final MessageStore queueMessageStore = persistenceAdapter + .createQueueMessageStore(destination); + final ConsumerInfo consumerInfo = new ConsumerInfo(); + final DestinationStatistics destinationStatistics = new DestinationStatistics(); + consumerInfo.setExclusive(true); + + final Queue queue = new Queue(brokerService, destination, + queueMessageStore, destinationStatistics, null); + + queueMessageStore.start(); + + QueueStorePrefetch underTest = new QueueStorePrefetch(queue); + SystemUsage systemUsage = new SystemUsage(); + // ensure memory limit is reached + systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 2)); + underTest.setSystemUsage(systemUsage); + underTest.setEnableAudit(false); + underTest.start(); + + final ConnectionContext contextNotInTx = new ConnectionContext(); + for (int i = 0; i < count; i++) { + ActiveMQTextMessage msg = getMessage(i); + msg.setMemoryUsage(systemUsage.getMemoryUsage()); + + queueMessageStore.addMessage(contextNotInTx, msg); + underTest.addMessageLast(msg); + } + + int dequeueCount = 0; + + underTest.setMaxBatchSize(2); + underTest.reset(); + while (underTest.hasNext() && dequeueCount < count) { + MessageReference ref = underTest.next(); + underTest.remove(); + assertEquals(dequeueCount++, ref.getMessageId() + .getProducerSequenceId()); + } + underTest.release(); + assertEquals(count, dequeueCount); + } + + private ActiveMQTextMessage getMessage(int i) throws Exception { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + MessageId id = new MessageId(mesageIdRoot + i); + id.setBrokerSequenceId(i); + id.setProducerSequenceId(i); + message.setMessageId(id); + 
message.setDestination(destination); + message.setPersistent(true); + message.setResponseRequired(true); + message.setText("Msg:" + i + " " + text); + assertEquals(message.getMessageId().getProducerSequenceId(), i); + return message; + } +} Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date