[ 
https://issues.apache.org/jira/browse/AMQ-5508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14309317#comment-14309317
 ] 

Pero Atanasov edited comment on AMQ-5508 at 2/16/15 7:09 PM:
-------------------------------------------------------------

I agree that the proposed patch is just band-aiding it. This issue, considering 
its origin, needs to be addressed with a larger-impact fix. To explain the root 
cause, here are some code excerpts:

BrokerService.isStarted() is used to decide whether to create/add the advisory 
destination that corresponds to a regular destination and to send an advisory 
message for it.

activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java

Lines 605 - 637 [in fireAdvisory(...)]
if (getBrokerService().isStarted()) {
     // code to create and populate an advisory message
     next.send(producerExchange, advisoryMessage);
     // send will trigger destination creation/addition for this advisory
     // message topic
}

activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java

Lines 501 - 503
public boolean isStarted() {
     return started.get() && startedLatch.getCount() == 0;
}

When a broker is restarted, BrokerService.isStarted() is still false while the 
known destinations are being re-added, so no corresponding advisory topic 
destinations are created/added for them. Consider the following sequence of 
steps when the broker is restarting:

activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java

Lines 558 - 612 [in start()]
if (stopped.get() || !started.compareAndSet(false, true)) {
     // lets just ignore redundant start() calls
     // as its way too easy to not be completely sure if start() has been
     // called or not with the gazillion of different configuration
     // mechanisms
     // throw new IllegalStateException("Already started.");
     return;
}

// ... some code ...
startBroker(startAsync);
// ... some code ... 

Note that this sets to true one of the two conditions (AtomicBoolean started) 
on which isStarted() depends. However, the second condition (CountDownLatch 
startedLatch reaching a count of 0) will not be satisfied *until* after all of 
the known topics/destinations are re-added. After the 
startBroker(startAsync) call, execution continues within

Lines 665 - 710 [in doStartBroker()]

// ... some code ...
startDestinations();
// ... some code ...
LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ 
getBrokerVersion(), getBrokerName(), brokerId });
broker.start();
// ... some code ...
startAllConnectors();
// ... some code ...
startedLatch.countDown();

The broker.start() call above triggers the addition of all known 
topics/destinations. Because startedLatch.countDown() is called later in 
doStartBroker(), isStarted() is still false while the known destinations are 
being re-added. Therefore, the associated advisory destinations are not added 
and the advisory messages are not sent out. That's bad. Here is the known 
destination re-addition code:

activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java

Lines 89 - 110 [in start()]
// ... some code ...
context.getBroker().addDestination(context, dest, false);
// ... some code ...

In summary, the order of these calls makes it impossible for advisory 
destinations to be created/added for known destinations upon broker restart:

1. Set “started” AtomicBoolean variable to true
2. Re-add all known destinations (associated advisory destinations will 
   fail to be created)
3. Decrement “startedLatch” CountDownLatch to 0
4. isStarted() becomes true

Only after step 3 does isStarted() become true. By then, we have missed the 
chance to create the advisory destinations. Other events on those destinations 
(such as removal) may trigger the advisory destination addition again, but that 
can run into issues like the one stated in the initial report. 
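
The ordering in the steps above can be reproduced in isolation with a small 
sketch. This is not ActiveMQ code: the class name StartOrderSketch, the 
simplified fireAdvisory(String) method, and the destination names are all 
made up for illustration. Only the isStarted() expression and the 
compareAndSet/countDown ordering are taken from the excerpts above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the broker start sequence; not the real BrokerService.
class StartOrderSketch {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final CountDownLatch startedLatch = new CountDownLatch(1);

    // Records what a simplified fireAdvisory(...) would have done.
    final List<String> advisoriesFired = new ArrayList<>();
    final List<String> advisoriesSkipped = new ArrayList<>();

    // Same expression as the isStarted() excerpt above.
    boolean isStarted() {
        return started.get() && startedLatch.getCount() == 0;
    }

    // Simplified: the real method builds and sends an advisory message.
    void fireAdvisory(String destination) {
        if (isStarted()) {
            advisoriesFired.add(destination);
        } else {
            advisoriesSkipped.add(destination);
        }
    }

    void start(List<String> knownDestinations) {
        // Step 1: flip the "started" flag; redundant start() calls are ignored.
        if (!started.compareAndSet(false, true)) {
            return;
        }
        // Step 2: re-add known destinations while the latch is still at 1.
        for (String destination : knownDestinations) {
            fireAdvisory(destination);
        }
        // Step 3: only now can isStarted() return true (step 4).
        startedLatch.countDown();
    }

    public static void main(String[] args) {
        StartOrderSketch sketch = new StartOrderSketch();
        sketch.start(Arrays.asList("queue.A", "queue.B"));
        System.out.println("skipped=" + sketch.advisoriesSkipped
                + " fired=" + sketch.advisoriesFired
                + " isStarted=" + sketch.isStarted());
    }
}
```

In this sketch every known destination lands in advisoriesSkipped, because 
the guard is evaluated between steps 1 and 3. A later fireAdvisory(...) call 
on the same instance would pass the guard, which mirrors the late advisory 
destination creation described above.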

To really address this, the order of these events needs to change. However, I 
need to learn more about all of the dependencies to make sure that other things 
will not be broken with a potential reordering. 

Any ideas?

NOTE: The order of the above-stated events was confirmed with a stack-trace:

2015-01-06 15:49:41,976 | INFO  |       at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:139) | org.apache.activemq.broker.region.AbstractRegion | main
2015-01-06 15:49:41,976 | INFO  |       at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:330) | org.apache.activemq.broker.region.AbstractRegion | main
2015-01-06 15:49:41,976 | INFO  |       at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:171) | org.apache.activemq.broker.region.AbstractRegion | main
2015-01-06 15:49:41,977 | INFO  |       at org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:195) | org.apache.activemq.broker.region.AbstractRegion | main
2015-01-06 15:49:41,977 | INFO  |       at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:171) | org.apache.activemq.broker.region.AbstractRegion | main
2015-01-06 15:49:41,977 | INFO  |       at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:171) | org.apache.activemq.broker.region.AbstractRegion | main
2015-01-06 15:49:41,977 | INFO  |       at org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:177) | org.apache.activemq.broker.region.AbstractRegion | main
2015-01-06 15:49:41,977 | INFO  |       at org.apache.activemq.broker.region.AbstractRegion.start(AbstractRegion.java:99) | org.apache.activemq.broker.region.AbstractRegion | main
2015-01-06 15:49:41,978 | INFO  |       at org.apache.activemq.broker.region.RegionBroker.start(RegionBroker.java:190) | org.apache.activemq.broker.region.AbstractRegion | main
2015-01-06 15:49:41,978 | INFO  |       at org.apache.activemq.broker.jmx.ManagedRegionBroker.start(ManagedRegionBroker.java:121) | org.apache.activemq.broker.region.AbstractRegion | main
2015-01-06 15:49:41,978 | INFO  |       at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:187) | org.apache.activemq.broker.region.AbstractRegion | main
2015-01-06 15:49:41,978 | INFO  |       at org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:187) | org.apache.activemq.broker.region.AbstractRegion | main
2015-01-06 15:49:41,979 | INFO  |       at org.apache.activemq.broker.TransactionBroker.start(TransactionBroker.java:120) | org.apache.activemq.broker.region.AbstractRegion | main
2015-01-06 15:49:41,979 | INFO  |       at org.apache.activemq.broker.BrokerService$5.start(BrokerService.java:2180) | org.apache.activemq.broker.region.AbstractRegion | main
2015-01-06 15:49:41,979 | INFO  |       at org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:676) | org.apache.activemq.broker.region.AbstractRegion | main
2015-01-06 15:49:41,979 | INFO  |       at org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:660) | org.apache.activemq.broker.region.AbstractRegion | main
2015-01-06 15:49:41,979 | INFO  |       at org.apache.activemq.broker.BrokerService.start(BrokerService.java:595) | org.apache.activemq.broker.region.AbstractRegion | main





> Broker shutdown race condition leads to "IllegalStateException: Timer already 
> cancelled"
> ----------------------------------------------------------------------------------------
>
>                 Key: AMQ-5508
>                 URL: https://issues.apache.org/jira/browse/AMQ-5508
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.10.0
>            Reporter: Pero Atanasov
>         Attachments: AMQ5508.patch
>
>
> This issue is seen on the surface with the broker failing to remove a 
> consumer:
> 2014-12-19 10:45:42,586 | WARN  | Failed to remove consumer:
> ID:hack-34927-1419007505006-2:3:0:0 |
> org.apache.activemq.broker.TransportConnection | ActiveMQ
> BrokerService[broker0] Task-6
> java.lang.IllegalStateException: Timer already cancelled.
> The full stack trace [STACK_TRACE] will be specified at the end of this 
> report.
> In this case, the race condition is exposed via some of the AdvisoryBroker 
> code while trying to send an advisory message as part of the 
> consumer/producer addition/removal code. Below are excerpts of code to 
> explain this race condition:
> activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
> Lines 605 - 637 [in fireAdvisory(...)]
> if (getBrokerService().isStarted()) {
>      // code to create and populate an advisory message
>      next.send(producerExchange, advisoryMessage);
>      // send will trigger destination creation/addition for this advisory
>      // message topic
> }
> However, it can easily happen that a client (consumer or producer) connects 
> and is added _before_ getBrokerService().isStarted() is true, so in that 
> case, the boolean expression in the block above will evaluate to false and 
> hence skip the entire advisory message send and create/add destination 
> process. At this point, the consumer/producer destination does not have its 
> advisory message destination created. If no other consumer/producer connects 
> on the same destination until the broker is shutting down, then no advisory 
> destination pairing will be created for that consumer/producer. When the 
> broker starts the shutdown process, the event that will trigger an advisory 
> message will be the removal of a consumer/producer as part of the 
> TransportConnection stopping process. The sequence of calls with line numbers 
> can be seen within the STACK_TRACE.
> Back to the above specified code block, when the broker is shutting down, the 
> broker is still in the started state. Then, the boolean expression will 
> evaluate to true and hence proceed with the advisory message creation/sending 
> and destination creation/addition process. However, in the meantime, as part 
> of the shutdown process, the Scheduler timer is being cancelled. Consider the 
> following blocks of code:
> activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
> Lines 756 - 761 [in stop()]
> // for each service
>      stopper.stop(service);
> activemq-client/src/main/java/org/apache/activemq/util/ServiceSupport.java
> Lines 67 - 84 [in stop()]
> if (stopped.compareAndSet(false, true)) {
>     // ... some code ...
>     doStop(stopper);
>     // ... some code ...
> }
> activemq-client/src/main/java/org/apache/activemq/thread/Scheduler.java
> Lines 69 - 71 [in doStop(...)]
> if (this.timer != null) {
>      this.timer.cancel();
> }
> At this point, the timer is cancelled. Now, back to the thread removing the 
> consumer. From the STACK_TRACE, it can be seen that after many calls related 
> to the advisory message destination creation, it ends up in AbstractRegion’s 
> addDestination method.
> activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
> Lines 132 - 145 [in addDestination(...)]
> if (dest == null) {
>      // ... some code ...
>      dest = createDestination(context, destination);
>      // ... some code ...
>      dest.start();
>      // ... some code ...
> }
> As can be seen in the STACK_TRACE below, the dest.start() call tries to 
> schedule a task on the same timer that has already been cancelled. Some 
> debug prints to confirm the race condition:
> 1. Fail to send advisory message for this consumer advisory topic because 
> getBrokerService().isStarted() is false:
> 2015-01-06 08:49:26,215 | INFO  | patanasov: from AdvisoryBroker fireAdvisory 
> would have sent advisoryMessage: topic 
> ActiveMQ.Advisory.Consumer.Queue.2015.01.06-08.23.00.16401 | 
> org.apache.activemq.advisory.AdvisoryBroker
> 2. Broker shutting down and cancelling timer:
> 2015-01-06 08:49:43,602 | INFO  | Apache ActiveMQ 5.10.0 (broker0, 
> ID:host-40197-1420555763149-0:1) is shutting down | 
> org.apache.activemq.broker.BrokerService | ActiveMQ ShutdownHook
> 2015-01-06 08:49:43,603 | INFO  | patanasov: from ServiceSupport calling 
> doStop(stopper) | org.apache.activemq.util.ServiceSupport | ActiveMQ 
> ShutdownHook
> 2015-01-06 08:49:43,603 | INFO  | patanasov: from Scheduler 
> doStop(ServiceStopper stopper) calling this.timer.cancel(): timer 
> java.util.Timer@7df33bb0 | org.apache.activemq.thread.Scheduler | ActiveMQ 
> ShutdownHook
> 3. Adding destination and scheduling on cancelled timer (7df33bb0):
> 2015-01-06 08:49:43,767 | INFO  | patanasov: broker0 adding destination: 
> qualified: 
> topic://ActiveMQ.Advisory.Consumer.Queue.2015.01.06-08.23.00.16401, physical: 
> ActiveMQ.Advisory.Consumer.Queue.2015.01.06-08.23.00.16401 | 
> org.apache.activemq.broker.region.AbstractRegion | ActiveMQ 
> BrokerService[broker0] Task-12
> 2015-01-06 08:49:43,782 | INFO  | patanasov: from schedualPeriodically: 
> timer: java.util.Timer@7df33bb0 | org.apache.activemq.thread.Scheduler | 
> ActiveMQ BrokerService[broker0] Task-12
> 2015-01-06 08:49:43,785 | INFO  | patanasov: caught an exception in 
> schedualPeriodically: | org.apache.activemq.thread.Scheduler | ActiveMQ 
> BrokerService[broker0] Task-12
> java.lang.IllegalStateException: Timer already cancelled.
> I will provide a patch to fix this issue. However, the bigger concern here is 
> that advisory message destination creation fails for clients that connect 
> while getBrokerService().isStarted() is false. If there is only one 
> consumer/producer on a destination and it connects while 
> getBrokerService().isStarted() is false, then the broker will fail to 
> create/send an advisory message for that destination, at least until the 
> producer/consumer is removed at the end. I will continue looking to learn 
> more about this state of the broker and open a separate JIRA issue if needed.
> STACK_TRACE:
> java.lang.IllegalStateException: Timer already cancelled.
>     at java.util.Timer.sched(Timer.java:354)
>     at java.util.Timer.schedule(Timer.java:222)
>     at org.apache.activemq.thread.Scheduler.schedualPeriodically(Scheduler.java:53)
>     at org.apache.activemq.broker.region.Topic.start(Topic.java:563)
>     at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:141)
>     at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:325)
>     at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:167)
>     at org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:184)
>     at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:167)
>     at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:167)
>     at org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:172)
>     at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:439)
>     at org.apache.activemq.broker.jmx.ManagedRegionBroker.send(ManagedRegionBroker.java:297)
>     at org.apache.activemq.advisory.AdvisoryBroker.fireAdvisory(AdvisoryBroker.java:615)
>     at org.apache.activemq.advisory.AdvisoryBroker.fireConsumerAdvisory(AdvisoryBroker.java:564)
>     at org.apache.activemq.advisory.AdvisoryBroker.fireConsumerAdvisory(AdvisoryBroker.java:550)
>     at org.apache.activemq.advisory.AdvisoryBroker.removeConsumer(AdvisoryBroker.java:271)
>     at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:132)
>     at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:132)
>     at org.apache.activemq.broker.MutableBrokerFilter.removeConsumer(MutableBrokerFilter.java:137)
>     at org.apache.activemq.broker.TransportConnection.processRemoveConsumer(TransportConnection.java:651)
>     at org.apache.activemq.broker.TransportConnection.processRemoveSession(TransportConnection.java:690)
>     at org.apache.activemq.broker.TransportConnection.processRemoveConnection(TransportConnection.java:802)
>     at org.apache.activemq.broker.TransportConnection.doStop(TransportConnection.java:1143)
>     at org.apache.activemq.broker.TransportConnection$4.run(TransportConnection.java:1073)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
>     at java.lang.Thread.run(Thread.java:619)
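
The exception at the top of the quoted STACK_TRACE can be reproduced without 
a broker. The following is a minimal sketch that uses only java.util.Timer; 
the class name CancelledTimerSketch, the timer name, and the delay/period 
values are made up for illustration and do not come from the ActiveMQ 
Scheduler code.

```java
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical demo class; it only exercises java.util.Timer itself.
class CancelledTimerSketch {

    // Returns the message of the exception thrown when scheduling on a
    // cancelled timer, or null if no exception was thrown.
    static String scheduleAfterCancel() {
        Timer timer = new Timer("sketch-timer", true); // daemon thread
        timer.cancel(); // plays the role of Scheduler.doStop(...)
        try {
            // Plays the role of the periodic scheduling done during dest.start().
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    // never runs: the timer is already cancelled
                }
            }, 1000L, 1000L);
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(scheduleAfterCancel());
    }
}
```

Once cancel() has been called, a Timer rejects every later schedule(...) 
call, so any thread that still reaches the scheduling path after shutdown has 
begun will hit this exception regardless of timing from that point on.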



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)