Author: dejanb
Date: Wed Jul 21 16:06:39 2010
New Revision: 966291
URL: http://svn.apache.org/viewvc?rev=966291&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2843 - first stab at adding
priority for queues in JDBC store
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=966291&r1=966290&r2=966291&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
Wed Jul 21 16:06:39 2010
@@ -30,12 +30,14 @@ import org.apache.activemq.command.Subsc
public interface JDBCAdapter {
void setStatements(Statements statementProvider);
+
+ void setPrioritizedMessages(boolean prioritizedMessages);
void doCreateTables(TransactionContext c) throws SQLException, IOException;
void doDropTables(TransactionContext c) throws SQLException, IOException;
- void doAddMessage(TransactionContext c, long sequence, MessageId
messageID, ActiveMQDestination destination, byte[] data, long expiration)
throws SQLException, IOException;
+ void doAddMessage(TransactionContext c, long sequence, MessageId
messageID, ActiveMQDestination destination, byte[] data, long expiration, byte
priority) throws SQLException, IOException;
void doAddMessageReference(TransactionContext c, long sequence, MessageId
messageId, ActiveMQDestination destination, long expirationTime, String
messageRef) throws SQLException, IOException;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?rev=966291&r1=966290&r2=966291&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
Wed Jul 21 16:06:39 2010
@@ -82,7 +82,7 @@ public class JDBCMessageStore extends Ab
// Get a connection and insert the message into the DB.
TransactionContext c =
persistenceAdapter.getTransactionContext(context);
try {
- adapter.doAddMessage(c,sequenceId, messageId, destination, data,
message.getExpiration());
+ adapter.doAddMessage(c,sequenceId, messageId, destination, data,
message.getExpiration(), message.getPriority());
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to broker message: " +
messageId + " in container: " + e, e);
@@ -224,7 +224,6 @@ public class JDBCMessageStore extends Ab
*/
public void recoverNextMessages(int maxReturned, final
MessageRecoveryListener listener) throws Exception {
TransactionContext c = persistenceAdapter.getTransactionContext();
-
try {
adapter.doRecoverNextMessages(c, destination,
lastStoreSequenceId.get(), maxReturned, new JDBCMessageRecoveryListener() {
@@ -294,4 +293,9 @@ public class JDBCMessageStore extends Ab
}
return result;
}
+
+ public void setPrioritizedMessages(boolean prioritizedMessages) {
+ super.setPrioritizedMessages(prioritizedMessages);
+ adapter.setPrioritizedMessages(prioritizedMessages);
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java?rev=966291&r1=966290&r2=966291&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java
Wed Jul 21 16:06:39 2010
@@ -65,6 +65,7 @@ public class Statements {
private String lastAckedDurableSubscriberMessageStatement;
private String destinationMessageCountStatement;
private String findNextMessagesStatement;
+ private String findNextMessagesByPriorityStatement;
private boolean useLockCreateWhereClause;
private String findAllMessageIdsStatement;
private String lastProducerSequenceIdStatement;
@@ -74,12 +75,13 @@ public class Statements {
createSchemaStatements = new String[] {
"CREATE TABLE " + getFullMessageTableName() + "(" + "ID " +
sequenceDataType + " NOT NULL"
+ ", CONTAINER " + containerNameDataType + ", MSGID_PROD "
+ msgIdDataType + ", MSGID_SEQ "
- + sequenceDataType + ", EXPIRATION " + longDataType + ",
MSG "
+ + sequenceDataType + ", EXPIRATION " + longDataType + ",
PRIORITY " + sequenceDataType + ", MSG "
+ (useExternalMessageReferences ? stringIdDataType :
binaryDataType)
+ ", PRIMARY KEY ( ID ) )",
"CREATE INDEX " + getFullMessageTableName() + "_MIDX ON " +
getFullMessageTableName() + " (MSGID_PROD,MSGID_SEQ)",
"CREATE INDEX " + getFullMessageTableName() + "_CIDX ON " +
getFullMessageTableName() + " (CONTAINER)",
"CREATE INDEX " + getFullMessageTableName() + "_EIDX ON " +
getFullMessageTableName() + " (EXPIRATION)",
+ "CREATE INDEX " + getFullMessageTableName() + "_PIDX ON " +
getFullMessageTableName() + " (PRIORITY)",
"CREATE TABLE " + getFullAckTableName() + "(" + "CONTAINER " +
containerNameDataType + " NOT NULL"
+ ", SUB_DEST " + stringIdDataType
+ ", CLIENT_ID " + stringIdDataType + " NOT NULL" + ",
SUB_NAME " + stringIdDataType
@@ -107,7 +109,7 @@ public class Statements {
if (addMessageStatement == null) {
addMessageStatement = "INSERT INTO "
+ getFullMessageTableName()
- + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER,
EXPIRATION, MSG) VALUES (?, ?, ?, ?, ?, ?)";
+ + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER,
EXPIRATION, PRIORITY, MSG) VALUES (?, ?, ?, ?, ?, ?, ?)";
}
return addMessageStatement;
}
@@ -369,6 +371,17 @@ public class Statements {
}
/**
+ * @return the findNextMessagesStatement
+ */
+ public String getFindNextMessagesByPriorityStatement() {
+ if (findNextMessagesByPriorityStatement == null) {
+ findNextMessagesByPriorityStatement = "SELECT ID, MSG FROM " +
getFullMessageTableName()
+ + " WHERE CONTAINER=? ORDER BY
PRIORITY DESC, ID";
+ }
+ return findNextMessagesByPriorityStatement;
+ }
+
+ /**
* @return the lastAckedDurableSubscriberMessageStatement
*/
public String getLastAckedDurableSubscriberMessageStatement() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=966291&r1=966290&r2=966291&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Wed Jul 21 16:06:39 2010
@@ -56,6 +56,7 @@ public class DefaultJDBCAdapter implemen
private static final Log LOG = LogFactory.getLog(DefaultJDBCAdapter.class);
protected Statements statements;
protected boolean batchStatments = true;
+ protected boolean prioritizedMessages;
protected void setBinaryData(PreparedStatement s, int index, byte data[])
throws SQLException {
s.setBytes(index, data);
@@ -190,7 +191,7 @@ public class DefaultJDBCAdapter implemen
public void doAddMessage(TransactionContext c, long sequence, MessageId
messageID, ActiveMQDestination destination, byte[] data,
- long expiration) throws SQLException, IOException {
+ long expiration, byte priority) throws SQLException, IOException {
PreparedStatement s = c.getAddMessageStatement();
try {
if (s == null) {
@@ -204,7 +205,8 @@ public class DefaultJDBCAdapter implemen
s.setLong(3, messageID.getProducerSequenceId());
s.setString(4, destination.getQualifiedName());
s.setLong(5, expiration);
- setBinaryData(s, 6, data);
+ s.setLong(6, priority);
+ setBinaryData(s, 7, data);
if (this.batchStatments) {
s.addBatch();
} else if (s.executeUpdate() != 1) {
@@ -710,6 +712,14 @@ public class DefaultJDBCAdapter implemen
public void setStatements(Statements statements) {
this.statements = statements;
+ }
+
+ public boolean isPrioritizedMessages() {
+ return prioritizedMessages;
+ }
+
+ public void setPrioritizedMessages(boolean prioritizedMessages) {
+ this.prioritizedMessages = prioritizedMessages;
}
/**
@@ -765,10 +775,16 @@ public class DefaultJDBCAdapter implemen
PreparedStatement s = null;
ResultSet rs = null;
try {
- s =
c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
+ if (isPrioritizedMessages()) {
+ s =
c.getConnection().prepareStatement(this.statements.getFindNextMessagesByPriorityStatement());
+ } else {
+ s =
c.getConnection().prepareStatement(this.statements.getFindNextMessagesStatement());
+ }
s.setMaxRows(maxReturned * 2);
s.setString(1, destination.getQualifiedName());
- s.setLong(2, nextSeq);
+ if (!isPrioritizedMessages()) {
+ s.setLong(2, nextSeq);
+ }
rs = s.executeQuery();
int count = 0;
if (this.statements.isUseExternalMessageReferences()) {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?rev=966291&r1=966290&r2=966291&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
Wed Jul 21 16:06:39 2010
@@ -148,7 +148,7 @@ abstract public class MessagePriorityTes
MessageConsumer queueConsumer = sess.createConsumer(queue);
for (int i = 0; i < MSG_NUM * 2; i++) {
Message msg = queueConsumer.receive(1000);
- assertNotNull(msg);
+ assertNotNull("Message " + i + " was null", msg);
assertEquals("Message " + i + " has wrong priority", i < MSG_NUM ?
HIGH_PRI : LOW_PRI, msg.getJMSPriority());
}
}