Author: gtully
Date: Thu Oct 21 10:58:51 2010
New Revision: 1025939
URL: http://svn.apache.org/viewvc?rev=1025939&view=rev
Log:
jdbc variant of https://issues.apache.org/activemq/browse/AMQ-2985 - jdbc store
cannot ack out of order, the cleanup task query needed to be priority aware.
fix for https://issues.apache.org/activemq/browse/AMQ-2980 JDBC - priority
needed to be considered in count and recovery, additional tests included
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
activemq/trunk/activemq-core/src/test/resources/log4j.properties
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
Thu Oct 21 10:58:51 2010
@@ -78,6 +78,7 @@ public class DurableTopicSubscription ex
*/
public void unmatched(MessageReference node) throws IOException {
MessageAck ack = new MessageAck();
+ ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
ack.setMessageID(node.getMessageId());
node.getRegionDestination().acknowledge(this.getContext(), this, ack,
node);
}
@@ -111,14 +112,14 @@ public class DurableTopicSubscription ex
public void activate(SystemUsage memoryManager, ConnectionContext context,
ConsumerInfo info) throws Exception {
- LOG.debug("Activating " + this);
if (!active) {
this.active = true;
this.context = context;
this.info = info;
+ LOG.debug("Activating " + this);
int prefetch = info.getPrefetchSize();
if (prefetch>0) {
- prefetch += prefetch/2;
+ prefetch += prefetch/2;
}
int depth = Math.max(prefetch, this.pending.getMaxAuditDepth());
this.pending.setMaxAuditDepth(depth);
@@ -150,7 +151,7 @@ public class DurableTopicSubscription ex
}
public void deactivate(boolean keepDurableSubsActive) throws Exception {
- LOG.debug("Dectivating " + this);
+ LOG.debug("Deactivating " + this);
active = false;
this.usageManager.getMemoryUsage().removeUsageListener(this);
synchronized (pending) {
@@ -198,7 +199,7 @@ public class DurableTopicSubscription ex
}
prefetchExtension = 0;
}
-
+
protected MessageDispatch createMessageDispatch(MessageReference node,
Message message) {
MessageDispatch md = super.createMessageDispatch(node, message);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Thu Oct 21 10:58:51 2010
@@ -502,7 +502,7 @@ public class Topic extends BaseDestinati
if (topicStore != null && node.isPersistent()) {
DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
SubscriptionKey key = dsub.getSubscriptionKey();
- topicStore.acknowledge(context, key.getClientId(),
key.getSubscriptionName(), node.getMessageId());
+ topicStore.acknowledge(context, key.getClientId(),
key.getSubscriptionName(), node.getMessageId(), ack);
}
messageConsumed(context, node);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Thu Oct 21 10:58:51 2010
@@ -248,6 +248,10 @@ public abstract class AbstractStoreCurso
protected final synchronized void fillBatch() {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("fillBatch - batchResetNeeded=" + batchResetNeeded
+ + ", hasMessages=" + this.storeHasMessages + ", size=" +
this.size);
+ }
if (batchResetNeeded) {
resetBatch();
this.batchResetNeeded = false;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
Thu Oct 21 10:58:51 2010
@@ -58,7 +58,12 @@ public class MessageAck extends BaseComm
* The ack case where a client wants only an individual message to be
discarded.
*/
public static final byte INDIVIDUAL_ACK_TYPE = 4;
-
+
+/**
+ * The ack case where a durable topic subscription does not match a
selector.
+ */
+ public static final byte UNMATCHED_ACK_TYPE = 5;
+
protected byte ackType;
protected ConsumerId consumerId;
protected MessageId firstMessageId;
@@ -118,6 +123,10 @@ public class MessageAck extends BaseComm
return ackType == INDIVIDUAL_ACK_TYPE;
}
+ public boolean isUnmatchedAck() {
+ return ackType == UNMATCHED_ACK_TYPE;
+ }
+
/**
* @openwire:property version=1 cache=true
*/
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
Thu Oct 21 10:58:51 2010
@@ -74,8 +74,8 @@ public class ProxyTopicMessageStore impl
}
public void acknowledge(ConnectionContext context, String clientId, String
subscriptionName,
- MessageId messageId) throws IOException {
- delegate.acknowledge(context, clientId, subscriptionName, messageId);
+ MessageId messageId, MessageAck ack) throws
IOException {
+ delegate.acknowledge(context, clientId, subscriptionName, messageId,
ack);
}
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean
retroactive) throws IOException {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
Thu Oct 21 10:58:51 2010
@@ -19,6 +19,7 @@ package org.apache.activemq.store;
import java.io.IOException;
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
@@ -39,7 +40,7 @@ public interface TopicMessageStore exten
* @param subscriptionPersistentId
* @throws IOException
*/
- void acknowledge(ConnectionContext context, String clientId, String
subscriptionName, MessageId messageId) throws IOException;
+ void acknowledge(ConnectionContext context, String clientId, String
subscriptionName, MessageId messageId, MessageAck ack) throws IOException;
/**
* @param clientId
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
Thu Oct 21 10:58:51 2010
@@ -79,7 +79,8 @@ public class AMQTopicMessageStore extend
/**
*/
- public void acknowledge(final ConnectionContext context, final String
clientId, final String subscriptionName, final MessageId messageId) throws
IOException {
+ public void acknowledge(final ConnectionContext context, final String
clientId, final String subscriptionName,
+ final MessageId messageId, final MessageAck
originalAck) throws IOException {
final boolean debug = LOG.isDebugEnabled();
JournalTopicAck ack = new JournalTopicAck();
ack.setDestination(destination);
@@ -140,7 +141,7 @@ public class AMQTopicMessageStore extend
try {
SubscriptionInfo sub =
topicReferenceStore.lookupSubscription(clientId, subscritionName);
if (sub != null) {
- topicReferenceStore.acknowledge(context, clientId,
subscritionName, messageId);
+ topicReferenceStore.acknowledge(context, clientId,
subscritionName, messageId, null);
return true;
}
} catch (Throwable e) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
Thu Oct 21 10:58:51 2010
@@ -83,9 +83,9 @@ public interface JDBCAdapter {
int doGetMessageCount(TransactionContext c, ActiveMQDestination
destination) throws SQLException, IOException;
- void doRecoverNextMessages(TransactionContext c, ActiveMQDestination
destination, long nextSeq, int maxReturned, JDBCMessageRecoveryListener
listener) throws Exception;
+ void doRecoverNextMessages(TransactionContext c, ActiveMQDestination
destination, long nextSeq, long priority, int maxReturned,
JDBCMessageRecoveryListener listener) throws Exception;
- long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,
ActiveMQDestination destination, String clientId, String subscriberName) throws
SQLException, IOException;
+ long[] doGetLastAckedDurableSubscriberMessageId(TransactionContext c,
ActiveMQDestination destination, String clientId, String subscriberName) throws
SQLException, IOException;
void doMessageIdScan(TransactionContext c, int limit,
JDBCMessageIdScanListener listener) throws SQLException, IOException;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
Thu Oct 21 10:58:51 2010
@@ -44,7 +44,8 @@ public class JDBCMessageStore extends Ab
protected final WireFormat wireFormat;
protected final JDBCAdapter adapter;
protected final JDBCPersistenceAdapter persistenceAdapter;
- protected AtomicLong lastStoreSequenceId = new AtomicLong(-1);
+ protected AtomicLong lastRecoveredSequenceId = new AtomicLong(-1);
+ protected AtomicLong lastRecoveredPriority = new AtomicLong(Byte.MAX_VALUE
-1);
protected ActiveMQMessageAudit audit;
@@ -144,7 +145,7 @@ public class JDBCMessageStore extends Ab
public void removeMessage(ConnectionContext context, MessageAck ack)
throws IOException {
- long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId());
+ long seq = getStoreSequenceIdForMessageId(ack.getLastMessageId())[0];
// Get a connection and remove the message from the DB
TransactionContext c =
persistenceAdapter.getTransactionContext(context);
@@ -225,14 +226,15 @@ public class JDBCMessageStore extends Ab
public void recoverNextMessages(int maxReturned, final
MessageRecoveryListener listener) throws Exception {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
- adapter.doRecoverNextMessages(c, destination,
lastStoreSequenceId.get(), maxReturned, new JDBCMessageRecoveryListener() {
+ adapter.doRecoverNextMessages(c, destination,
lastRecoveredSequenceId.get(), lastRecoveredPriority.get(), maxReturned, new
JDBCMessageRecoveryListener() {
public boolean recoverMessage(long sequenceId, byte[] data)
throws Exception {
if (listener.hasSpace()) {
Message msg = (Message)wireFormat.unmarshal(new
ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
listener.recoverMessage(msg);
- lastStoreSequenceId.set(sequenceId);
+ lastRecoveredSequenceId.set(sequenceId);
+ lastRecoveredPriority.set(msg.getPriority());
return true;
}
return false;
@@ -259,32 +261,35 @@ public class JDBCMessageStore extends Ab
* @see org.apache.activemq.store.MessageStore#resetBatching()
*/
public void resetBatching() {
- if (LOG.isDebugEnabled()) {
- LOG.debug(destination.getPhysicalName() + " resetBatch, existing
last seqId: " + lastStoreSequenceId.get());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(destination.getPhysicalName() + " resetBatching,
existing last recovered seqId: " + lastRecoveredSequenceId.get());
}
- lastStoreSequenceId.set(-1);
+ lastRecoveredSequenceId.set(-1);
+ lastRecoveredPriority.set(Byte.MAX_VALUE - 1);
}
@Override
public void setBatch(MessageId messageId) {
- long storeSequenceId = -1;
try {
- storeSequenceId = getStoreSequenceIdForMessageId(messageId);
+ long[] storedValues = getStoreSequenceIdForMessageId(messageId);
+ lastRecoveredSequenceId.set(storedValues[0]);
+ lastRecoveredPriority.set(storedValues[1]);
} catch (IOException ignoredAsAlreadyLogged) {
- // reset batch in effect with default -1 value
+ lastRecoveredSequenceId.set(-1);
+ lastRecoveredPriority.set(Byte.MAX_VALUE -1);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug(destination.getPhysicalName() + " setBatch: new
sequenceId: " + storeSequenceId + ",existing last seqId: " +
lastStoreSequenceId.get());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(destination.getPhysicalName() + " setBatch: new
sequenceId: " + lastRecoveredSequenceId.get()
+ + ", priority: " + lastRecoveredPriority.get());
}
- lastStoreSequenceId.set(storeSequenceId);
}
- private long getStoreSequenceIdForMessageId(MessageId messageId) throws
IOException {
- long result = -1;
+ private long[] getStoreSequenceIdForMessageId(MessageId messageId) throws
IOException {
+ long[] result = new long[]{-1, Byte.MAX_VALUE -1};
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
- result = adapter.getStoreSequenceId(c, destination, messageId)[0];
+ result = adapter.getStoreSequenceId(c, destination, messageId);
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to get store sequenceId
for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
Thu Oct 21 10:58:51 2010
@@ -527,6 +527,7 @@ public class JDBCPersistenceAdapter exte
getAdapter().doDropTables(c);
getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
getAdapter().doCreateTables(c);
+ LOG.info("Persistence store purged.");
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create(e);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
Thu Oct 21 10:58:51 2010
@@ -26,6 +26,7 @@ import org.apache.activemq.ActiveMQMessa
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
@@ -33,12 +34,15 @@ import org.apache.activemq.store.TopicMe
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.6 $
*/
public class JDBCTopicMessageStore extends JDBCMessageStore implements
TopicMessageStore {
+ private static final Log LOG =
LogFactory.getLog(JDBCTopicMessageStore.class);
private Map<String, AtomicLong> subscriberLastMessageMap = new
ConcurrentHashMap<String, AtomicLong>();
private Map<String, AtomicLong> subscriberLastPriorityMap = new
ConcurrentHashMap<String, AtomicLong>();
@@ -46,12 +50,21 @@ public class JDBCTopicMessageStore exten
super(persistenceAdapter, adapter, wireFormat, topic, audit);
}
- public void acknowledge(ConnectionContext context, String clientId, String
subscriptionName, MessageId messageId) throws IOException {
+ public void acknowledge(ConnectionContext context, String clientId, String
subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
+ if (ack != null && ack.isUnmatchedAck()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("ignoring unmatched selector ack for: " + messageId
+ ", cleanup will get to this message after subsequent acks.");
+ }
+ return;
+ }
// Get a connection and insert the message into the DB.
TransactionContext c =
persistenceAdapter.getTransactionContext(context);
try {
long[] res = adapter.getStoreSequenceId(c, destination,
messageId);
adapter.doSetLastAck(c, destination, clientId, subscriptionName,
res[0], res[1]);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("ack - seq: " + res[0] + ", priority: " + res[1]);
+ }
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to store acknowledgment
for: " + clientId + " on message " + messageId + " in container: " + e, e);
@@ -93,12 +106,15 @@ public class JDBCTopicMessageStore exten
AtomicLong last = subscriberLastMessageMap.get(subcriberId);
AtomicLong priority = subscriberLastPriorityMap.get(subcriberId);
if (last == null) {
- long lastAcked =
adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, clientId,
subscriptionName);
- last = new AtomicLong(lastAcked);
+ long[] lastAcked =
adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, clientId,
subscriptionName);
+ last = new AtomicLong(lastAcked[0]);
subscriberLastMessageMap.put(subcriberId, last);
- priority = new AtomicLong(Byte.MAX_VALUE - 1);
+ priority = new AtomicLong(lastAcked[1]);
subscriberLastMessageMap.put(subcriberId, priority);
}
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("recoverNextMessage - last: " + last.get() + ",
priority: " + priority);
+ }
final AtomicLong finalLast = last;
final AtomicLong finalPriority = priority;
try {
@@ -137,10 +153,6 @@ public class JDBCTopicMessageStore exten
subscriberLastPriorityMap.remove(subcriberId);
}
- /**
- * @see
org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo,
- * boolean)
- */
public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean
retroactive) throws IOException {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
@@ -207,6 +219,9 @@ public class JDBCTopicMessageStore exten
} finally {
c.close();
}
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(clientId + ":" + subscriberName + ", messageCount: " +
result);
+ }
return result;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
Thu Oct 21 10:58:51 2010
@@ -307,7 +307,7 @@ public class Statements {
+ getFullAckTableName()
+ " D "
+ " WHERE D.CONTAINER=?
AND D.CLIENT_ID=? AND D.SUB_NAME=?"
- + " AND
M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID";
+ + " AND
M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID AND M.PRIORITY <=
D.PRIORITY";
}
return durableSubscriberMessageCountStatement;
}
@@ -336,11 +336,17 @@ public class Statements {
public String getDeleteOldMessagesStatement() {
if (deleteOldMessagesStatement == null) {
deleteOldMessagesStatement = "DELETE FROM " +
getFullMessageTableName()
- + " WHERE ( EXPIRATION<>0 AND
EXPIRATION<?) OR ID < "
- + "( SELECT min(" +
getFullAckTableName() + ".LAST_ACKED_ID) "
- + "FROM " + getFullAckTableName() + "
WHERE "
- + getFullAckTableName() +
".CONTAINER=" + getFullMessageTableName()
- + ".CONTAINER)";
+ + " WHERE ( EXPIRATION<>0 AND
EXPIRATION<?)"
+ + " OR (ID < "
+ + " ( SELECT min(" +
getFullAckTableName() + ".LAST_ACKED_ID)"
+ + " FROM " +
getFullAckTableName() + " WHERE "
+ + getFullAckTableName() +
".CONTAINER="
+ + getFullMessageTableName()
+ ".CONTAINER )"
+ + " AND PRIORITY >= "
+ + " ( SELECT min(" +
getFullAckTableName() + ".PRIORITY) "
+ + " FROM " +
getFullAckTableName() + " WHERE "
+ + getFullAckTableName() +
".CONTAINER="
+ + getFullMessageTableName() +
".CONTAINER ))";
}
return deleteOldMessagesStatement;
}
@@ -391,7 +397,9 @@ public class Statements {
public String getFindNextMessagesByPriorityStatement() {
if (findNextMessagesByPriorityStatement == null) {
findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " +
getFullMessageTableName()
- + " WHERE CONTAINER=? ORDER BY
PRIORITY DESC, ID";
+ + " WHERE CONTAINER=?"
+ + " AND ((ID > ? AND PRIORITY = ?) OR
PRIORITY < ?)"
+ + " ORDER BY PRIORITY DESC, ID";
}
return findNextMessagesByPriorityStatement;
}
@@ -401,9 +409,11 @@ public class Statements {
*/
public String getLastAckedDurableSubscriberMessageStatement() {
if (lastAckedDurableSubscriberMessageStatement == null) {
- lastAckedDurableSubscriberMessageStatement = "SELECT
MAX(LAST_ACKED_ID) FROM "
+ lastAckedDurableSubscriberMessageStatement = "SELECT
MAX(LAST_ACKED_ID), PRIORITY FROM "
+
getFullAckTableName()
- + " WHERE CONTAINER=?
AND CLIENT_ID=? AND SUB_NAME=?";
+ + " WHERE CONTAINER=?
AND CLIENT_ID=? AND SUB_NAME=?"
+ + " GROUP BY PRIORITY"
+ + " ORDER BY PRIORITY
ASC";
}
return lastAckedDurableSubscriberMessageStatement;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Thu Oct 21 10:58:51 2010
@@ -508,8 +508,6 @@ public class DefaultJDBCAdapter implemen
* @param retroactive
* @throws SQLException
* @throws IOException
- * @see
org.apache.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection,
java.lang.Object,
- * org.apache.activemq.service.SubscriptionInfo)
*/
public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo
info, boolean retroactive)
throws SQLException, IOException {
@@ -644,11 +642,11 @@ public class DefaultJDBCAdapter implemen
}
}
- public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,
ActiveMQDestination destination,
+ public long[] doGetLastAckedDurableSubscriberMessageId(TransactionContext
c, ActiveMQDestination destination,
String clientId, String subscriberName) throws SQLException,
IOException {
PreparedStatement s = null;
ResultSet rs = null;
- long result = -1;
+ long[] result = new long[]{-1, Byte.MAX_VALUE - 1};
try {
s =
c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement());
s.setString(1, destination.getQualifiedName());
@@ -656,7 +654,8 @@ public class DefaultJDBCAdapter implemen
s.setString(3, subscriberName);
rs = s.executeQuery();
if (rs.next()) {
- result = rs.getLong(1);
+ result[0] = rs.getLong(1);
+ result[1] = rs.getLong(2);
}
rs.close();
s.close();
@@ -784,7 +783,7 @@ public class DefaultJDBCAdapter implemen
}
public void doRecoverNextMessages(TransactionContext c,
ActiveMQDestination destination, long nextSeq,
- int maxReturned, JDBCMessageRecoveryListener listener) throws
Exception {
+ long priority, int maxReturned, JDBCMessageRecoveryListener
listener) throws Exception {
PreparedStatement s = null;
ResultSet rs = null;
try {
@@ -795,8 +794,10 @@ public class DefaultJDBCAdapter implemen
}
s.setMaxRows(maxReturned * 2);
s.setString(1, destination.getQualifiedName());
- if (!isPrioritizedMessages()) {
- s.setLong(2, nextSeq);
+ s.setLong(2, nextSeq);
+ if (isPrioritizedMessages()) {
+ s.setLong(3, priority);
+ s.setLong(4, priority);
}
rs = s.executeQuery();
int count = 0;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
Thu Oct 21 10:58:51 2010
@@ -25,6 +25,7 @@ import org.apache.activemq.broker.Connec
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.JournalTopicAck;
import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
@@ -82,7 +83,7 @@ public class JournalTopicMessageStore ex
/**
*/
public void acknowledge(ConnectionContext context, String clientId, String
subscriptionName,
- final MessageId messageId) throws IOException {
+ final MessageId messageId, MessageAck originalAck)
throws IOException {
final boolean debug = LOG.isDebugEnabled();
JournalTopicAck ack = new JournalTopicAck();
@@ -138,7 +139,7 @@ public class JournalTopicMessageStore ex
try {
SubscriptionInfo sub = longTermStore.lookupSubscription(clientId,
subscritionName);
if (sub != null) {
- longTermStore.acknowledge(context, clientId, subscritionName,
messageId);
+ longTermStore.acknowledge(context, clientId, subscritionName,
messageId, null);
}
} catch (Throwable e) {
LOG.debug("Could not replay acknowledge for message '" + messageId
@@ -177,7 +178,7 @@ public class JournalTopicMessageStore ex
SubscriptionKey subscriptionKey = iterator.next();
MessageId identity =
cpAckedLastAckLocations.get(subscriptionKey);
longTermStore.acknowledge(transactionTemplate.getContext(),
subscriptionKey.clientId,
-
subscriptionKey.subscriptionName, identity);
+
subscriptionKey.subscriptionName, identity, null);
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
Thu Oct 21 10:58:51 2010
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHa
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
@@ -79,7 +80,7 @@ public class KahaTopicMessageStore exten
}
public synchronized void acknowledge(ConnectionContext context, String
clientId, String subscriptionName,
- MessageId messageId) throws
IOException {
+ MessageId messageId, MessageAck ack)
throws IOException {
String subcriberId = getSubscriptionKey(clientId, subscriptionName);
TopicSubContainer container = subscriberMessages.get(subcriberId);
if (container != null) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
Thu Oct 21 10:58:51 2010
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHa
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
@@ -224,7 +225,7 @@ public class KahaTopicReferenceStore ext
}
public void acknowledge(ConnectionContext context,
- String clientId, String subscriptionName, MessageId
messageId) throws IOException {
+ String clientId, String subscriptionName, MessageId
messageId, MessageAck ack) throws IOException {
acknowledgeReference(context, clientId, subscriptionName,
messageId);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Thu Oct 21 10:58:51 2010
@@ -611,7 +611,8 @@ public class KahaDBStore extends Message
}
}
- public void acknowledge(ConnectionContext context, String clientId,
String subscriptionName, MessageId messageId)
+ public void acknowledge(ConnectionContext context, String clientId,
String subscriptionName,
+ MessageId messageId, MessageAck ack)
throws IOException {
String subscriptionKey = subscriptionKey(clientId,
subscriptionName);
if (isConcurrentStoreAndDispatchTopics()) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java
Thu Oct 21 10:58:51 2010
@@ -277,7 +277,8 @@ public class TempKahaDBStore extends Tem
super(destination);
}
- public void acknowledge(ConnectionContext context, String clientId,
String subscriptionName, MessageId messageId) throws IOException {
+ public void acknowledge(ConnectionContext context, String clientId,
String subscriptionName,
+ MessageId messageId, MessageAck ack) throws
IOException {
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
command.setDestination(dest);
command.setSubscriptionKey(subscriptionKey(clientId,
subscriptionName));
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
Thu Oct 21 10:58:51 2010
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
@@ -66,7 +67,8 @@ public class MemoryTopicMessageStore ext
}
}
- public synchronized void acknowledge(ConnectionContext context, String
clientId, String subscriptionName, MessageId messageId) throws IOException {
+ public synchronized void acknowledge(ConnectionContext context, String
clientId, String subscriptionName,
+ MessageId messageId, MessageAck ack)
throws IOException {
SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
MemoryTopicSub sub = topicSubMap.get(key);
if (sub != null) {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/TestSupport.java
Thu Oct 21 10:58:51 2010
@@ -74,7 +74,7 @@ public abstract class TestSupport extend
* Returns the name of the destination used in this test case
*/
protected String getDestinationString() {
- return getClass().getName() + "." + getName();
+ return getClass().getName() + "." + getName(true);
}
/**
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
Thu Oct 21 10:58:51 2010
@@ -47,14 +47,15 @@ abstract public class MessagePriorityTes
ActiveMQConnectionFactory factory;
Connection conn;
- Session sess;
+ protected Session sess;
public boolean useCache;
+ public boolean dispatchAsync = false;
public int prefetchVal = 500;
- int MSG_NUM = 1000;
- int HIGH_PRI = 7;
- int LOW_PRI = 3;
+ public int MSG_NUM = 600;
+ public int HIGH_PRI = 7;
+ public int LOW_PRI = 3;
abstract protected PersistenceAdapter createPersistenceAdapter(boolean
delete) throws Exception;
@@ -78,6 +79,7 @@ abstract public class MessagePriorityTes
prefetch.setAll(prefetchVal);
factory.setPrefetchPolicy(prefetch);
factory.setWatchTopicAdvisories(false);
+ factory.setDispatchAsync(dispatchAsync);
conn = factory.createConnection();
conn.setClientID("priority");
conn.start();
@@ -110,7 +112,7 @@ abstract public class MessagePriorityTes
}
- class ProducerThread extends Thread {
+ protected class ProducerThread extends Thread {
int priority;
int messageCount;
@@ -154,7 +156,8 @@ abstract public class MessagePriorityTes
MessageConsumer queueConsumer = sess.createConsumer(queue);
for (int i = 0; i < MSG_NUM * 2; i++) {
- Message msg = queueConsumer.receive(1000);
+ Message msg = queueConsumer.receive(5000);
+ LOG.debug("received i=" + i + ", " + (msg!=null?
msg.getJMSMessageID() : null));
assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ?
HIGH_PRI : LOW_PRI, msg.getJMSPriority());
}
@@ -196,6 +199,7 @@ abstract public class MessagePriorityTes
public void initCombosForTestDurableSubsReconnect() {
addCombinationValues("prefetchVal", new Object[] {new Integer(1000),
new Integer(MSG_NUM/2)});
+ addCombinationValues("dispatchAsync", new Object[] {Boolean.TRUE,
Boolean.FALSE});
}
public void testDurableSubsReconnect() throws Exception {
@@ -217,7 +221,8 @@ abstract public class MessagePriorityTes
final int closeFrequency = MSG_NUM/4;
sub = sess.createDurableSubscriber(topic, subName);
for (int i = 0; i < MSG_NUM * 2; i++) {
- Message msg = sub.receive(5000);
+ Message msg = sub.receive(30000);
+ LOG.debug("received i=" + i + ", " + (msg!=null?
msg.getJMSMessageID() : null));
assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ?
HIGH_PRI : LOW_PRI, msg.getJMSPriority());
if (i>0 && i%closeFrequency==0) {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCMessagePriorityTest.java
Thu Oct 21 10:58:51 2010
@@ -17,14 +17,20 @@
package org.apache.activemq.store.jdbc;
+import javax.jms.Message;
+import javax.jms.TopicSubscriber;
import junit.framework.Test;
-
+import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.MessagePriorityTest;
import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.derby.jdbc.EmbeddedDataSource;
public class JDBCMessagePriorityTest extends MessagePriorityTest {
+ private static final Log LOG =
LogFactory.getLog(JDBCMessagePriorityTest.class);
+
@Override
protected PersistenceAdapter createPersistenceAdapter(boolean delete)
throws Exception {
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
@@ -33,16 +39,56 @@ public class JDBCMessagePriorityTest ext
dataSource.setCreateDatabase("create");
jdbc.setDataSource(dataSource);
jdbc.deleteAllMessages();
+ jdbc.setCleanupPeriod(1000);
return jdbc;
}
-
+
+ // this cannot be a general test as kahaDB just has support for 3 priority
levels
+ public void testDurableSubsReconnectWithFourLevels() throws Exception {
+ ActiveMQTopic topic = (ActiveMQTopic) sess.createTopic("TEST");
+ final String subName = "priorityDisconnect";
+ TopicSubscriber sub = sess.createDurableSubscriber(topic, subName);
+ sub.close();
+
+ final int MED_PRI = LOW_PRI + 1;
+ final int MED_HIGH_PRI = HIGH_PRI - 1;
+
+ ProducerThread lowPri = new ProducerThread(topic, MSG_NUM, LOW_PRI);
+ ProducerThread medPri = new ProducerThread(topic, MSG_NUM, MED_PRI);
+ ProducerThread medHighPri = new ProducerThread(topic, MSG_NUM,
MED_HIGH_PRI);
+ ProducerThread highPri = new ProducerThread(topic, MSG_NUM, HIGH_PRI);
+
+ lowPri.start();
+ highPri.start();
+ medPri.start();
+ medHighPri.start();
+
+ lowPri.join();
+ highPri.join();
+ medPri.join();
+ medHighPri.join();
+
+
+ final int closeFrequency = MSG_NUM;
+ final int[] priorities = new int[]{HIGH_PRI, MED_HIGH_PRI, MED_PRI,
LOW_PRI};
+ sub = sess.createDurableSubscriber(topic, subName);
+ for (int i = 0; i < MSG_NUM * 4; i++) {
+ Message msg = sub.receive(30000);
+ LOG.debug("received i=" + i + ", m=" + (msg!=null?
msg.getJMSMessageID() : null));
+ assertNotNull("Message " + i + " was null", msg);
+ assertEquals("Message " + i + " has wrong priority", priorities[i
/ MSG_NUM], msg.getJMSPriority());
+ if (i > 0 && i % closeFrequency == 0) {
+ LOG.info("Closing durable sub.. on: " + i);
+ sub.close();
+ sub = sess.createDurableSubscriber(topic, subName);
+ }
+ }
+ LOG.info("closing on done!");
+ sub.close();
+ }
+
public static Test suite() {
return suite(JDBCMessagePriorityTest.class);
}
- // pending fix...
- @Override
- public void testDurableSubsReconnect() throws Exception {
- // TODO: fix jdbc durable sub recovery
- }
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOfflineTest.java
Thu Oct 21 10:58:51 2010
@@ -16,9 +16,12 @@
*/
package org.apache.activemq.usecases;
+import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@@ -27,11 +30,12 @@ import java.io.File;
public class DurableSubscriptionOfflineTest extends
org.apache.activemq.TestSupport {
+ public Boolean usePrioritySupport = Boolean.TRUE;
private BrokerService broker;
private ActiveMQTopic topic;
protected ActiveMQConnectionFactory createConnectionFactory() throws
Exception {
- return new ActiveMQConnectionFactory("vm://" + getName());
+ return new ActiveMQConnectionFactory("vm://" + getName(true));
}
@Override
@@ -42,6 +46,10 @@ public class DurableSubscriptionOfflineT
return con;
}
+ public static Test suite() {
+ return suite(DurableSubscriptionOfflineTest.class);
+ }
+
protected void setUp() throws Exception {
topic = (ActiveMQTopic) createDestination();
createBroker();
@@ -54,15 +62,19 @@ public class DurableSubscriptionOfflineT
}
private void createBroker() throws Exception {
- broker = BrokerFactory.createBroker("broker:(vm://localhost)");
- broker.setBrokerName(getName());
+ broker = BrokerFactory.createBroker("broker:(vm://" + getName(true)
+")");
+ broker.setBrokerName(getName(true));
broker.setDeleteAllMessagesOnStartup(true);
- broker.setPersistent(true);
- KahaDBPersistenceAdapter persistenceAdapter = new
KahaDBPersistenceAdapter();
- persistenceAdapter.setDirectory(new File("activemq-data-kaha/" +
getName()));
- broker.setPersistenceAdapter(persistenceAdapter);
-
+ if (usePrioritySupport) {
+ PolicyEntry policy = new PolicyEntry();
+ policy.setPrioritizedMessages(true);
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.setDefaultEntry(policy);
+ broker.setDestinationPolicy(policyMap);
+ }
+
+ setDefaultPersistenceAdapter(broker);
broker.start();
}
@@ -71,6 +83,13 @@ public class DurableSubscriptionOfflineT
broker.stop();
}
+ public void initCombosForTestOfflineSubscription() throws Exception {
+ this.addCombinationValues("defaultPersistenceAdapter",
+ new Object[]{ PersistenceAdapterChoice.KahaDB,
PersistenceAdapterChoice.JDBC});
+ this.addCombinationValues("usePrioritySupport",
+ new Object[]{ Boolean.TRUE, Boolean.FALSE});
+ }
+
public void testOfflineSubscription() throws Exception {
// create durable subscription
Connection con = createConnection();
Modified: activemq/trunk/activemq-core/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/log4j.properties?rev=1025939&r1=1025938&r2=1025939&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/log4j.properties (original)
+++ activemq/trunk/activemq-core/src/test/resources/log4j.properties Thu Oct 21
10:58:51 2010
@@ -21,7 +21,7 @@
log4j.rootLogger=INFO, out, stdout
log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
-#log4j.logger.org.apache.activemq=DEBUG
+#log4j.logger.org.apache.activemq=TRACE
#log4j.logger.org.apache.activemq.store.jdbc=DEBUG
#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
#log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG