Author: chirino
Date: Mon Jun 12 08:08:33 2006
New Revision: 413674
URL: http://svn.apache.org/viewvc?rev=413674&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQ-714
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java?rev=413674&r1=413673&r2=413674&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java Mon Jun 12 08:08:33 2006
@@ -34,6 +34,12 @@
volatile private MessageReference messages[];
private int maximumSize=100;
private int tail=0;
+
+ public SubscriptionRecoveryPolicy copy() {
+ FixedCountSubscriptionRecoveryPolicy rc = new FixedCountSubscriptionRecoveryPolicy();
+ rc.setMaximumSize(maximumSize);
+ return rc;
+ }
synchronized public boolean add(ConnectionContext context,MessageReference node) throws Exception{
messages[tail++]=node;
@@ -109,4 +115,5 @@
}
return (Message[]) result.toArray(new Message[result.size()]);
}
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java?rev=413674&r1=413673&r2=413674&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedSizedSubscriptionRecoveryPolicy.java Mon Jun 12 08:08:33 2006
@@ -48,6 +48,13 @@
private int maximumSize = 100 * 64 * 1024;
private boolean useSharedBuffer = true;
+ public SubscriptionRecoveryPolicy copy() {
+ FixedSizedSubscriptionRecoveryPolicy rc = new FixedSizedSubscriptionRecoveryPolicy();
+ rc.setMaximumSize(maximumSize);
+ rc.setUseSharedBuffer(useSharedBuffer);
+ return rc;
+ }
+
public boolean add(ConnectionContext context, MessageReference message) throws Exception {
buffer.add(message);
return true;
@@ -125,4 +132,5 @@
return new DestinationBasedMessageList(maximumSize);
}
}
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java?rev=413674&r1=413673&r2=413674&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/LastImageSubscriptionRecoveryPolicy.java Mon Jun 12 08:08:33 2006
@@ -18,6 +18,7 @@
import java.util.ArrayList;
import java.util.List;
+
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
@@ -74,6 +75,10 @@
result.add(lastImage.getMessage());
}
return (Message[])result.toArray(new Message[result.size()]);
+ }
+
+ public SubscriptionRecoveryPolicy copy() {
+ return new LastImageSubscriptionRecoveryPolicy();
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java?rev=413674&r1=413673&r2=413674&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/NoSubscriptionRecoveryPolicy.java Mon Jun 12 08:08:33 2006
@@ -32,6 +32,12 @@
*/
public class NoSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
+
+ public SubscriptionRecoveryPolicy copy() {
+ // This object is immutable
+ return this;
+ }
+
public boolean add(ConnectionContext context, MessageReference node) throws Exception {
return true;
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=413674&r1=413673&r2=413674&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Mon Jun 12 08:08:33 2006
@@ -65,7 +65,7 @@
topic.setDeadLetterStrategy(deadLetterStrategy);
}
if (subscriptionRecoveryPolicy != null) {
- topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy);
+ topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy());
}
topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
if( memoryLimit>0 ) {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java?rev=413674&r1=413673&r2=413674&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/QueryBasedSubscriptionRecoveryPolicy.java Mon Jun 12 08:08:33 2006
@@ -55,7 +55,10 @@
private IdGenerator idGenerator = new IdGenerator();
private ProducerId producerId = createProducerId();
- public QueryBasedSubscriptionRecoveryPolicy() {
+ public SubscriptionRecoveryPolicy copy() {
+ QueryBasedSubscriptionRecoveryPolicy rc = new QueryBasedSubscriptionRecoveryPolicy();
+ rc.setQuery(query);
+ return rc;
}
public boolean add(ConnectionContext context, MessageReference message) throws Exception {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java?rev=413674&r1=413673&r2=413674&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/SubscriptionRecoveryPolicy.java Mon Jun 12 08:08:33 2006
@@ -64,4 +64,8 @@
*/
Message[] browse(ActiveMQDestination dest) throws Exception;
+ /**
+ * Used to copy the policy object.
+ */
+ SubscriptionRecoveryPolicy copy();
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java?rev=413674&r1=413673&r2=413674&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/TimedSubscriptionRecoveryPolicy.java Mon Jun 12 08:08:33 2006
@@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
@@ -66,6 +67,12 @@
gc();
}
};
+
+ public SubscriptionRecoveryPolicy copy() {
+ TimedSubscriptionRecoveryPolicy rc = new TimedSubscriptionRecoveryPolicy();
+ rc.setRecoverDuration(recoverDuration);
+ return rc;
+ }
public boolean add(ConnectionContext context, MessageReference message) throws Exception {
buffer.add(new TimestampWrapper(message, lastGCRun));