Author: tabish
Date: Thu Feb 16 00:02:15 2012
New Revision: 1244796
URL: http://svn.apache.org/viewvc?rev=1244796&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-3714: lazily initialize the Scheduler
object in ActiveMQConnection.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1244796&r1=1244795&r2=1244796&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Feb 16 00:02:15 2012
@@ -33,6 +33,7 @@ import java.util.concurrent.ThreadPoolEx
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
@@ -51,6 +52,7 @@ import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.XAConnection;
+
import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.command.ActiveMQDestination;
@@ -190,7 +192,7 @@ public class ActiveMQConnection implemen
private boolean useDedicatedTaskRunner;
protected volatile CountDownLatch transportInterruptionProcessingComplete;
private long consumerFailoverRedeliveryWaitPeriod;
- private final Scheduler scheduler;
+ private Scheduler scheduler;
private boolean messagePrioritySupported = true;
private boolean transactedIndividualAck = false;
private boolean nonBlockingRedelivery = false;
@@ -230,8 +232,6 @@ public class ActiveMQConnection implemen
this.factoryStats.addConnection(this);
this.timeCreated = System.currentTimeMillis();
this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
- this.scheduler = new Scheduler("ActiveMQConnection["+uniqueId+"] Scheduler");
- this.scheduler.start();
}
protected void setUserName(String userName) {
@@ -622,9 +622,11 @@ public class ActiveMQConnection implemen
advisoryConsumer.dispose();
advisoryConsumer = null;
}
- if (this.scheduler != null) {
+
+ Scheduler scheduler = this.scheduler;
+ if (scheduler != null) {
try {
- this.scheduler.stop();
+ scheduler.stop();
} catch (Exception e) {
JMSException ex = JMSExceptionSupport.create(e);
throw ex;
@@ -2408,8 +2410,23 @@ public class ActiveMQConnection implemen
return consumerFailoverRedeliveryWaitPeriod;
}
- protected Scheduler getScheduler() {
- return this.scheduler;
+ protected Scheduler getScheduler() throws JMSException {
+ Scheduler result = scheduler;
+ if (result == null) {
+ synchronized (this) {
+ result = scheduler;
+ if (result == null) {
+ checkClosed();
+ try {
+ result = scheduler = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
+ scheduler.start();
+ } catch(Exception e) {
+ throw JMSExceptionSupport.create(e);
+ }
+ }
+ }
+ }
+ return result;
}
protected ThreadPoolExecutor getExecutor() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1244796&r1=1244795&r2=1244796&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Feb 16 00:02:15 2012
@@ -29,6 +29,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
@@ -36,6 +37,7 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.TransactionRolledBackException;
+
import org.apache.activemq.blob.BlobDownloader;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQDestination;
@@ -54,7 +56,6 @@ import org.apache.activemq.management.JM
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.selector.SelectorParser;
-import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.IntrospectionSupport;
@@ -109,7 +110,6 @@ public class ActiveMQMessageConsumer imp
}
private static final Logger LOG =
LoggerFactory.getLogger(ActiveMQMessageConsumer.class);
- protected final Scheduler scheduler;
protected final ActiveMQSession session;
protected final ConsumerInfo info;
@@ -207,7 +207,6 @@ public class ActiveMQMessageConsumer imp
}
this.session = session;
- this.scheduler = session.getScheduler();
this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
setTransformer(session.getTransformer());
@@ -1192,7 +1191,7 @@ public class ActiveMQMessageConsumer imp
new
LinkedList<MessageDispatch>(deliveredMessages);
// Start up the delivery again a little later.
- scheduler.executeAfterDelay(new Runnable() {
+ session.getScheduler().executeAfterDelay(new Runnable() {
public void run() {
try {
if (!unconsumedMessages.isClosed()) {
@@ -1216,7 +1215,7 @@ public class ActiveMQMessageConsumer imp
if (redeliveryDelay > 0 &&
!unconsumedMessages.isClosed()) {
// Start up the delivery again a little later.
- scheduler.executeAfterDelay(new Runnable() {
+ session.getScheduler().executeAfterDelay(new Runnable() {
public void run() {
try {
if (started.get()) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=1244796&r1=1244795&r2=1244796&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu Feb 16 00:02:15 2012
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
+
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
@@ -53,6 +54,7 @@ import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.jms.TransactionRolledBackException;
+
import org.apache.activemq.blob.BlobDownloader;
import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.blob.BlobUploader;
@@ -198,7 +200,6 @@ public class ActiveMQSession implements
}
private static final Logger LOG =
LoggerFactory.getLogger(ActiveMQSession.class);
- private final Scheduler scheduler;
private final ThreadPoolExecutor connectionExecutor;
protected int acknowledgementMode;
@@ -251,7 +252,6 @@ public class ActiveMQSession implements
this.connection.asyncSendPacket(info);
setTransformer(connection.getTransformer());
setBlobTransferPolicy(connection.getBlobTransferPolicy());
- this.scheduler=connection.getScheduler();
this.connectionExecutor=connection.getExecutor();
this.executor = new ActiveMQSessionExecutor(this);
connection.addSession(this);
@@ -659,10 +659,14 @@ public class ActiveMQSession implements
//
for (final ActiveMQMessageConsumer consumer : consumers) {
consumer.inProgressClearRequired();
- scheduler.executeAfterDelay(new Runnable() {
- public void run() {
- consumer.clearMessagesInProgress();
- }}, 0l);
+ try {
+ connection.getScheduler().executeAfterDelay(new Runnable() {
+ public void run() {
+ consumer.clearMessagesInProgress();
+ }}, 0l);
+ } catch (JMSException e) {
+ connection.onClientInternalException(e);
+ }
}
}
@@ -892,7 +896,7 @@ public class ActiveMQSession implements
for (int i = 0; i < redeliveryCounter; i++) {
redeliveryDelay =
redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
}
- scheduler.executeAfterDelay(new Runnable() {
+ connection.getScheduler().executeAfterDelay(new Runnable() {
public void run() {
((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
@@ -2051,8 +2055,8 @@ public class ActiveMQSession implements
}
}
- protected Scheduler getScheduler() {
- return this.scheduler;
+ protected Scheduler getScheduler() throws JMSException {
+ return this.connection.getScheduler();
}
protected ThreadPoolExecutor getConnectionExecutor() {