Author: chirino
Date: Wed Jan 4 13:09:16 2006
New Revision: 365993
URL: http://svn.apache.org/viewcvs?rev=365993&view=rev
Log:
Added a SubscriptionInfo[] getAllSubscriptions() to the TopicMessageStore. We
will need this if we want to eagerly load the durable subs when the broker
starts up.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/StatementProvider.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/CachingStatementProvider.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultStatementProvider.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java
Wed Jan 4 13:09:16 2006
@@ -90,4 +90,8 @@
public String getMessageReference(MessageId identity) throws IOException {
return delegate.getMessageReference(identity);
}
+
+    /**
+     * Forwards the request for all subscriptions to the wrapped store.
+     *
+     * @return the array produced by the delegate's getAllSubscriptions()
+     * @throws IOException if the delegate throws it
+     */
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+        final SubscriptionInfo[] subscriptions = delegate.getAllSubscriptions();
+        return subscriptions;
+    }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/TopicMessageStore.java
Wed Jan 4 13:09:16 2006
@@ -70,6 +70,15 @@
public SubscriptionInfo lookupSubscription(String clientId, String
subscriptionName) throws IOException;
/**
+ * Lists all the durable subscriptions for a given destination.
+ *
+ * @return the subscriptions, as an array of SubscriptionInfo
+ * @throws IOException declared by the signature; presumably raised when the
+ *         underlying store cannot be read
+ */
+ public SubscriptionInfo[] getAllSubscriptions() throws IOException;
+
+ /**
* Inserts the subscriber info due to a subscription change
* <p/>
* If this is a new subscription and the retroactive is false, then the
last
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
Wed Jan 4 13:09:16 2006
@@ -74,5 +74,7 @@
public abstract void setUseExternalMessageReferences(boolean
useExternalMessageReferences);
+ /**
+ * Looks up the durable subscriptions recorded for a destination.
+ *
+ * @param c the transaction context the lookup runs in
+ * @param destination the destination whose subscriptions are wanted
+ * @return the subscriptions found, as an array of SubscriptionInfo
+ * @throws SQLException declared by the signature
+ * @throws IOException declared by the signature
+ */
+ public abstract SubscriptionInfo[]
doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination)
throws SQLException, IOException;
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
Wed Jan 4 13:09:16 2006
@@ -126,4 +126,15 @@
}
}
+    /**
+     * Asks the JDBC adapter for every subscription stored for this store's
+     * destination.
+     *
+     * @return the array produced by the adapter's doGetAllSubscriptions()
+     * @throws IOException if the adapter fails with an SQLException (wrapped
+     *         via IOExceptionSupport) or throws an IOException itself
+     */
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+        final TransactionContext context = persistenceAdapter.getTransactionContext();
+        try {
+            final SubscriptionInfo[] subscriptions = adapter.doGetAllSubscriptions(context, destination);
+            return subscriptions;
+        } catch (SQLException sqlFailure) {
+            throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + sqlFailure, sqlFailure);
+        } finally {
+            // The transaction context is closed whether or not the lookup worked.
+            context.close();
+        }
+    }
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/StatementProvider.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/StatementProvider.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/StatementProvider.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/StatementProvider.java
Wed Jan 4 13:09:16 2006
@@ -47,5 +47,6 @@
public boolean isUseExternalMessageReferences();
public String getFullMessageTableName();
+ /**
+ * Returns the SQL statement used to find all the durable subscriptions.
+ * In DefaultStatementProvider the statement takes a single parameter,
+ * the CONTAINER value.
+ */
+ public String getFindAllDurableSubsStatment();
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/CachingStatementProvider.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/CachingStatementProvider.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/CachingStatementProvider.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/CachingStatementProvider.java
Wed Jan 4 13:09:16 2006
@@ -47,6 +47,7 @@
private String deleteOldMessagesStatment;
private String findLastSequenceIdInAcks;
private String findAllDestinationsStatment;
+ private String findAllDurableSubsStatment;
public CachingStatementProvider(StatementProvider statementProvider) {
this.statementProvider = statementProvider;
@@ -221,5 +222,12 @@
public String getFullMessageTableName() {
return statementProvider.getFullMessageTableName();
+ }
+
+    /**
+     * Returns the "find all durable subscriptions" SQL. The wrapped provider
+     * is asked for it while the stored copy is still null; once a value has
+     * been stored, that string is returned directly.
+     */
+    public String getFindAllDurableSubsStatment() {
+        if ( findAllDurableSubsStatment!=null ) {
+            return findAllDurableSubsStatment;
+        }
+        findAllDurableSubsStatment = statementProvider.getFindAllDurableSubsStatment();
+        return findAllDurableSubsStatment;
     }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Wed Jan 4 13:09:16 2006
@@ -21,6 +21,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
@@ -475,6 +476,32 @@
subscription.setSelector(rs.getString(1));
return subscription;
+ }
+ finally {
+ close(rs);
+ close(s);
+ }
+ }
+
+    /**
+     * Loads every durable subscription stored for the given destination.
+     * <p/>
+     * Runs the statement supplied by
+     * statementProvider.getFindAllDurableSubsStatment(), binding the
+     * destination's qualified name as its only parameter, and turns each row
+     * into a SubscriptionInfo. Columns are read by position: 1 is the
+     * selector, 2 the subscription name, 3 the client id.
+     *
+     * @param c the transaction context that supplies the JDBC connection
+     * @param destination the destination whose subscriptions are loaded; it
+     *        is also set on every returned SubscriptionInfo
+     * @return one SubscriptionInfo per row; an empty array when there are no rows
+     * @throws SQLException if preparing, executing or reading the query fails
+     * @throws IOException declared by the signature
+     */
+    public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException {
+        PreparedStatement s = null;
+        ResultSet rs = null;
+        try {
+
+            s = c.getConnection().prepareStatement(statementProvider.getFindAllDurableSubsStatment());
+            s.setString(1, destination.getQualifiedName());
+            rs = s.executeQuery();
+
+            ArrayList rc = new ArrayList();
+            while(rs.next()) {
+                SubscriptionInfo subscription = new SubscriptionInfo();
+                subscription.setDestination(destination);
+                subscription.setSelector(rs.getString(1));
+                subscription.setSubcriptionName(rs.getString(2));
+                subscription.setClientId(rs.getString(3));
+                // Keep the row: without this add() the list stayed empty and
+                // the method returned a zero-length array for every destination.
+                rc.add(subscription);
+            }
+
+            return (SubscriptionInfo[]) rc.toArray(new SubscriptionInfo[rc.size()]);
         }
         finally {
             close(rs);
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultStatementProvider.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultStatementProvider.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultStatementProvider.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultStatementProvider.java
Wed Jan 4 13:09:16 2006
@@ -108,6 +108,12 @@
"FROM "+getTablePrefix()+durableSubAcksTableName+
" WHERE CONTAINER=? AND CLIENT_ID=? AND SUB_NAME=?";
}
+
+    /**
+     * Builds the SQL that selects the selector, subscription name and client
+     * id of every row in the durable subscription acks table for one
+     * container. The statement takes a single parameter, the CONTAINER value.
+     *
+     * @return the SELECT statement text
+     */
+    public String getFindAllDurableSubsStatment() {
+        // The leading space before FROM matters: without it the column list
+        // and the keyword run together as "CLIENT_IDFROM".
+        return "SELECT SELECTOR, SUB_NAME, CLIENT_ID" +
+               " FROM "+getTablePrefix()+durableSubAcksTableName+
+               " WHERE CONTAINER=?";
+    }
public String getUpdateLastAckOfDurableSub() {
return "UPDATE "+getTablePrefix()+durableSubAcksTableName+
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalTopicMessageStore.java
Wed Jan 4 13:09:16 2006
@@ -179,5 +179,9 @@
public void deleteSubscription(String clientId, String subscriptionName)
throws IOException {
longTermStore.deleteSubscription(clientId, subscriptionName);
}
+
+    /**
+     * Forwards the request for all subscriptions to the long term store.
+     *
+     * @return the array produced by longTermStore.getAllSubscriptions()
+     * @throws IOException if the long term store throws it
+     */
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+        final SubscriptionInfo[] subscriptions = longTermStore.getAllSubscriptions();
+        return subscriptions;
+    }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalTopicMessageStore.java
Wed Jan 4 13:09:16 2006
@@ -190,4 +190,8 @@
longTermStore.deleteSubscription(clientId, subscriptionName);
}
+    /**
+     * Forwards the request for all subscriptions to the long term store.
+     *
+     * @return the array produced by longTermStore.getAllSubscriptions()
+     * @throws IOException if the long term store throws it
+     */
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+        final SubscriptionInfo[] subscriptions = longTermStore.getAllSubscriptions();
+        return subscriptions;
+    }
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java?rev=365993&r1=365992&r2=365993&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
Wed Jan 4 13:09:16 2006
@@ -112,4 +112,8 @@
subscriberDatabase.clear();
lastMessageId=null;
}
+
+    /**
+     * Copies the values currently held in subscriberDatabase into a new
+     * SubscriptionInfo array.
+     *
+     * @return an array holding the values of subscriberDatabase
+     * @throws IOException declared by the signature; nothing in this body throws it
+     */
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+        final java.util.Collection stored = subscriberDatabase.values();
+        final SubscriptionInfo[] target = new SubscriptionInfo[subscriberDatabase.size()];
+        return (SubscriptionInfo[]) stored.toArray(target);
+    }
}