Author: rajdavies Date: Thu Nov 23 00:19:00 2006 New Revision: 478509 URL: http://svn.apache.org/viewvc?view=rev&rev=478509 Log: support for different message storage cursor types for Queue destination policies
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java (with props) incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java (with props) Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?view=diff&rev=478509&r1=478508&r2=478509 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Thu Nov 23 00:19:00 2006 @@ -127,7 +127,7 @@ if (broker.getDestinationPolicy() != null) { PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); if (entry != null) { - entry.configure(queue); + entry.configure(queue,broker.getTempDataStore()); } } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=478509&r1=478508&r2=478509 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Nov 23 00:19:00 2006 @@ -366,6 +366,11 @@ if(!skipGc&&garbageSize>garbageSizeBeforeCollection){ gc(); } + try{ + taskRunner.wakeup(); + }catch(InterruptedException e){ + log.warn("Task Runner failed to wakeup ",e); + } } public void gc() { @@ -379,11 +384,6 @@ continue; } } - } - try{ - taskRunner.wakeup(); - }catch(InterruptedException e){ - log.warn("Task Runner failed to wakeup ",e); } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java?view=diff&rev=478509&r1=478508&r2=478509 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java Thu Nov 23 00:19:00 2006 @@ -129,10 +129,6 @@ // implementation protected void fillBatch() throws Exception{ store.recoverNextMessages(maxBatchSize,this); - // this will add more messages to the batch list - if(!batchList.isEmpty()){ - Message message=(Message)batchList.getLast(); - } } public String toString() { Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?view=diff&rev=478509&r1=478508&r2=478509 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java Thu Nov 23 00:19:00 2006 @@ -57,6 +57,7 @@ nonPersistent.setMaxBatchSize(getMaxBatchSize()); } nonPersistent.start(); + persistent.start(); pendingCount=persistent.size(); } @@ -65,6 +66,7 @@ if(nonPersistent!=null){ nonPersistent.stop(); } + persistent.stop(); pendingCount=0; } Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java?view=auto&rev=478509 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java Thu Nov 23 00:19:00 2006 @@ -0,0 +1,43 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.activemq.broker.region.policy; + +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; +import org.apache.activemq.broker.region.cursors.PendingMessageCursor; +import org.apache.activemq.kaha.Store; + + +/** + * Creates a FilePendingMessageCursor + * * + * @org.apache.xbean.XBean element="fileCursor" description="Pending messages paged in from file" + * + * @version $Revision$ + */ +public class FilePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy{ + + /** + * @param queue + * @param tmpStore + * @return the cursor + * @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue, org.apache.activemq.kaha.Store) + */ + public PendingMessageCursor getQueuePendingMessageCursor(Queue queue,Store tmpStore){ + return new FilePendingMessageCursor("PendingCursor:" + queue.getName(),tmpStore); + } + + +} Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FilePendingQueueMessageStoragePolicy.java ------------------------------------------------------------------------------ svn:executable = * Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java?view=diff&rev=478509&r1=478508&r2=478509 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PendingQueueMessageStoragePolicy.java Thu Nov 23 00:19:00 2006 @@ -14,7 +14,10 @@ package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; +import org.apache.activemq.kaha.Store; + /** * Abstraction to allow different policies for holding messages awaiting dispatch on a Queue @@ -25,8 +28,10 @@ /** * Retrieve the configured pending message storage cursor; + * @param queue + * @param tmpStore * @return the cursor * */ - public PendingMessageCursor getQueuePendingMessageCursor(); + public PendingMessageCursor getQueuePendingMessageCursor(Queue queue, Store tmpStore); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?view=diff&rev=478509&r1=478508&r2=478509 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Thu Nov 23 00:19:00 2006 @@ -24,6 +24,7 @@ import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; import org.apache.activemq.broker.region.group.MessageGroupMapFactory; import org.apache.activemq.filter.DestinationMapEntry; +import org.apache.activemq.kaha.Store; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,7 +50,7 @@ private MessageGroupMapFactory messageGroupMapFactory; private PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy; - public void configure(Queue queue) { + public void configure(Queue queue, Store tmpStore) { if (dispatchPolicy != null) { queue.setDispatchPolicy(dispatchPolicy); } @@ -61,7 +62,7 @@ queue.getUsageManager().setLimit(memoryLimit); } if (pendingQueueMessageStoragePolicy != null) { - PendingMessageCursor messages = pendingQueueMessageStoragePolicy.getQueuePendingMessageCursor(); + PendingMessageCursor messages = pendingQueueMessageStoragePolicy.getQueuePendingMessageCursor(queue,tmpStore); queue.setMessages(messages); } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java?view=diff&rev=478509&r1=478508&r2=478509 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingDurableSubscriberMessageStoragePolicy.java Thu Nov 23 00:19:00 2006 @@ -20,7 +20,7 @@ /** * Creates a PendingMessageCursor that access the persistent store to retrieve messages - * * + * * @org.apache.xbean.XBean element="storeDurableSubscriberCursor" description="Pending messages for a durable subscriber * are referenced from the Store" * Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java?view=auto&rev=478509 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java (added) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java Thu Nov 23 00:19:00 2006 @@ -0,0 +1,43 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.activemq.broker.region.policy; + +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.cursors.PendingMessageCursor; +import org.apache.activemq.broker.region.cursors.StoreQueueCursor; +import org.apache.activemq.kaha.Store; + + +/** + * Creates a StoreQueueCursor + * * + * @org.apache.xbean.XBean element="storeCursor" description="Pending messages paged in from the Store" + * + * @version $Revision$ + */ +public class StorePendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy{ + + /** + * @param queue + * @param tmpStore + * @return the cursor + * @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getQueuePendingMessageCursor(org.apache.openjpa.lib.util.concurrent.Queue, org.apache.activemq.kaha.Store) + */ + public PendingMessageCursor getQueuePendingMessageCursor(Queue queue,Store tmpStore){ + return new StoreQueueCursor(queue,tmpStore); + } + + +} Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/StorePendingQueueMessageStoragePolicy.java ------------------------------------------------------------------------------ svn:executable = * Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java?view=diff&rev=478509&r1=478508&r2=478509 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/VMPendingQueueMessageStoragePolicy.java Thu Nov 23 00:19:00 2006 @@ -1,41 +1,39 @@ /** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ + package org.apache.activemq.broker.region.policy; +import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; +import org.apache.activemq.kaha.Store; /** * Creates a VMPendingMessageCursor - * - ** @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in the JVM" + * * + * @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in the JVM" * * @version $Revision$ */ -public class VMPendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy { - +public class VMPendingQueueMessageStoragePolicy implements PendingQueueMessageStoragePolicy{ /** - * @return the Pending Message cursor - * @see org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy#getgetQueuePendingMessageCursor() + * @param queue + * @param tmpStore + * @return the cursor */ - public PendingMessageCursor getQueuePendingMessageCursor(){ - return new VMPendingMessageCursor(); + public PendingMessageCursor getQueuePendingMessageCursor(Queue queue,Store tmpStore){ + return new VMPendingMessageCursor(); } - }