There may be too much removed here. The isSlave attribute on the admin view was driven by shared file system master/slave, not pure master/slave. One related issue is https://issues.apache.org/jira/browse/AMQ-3696. I am not sure where the original requirement came from, but it makes sense to have some JMX presence for a broker that is waiting to lock a store. Even if the broker is only waiting to get a shared broker lock, it is nice to be able to see that. With respect to the web console, its lifecycle can be independent of the broker, so I think it is good that it can reflect a slave status or a "trying to obtain lock" status. Maybe the solution here is some JMX instrumentation on the abstract pluggable locker.
On 26 November 2012 21:13, <[email protected]> wrote: > Author: chirino > Date: Mon Nov 26 21:13:25 2012 > New Revision: 1413846 > > URL: http://svn.apache.org/viewvc?rev=1413846&view=rev > Log: > Changes for https://issues.apache.org/jira/browse/AMQ-4165 : Remove pure > master/slave functionality > > Removed: > > activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/MasterSlaveDiscoveryTest.java > Modified: > > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java > > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java > > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java > > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java > > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java > > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java > > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java > > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java > > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java > > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java > > activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java > > activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java > > activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java > > activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java > > activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java > > Modified: > 
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java > URL: > http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1413846&r1=1413845&r2=1413846&view=diff > > ============================================================================== > --- > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java > (original) > +++ > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java > Mon Nov 26 21:13:25 2012 > @@ -168,7 +168,6 @@ public class BrokerService implements Se > private File schedulerDirectoryFile; > private Scheduler scheduler; > private ThreadPoolExecutor executor; > - private boolean slave = true; > private int schedulePeriodForDestinationPurge= 0; > private int maxPurgedDestinationsPerSweep = 0; > private BrokerContext brokerContext; > @@ -392,13 +391,6 @@ public class BrokerService implements Se > return null; > } > > - /** > - * @return true if this Broker is a slave to a Master > - */ > - public boolean isSlave() { > - return slave; > - } > - > public void masterFailed() { > if (shutdownOnMasterFailure) { > LOG.error("The Master has failed ... 
shutting down"); > @@ -578,7 +570,6 @@ public class BrokerService implements Se > if (startException != null) { > return; > } > - slave = false; > startDestinations(); > addShutdownHook(); > > @@ -604,9 +595,7 @@ public class BrokerService implements Se > adminView.setBroker(managedBroker); > } > > - if (!isSlave()) { > - startAllConnectors(); > - } > + startAllConnectors(); > > if (ioExceptionHandler == null) { > setIoExceptionHandler(new DefaultIOExceptionHandler()); > @@ -680,7 +669,6 @@ public class BrokerService implements Se > try { > stopper.stop(persistenceAdapter); > persistenceAdapter = null; > - slave = true; > if (isUseJmx()) { > stopper.stop(getManagementContext()); > managementContext = null; > @@ -1227,8 +1215,7 @@ public class BrokerService implements Se > } > > /** > - * Sets the services associated with this broker such as a > - * {@link MasterConnector} > + * Sets the services associated with this broker. > */ > public void setServices(Service[] services) { > this.services.clear(); > @@ -2246,82 +2233,80 @@ public class BrokerService implements Se > * @throws Exception > */ > public void startAllConnectors() throws Exception { > - if (!isSlave()) { > - Set<ActiveMQDestination> durableDestinations = > getBroker().getDurableDestinations(); > - List<TransportConnector> al = new > ArrayList<TransportConnector>(); > - for (Iterator<TransportConnector> iter = > getTransportConnectors().iterator(); iter.hasNext();) { > - TransportConnector connector = iter.next(); > - connector.setBrokerService(this); > - al.add(startTransportConnector(connector)); > - } > - if (al.size() > 0) { > - // let's clear the transportConnectors list and replace > it with > - // the started transportConnector instances > - this.transportConnectors.clear(); > - setTransportConnectors(al); > - } > - URI uri = getVmConnectorURI(); > - Map<String, String> map = new HashMap<String, > String>(URISupport.parseParameters(uri)); > - map.put("network", "true"); > - map.put("async", 
"false"); > - uri = URISupport.createURIWithQuery(uri, > URISupport.createQueryString(map)); > - > - if (!stopped.get()) { > - ThreadPoolExecutor networkConnectorStartExecutor = null; > - if (isNetworkConnectorStartAsync()) { > - // spin up as many threads as needed > - networkConnectorStartExecutor = new > ThreadPoolExecutor(0, Integer.MAX_VALUE, > - 10, TimeUnit.SECONDS, new > SynchronousQueue<Runnable>(), > - new ThreadFactory() { > - int count=0; > - public Thread newThread(Runnable > runnable) { > - Thread thread = new Thread(runnable, > "NetworkConnector Start Thread-" +(count++)); > - thread.setDaemon(true); > - return thread; > - } > - }); > - } > + Set<ActiveMQDestination> durableDestinations = > getBroker().getDurableDestinations(); > + List<TransportConnector> al = new ArrayList<TransportConnector>(); > + for (Iterator<TransportConnector> iter = > getTransportConnectors().iterator(); iter.hasNext();) { > + TransportConnector connector = iter.next(); > + connector.setBrokerService(this); > + al.add(startTransportConnector(connector)); > + } > + if (al.size() > 0) { > + // let's clear the transportConnectors list and replace it > with > + // the started transportConnector instances > + this.transportConnectors.clear(); > + setTransportConnectors(al); > + } > + URI uri = getVmConnectorURI(); > + Map<String, String> map = new HashMap<String, > String>(URISupport.parseParameters(uri)); > + map.put("network", "true"); > + map.put("async", "false"); > + uri = URISupport.createURIWithQuery(uri, > URISupport.createQueryString(map)); > > - for (Iterator<NetworkConnector> iter = > getNetworkConnectors().iterator(); iter.hasNext();) { > - final NetworkConnector connector = iter.next(); > - connector.setLocalUri(uri); > - connector.setBrokerName(getBrokerName()); > - connector.setDurableDestinations(durableDestinations); > - if (getDefaultSocketURIString() != null) { > - > connector.setBrokerURL(getDefaultSocketURIString()); > - } > - if 
(networkConnectorStartExecutor != null) { > - networkConnectorStartExecutor.execute(new > Runnable() { > - public void run() { > - try { > - LOG.info("Async start of " + > connector); > - connector.start(); > - } catch(Exception e) { > - LOG.error("Async start of network > connector: " + connector + " failed", e); > - } > + if (!stopped.get()) { > + ThreadPoolExecutor networkConnectorStartExecutor = null; > + if (isNetworkConnectorStartAsync()) { > + // spin up as many threads as needed > + networkConnectorStartExecutor = new ThreadPoolExecutor(0, > Integer.MAX_VALUE, > + 10, TimeUnit.SECONDS, new > SynchronousQueue<Runnable>(), > + new ThreadFactory() { > + int count=0; > + public Thread newThread(Runnable runnable) { > + Thread thread = new Thread(runnable, > "NetworkConnector Start Thread-" +(count++)); > + thread.setDaemon(true); > + return thread; > } > }); > - } else { > - connector.start(); > - } > - } > - if (networkConnectorStartExecutor != null) { > - // executor done when enqueued tasks are complete > - > ThreadPoolUtils.shutdown(networkConnectorStartExecutor); > - } > + } > > - for (Iterator<ProxyConnector> iter = > getProxyConnectors().iterator(); iter.hasNext();) { > - ProxyConnector connector = iter.next(); > - connector.start(); > + for (Iterator<NetworkConnector> iter = > getNetworkConnectors().iterator(); iter.hasNext();) { > + final NetworkConnector connector = iter.next(); > + connector.setLocalUri(uri); > + connector.setBrokerName(getBrokerName()); > + connector.setDurableDestinations(durableDestinations); > + if (getDefaultSocketURIString() != null) { > + connector.setBrokerURL(getDefaultSocketURIString()); > } > - for (Iterator<JmsConnector> iter = > jmsConnectors.iterator(); iter.hasNext();) { > - JmsConnector connector = iter.next(); > + if (networkConnectorStartExecutor != null) { > + networkConnectorStartExecutor.execute(new Runnable() { > + public void run() { > + try { > + LOG.info("Async start of " + connector); > + connector.start(); 
> + } catch(Exception e) { > + LOG.error("Async start of network > connector: " + connector + " failed", e); > + } > + } > + }); > + } else { > connector.start(); > } > - for (Service service : services) { > - configureService(service); > - service.start(); > - } > + } > + if (networkConnectorStartExecutor != null) { > + // executor done when enqueued tasks are complete > + ThreadPoolUtils.shutdown(networkConnectorStartExecutor); > + } > + > + for (Iterator<ProxyConnector> iter = > getProxyConnectors().iterator(); iter.hasNext();) { > + ProxyConnector connector = iter.next(); > + connector.start(); > + } > + for (Iterator<JmsConnector> iter = jmsConnectors.iterator(); > iter.hasNext();) { > + JmsConnector connector = iter.next(); > + connector.start(); > + } > + for (Service service : services) { > + configureService(service); > + service.start(); > } > } > } > > Modified: > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java > URL: > http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java?rev=1413846&r1=1413845&r2=1413846&view=diff > > ============================================================================== > --- > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java > (original) > +++ > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java > Mon Nov 26 21:13:25 2012 > @@ -296,13 +296,6 @@ public class ConnectionContext { > } > > /** > - * @return the slave > - */ > - public boolean isSlave() { > - return (this.broker != null && > this.broker.getBrokerService().isSlave()) || !this.clientMaster; > - } > - > - /** > * @return the clientMaster > */ > public boolean isClientMaster() { > > Modified: > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java > URL: > 
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=1413846&r1=1413845&r2=1413846&view=diff > > ============================================================================== > --- > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java > (original) > +++ > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java > Mon Nov 26 21:13:25 2012 > @@ -186,10 +186,6 @@ public class BrokerView implements Broke > return brokerService.isPersistent(); > } > > - public boolean isSlave() { > - return brokerService.isSlave(); > - } > - > public void terminateJVM(int exitCode) { > System.exit(exitCode); > } > > Modified: > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java > URL: > http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=1413846&r1=1413845&r2=1413846&view=diff > > ============================================================================== > --- > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java > (original) > +++ > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java > Mon Nov 26 21:13:25 2012 > @@ -115,9 +115,6 @@ public interface BrokerViewMBean extends > @MBeanInfo("Messages are synchronized to disk.") > boolean isPersistent(); > > - @MBeanInfo("Slave broker.") > - boolean isSlave(); > - > /** > * Shuts down the JVM. 
> * > > Modified: > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java > URL: > http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff > > ============================================================================== > --- > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java > (original) > +++ > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java > Mon Nov 26 21:13:25 2012 > @@ -116,10 +116,6 @@ public abstract class AbstractSubscripti > public void gc() { > } > > - public boolean isSlave() { > - return broker.getBrokerService().isSlave(); > - } > - > public ConnectionContext getContext() { > return context; > } > > Modified: > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java > URL: > http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff > > ============================================================================== > --- > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java > (original) > +++ > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java > Mon Nov 26 21:13:25 2012 > @@ -92,7 +92,7 @@ public abstract class PrefetchSubscripti > // The slave should not deliver pull messages. > // TODO: when the slave becomes a master, He should send a NULL > message to all the > // consumers to 'wake them up' in case they were waiting for a > message. 
> - if (getPrefetchSize() == 0 && !isSlave()) { > + if (getPrefetchSize() == 0) { > > prefetchExtension.incrementAndGet(); > final long dispatchCounterBeforePull = dispatchCounter; > @@ -194,13 +194,12 @@ public abstract class PrefetchSubscripti > boolean callDispatchMatched = false; > Destination destination = null; > > - if (!isSlave()) { > - if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) > { > - // suppress unexpected ack exception in this expected case > - LOG.warn("Ignoring ack received before dispatch; result > of failover with an outstanding ack. Acked messages will be replayed if > present on this broker. Ignored ack: " + ack); > - return; > - } > + if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) { > + // suppress unexpected ack exception in this expected case > + LOG.warn("Ignoring ack received before dispatch; result of > failover with an outstanding ack. Acked messages will be replayed if > present on this broker. Ignored ack: " + ack); > + return; > } > + > if (LOG.isTraceEnabled()) { > LOG.trace("ack:" + ack); > } > @@ -413,15 +412,8 @@ public abstract class PrefetchSubscripti > destination.wakeup(); > dispatchPending(); > } else { > - if (isSlave()) { > - throw new JMSException( > - "Slave broker out of sync with master: > Acknowledgment (" > - + ack + ") was not in the dispatch list: " > - + dispatched); > - } else { > - LOG.debug("Acknowledgment out of sync (Normally occurs > when failover connection reconnects): " > - + ack); > - } > + LOG.debug("Acknowledgment out of sync (Normally occurs when > failover connection reconnects): " > + + ack); > } > } > > @@ -447,11 +439,7 @@ public abstract class PrefetchSubscripti > @Override > public void afterRollback() throws Exception { > synchronized(dispatchLock) { > - if (isSlave()) { > - > > ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement(); > - } else { > - // poisionAck will decrement - otherwise > still inflight on client > - } > 
+ // poisionAck will decrement - otherwise > still inflight on client > } > } > }); > @@ -617,53 +605,51 @@ public abstract class PrefetchSubscripti > } > > protected void dispatchPending() throws IOException { > - if (!isSlave()) { > - synchronized(pendingLock) { > - try { > - int numberToDispatch = countBeforeFull(); > - if (numberToDispatch > 0) { > - setSlowConsumer(false); > - setPendingBatchSize(pending, numberToDispatch); > - int count = 0; > - pending.reset(); > - while (pending.hasNext() && !isFull() > - && count < numberToDispatch) { > - MessageReference node = pending.next(); > - if (node == null) { > - break; > - } > + synchronized(pendingLock) { > + try { > + int numberToDispatch = countBeforeFull(); > + if (numberToDispatch > 0) { > + setSlowConsumer(false); > + setPendingBatchSize(pending, numberToDispatch); > + int count = 0; > + pending.reset(); > + while (pending.hasNext() && !isFull() > + && count < numberToDispatch) { > + MessageReference node = pending.next(); > + if (node == null) { > + break; > + } > + > + // Synchronize between dispatched list and remove > of message from pending list > + // related to remove subscription action > + synchronized(dispatchLock) { > + pending.remove(); > + node.decrementReferenceCount(); > + if( !isDropped(node) && canDispatch(node)) { > > - // Synchronize between dispatched list and > remove of message from pending list > - // related to remove subscription action > - synchronized(dispatchLock) { > - pending.remove(); > - node.decrementReferenceCount(); > - if( !isDropped(node) && > canDispatch(node)) { > - > - // Message may have been sitting in > the pending > - // list a while waiting for the > consumer to ak the message. 
> - if > (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) { > - //increment number to dispatch > - numberToDispatch++; > - if (broker.isExpired(node)) { > - > ((Destination)node.getRegionDestination()).messageExpired(context, this, > node); > - } > - continue; > + // Message may have been sitting in the > pending > + // list a while waiting for the consumer > to ak the message. > + if > (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) { > + //increment number to dispatch > + numberToDispatch++; > + if (broker.isExpired(node)) { > + > ((Destination)node.getRegionDestination()).messageExpired(context, this, > node); > } > - dispatch(node); > - count++; > + continue; > } > + dispatch(node); > + count++; > } > } > - } else if (!isSlowConsumer()) { > - setSlowConsumer(true); > - for (Destination dest :destinations) { > - dest.slowConsumer(context, this); > - } > } > - } finally { > - pending.release(); > + } else if (!isSlowConsumer()) { > + setSlowConsumer(true); > + for (Destination dest :destinations) { > + dest.slowConsumer(context, this); > + } > } > + } finally { > + pending.release(); > } > } > } > @@ -682,42 +668,37 @@ public abstract class PrefetchSubscripti > okForAckAsDispatchDone.countDown(); > > // No reentrant lock - Patch needed to IndirectMessageReference > on method lock > - if (!isSlave()) { > - > - MessageDispatch md = createMessageDispatch(node, message); > - // NULL messages don't count... they don't get Acked. > - if (node != QueueMessageReference.NULL_MESSAGE) { > - dispatchCounter++; > - dispatched.add(node); > - } else { > - while (true) { > - int currentExtension = prefetchExtension.get(); > - int newExtension = Math.max(0, currentExtension - 1); > - if (prefetchExtension.compareAndSet(currentExtension, > newExtension)) { > - break; > - } > + MessageDispatch md = createMessageDispatch(node, message); > + // NULL messages don't count... they don't get Acked. 
> + if (node != QueueMessageReference.NULL_MESSAGE) { > + dispatchCounter++; > + dispatched.add(node); > + } else { > + while (true) { > + int currentExtension = prefetchExtension.get(); > + int newExtension = Math.max(0, currentExtension - 1); > + if (prefetchExtension.compareAndSet(currentExtension, > newExtension)) { > + break; > } > } > - if (info.isDispatchAsync()) { > - md.setTransmitCallback(new Runnable() { > + } > + if (info.isDispatchAsync()) { > + md.setTransmitCallback(new Runnable() { > > - public void run() { > - // Since the message gets queued up in async > dispatch, > - // we don't want to > - // decrease the reference count until it gets put > on the > - // wire. > - onDispatch(node, message); > - } > - }); > - context.getConnection().dispatchAsync(md); > - } else { > - context.getConnection().dispatchSync(md); > - onDispatch(node, message); > - } > - return true; > + public void run() { > + // Since the message gets queued up in async dispatch, > + // we don't want to > + // decrease the reference count until it gets put on > the > + // wire. 
> + onDispatch(node, message); > + } > + }); > + context.getConnection().dispatchAsync(md); > } else { > - return false; > + context.getConnection().dispatchSync(md); > + onDispatch(node, message); > } > + return true; > } > > protected void onDispatch(final MessageReference node, final Message > message) { > > Modified: > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java > URL: > http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1413846&r1=1413845&r2=1413846&view=diff > > ============================================================================== > --- > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java > (original) > +++ > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java > Mon Nov 26 21:13:25 2012 > @@ -471,13 +471,13 @@ public class Queue extends BaseDestinati > browserDispatches.add(browserDispatch); > } > > - if (!(this.optimizedDispatch || isSlave())) { > + if (!this.optimizedDispatch) { > wakeup(); > } > }finally { > pagedInPendingDispatchLock.writeLock().unlock(); > } > - if (this.optimizedDispatch || isSlave()) { > + if (this.optimizedDispatch) { > // Outside of dispatchLock() to maintain the lock hierarchy of > // iteratingMutex -> dispatchLock. - see > // https://issues.apache.org/activemq/browse/AMQ-1878 > @@ -578,13 +578,13 @@ public class Queue extends BaseDestinati > }finally { > consumersLock.writeLock().unlock(); > } > - if (!(this.optimizedDispatch || isSlave())) { > + if (!this.optimizedDispatch) { > wakeup(); > } > }finally { > pagedInPendingDispatchLock.writeLock().unlock(); > } > - if (this.optimizedDispatch || isSlave()) { > + if (this.optimizedDispatch) { > // Outside of dispatchLock() to maintain the lock hierarchy of > // iteratingMutex -> dispatchLock. 
- see > // https://issues.apache.org/activemq/browse/AMQ-1878 > @@ -1704,7 +1704,7 @@ public class Queue extends BaseDestinati > } > > public void wakeup() { > - if ((optimizedDispatch || isSlave()) && !iterationRunning) { > + if (optimizedDispatch && !iterationRunning) { > iterate(); > pendingWakeups.incrementAndGet(); > } else { > @@ -1721,10 +1721,6 @@ public class Queue extends BaseDestinati > } > } > > - private boolean isSlave() { > - return broker.getBrokerService().isSlave(); > - } > - > private void doPageIn(boolean force) throws Exception { > PendingList newlyPaged = doPageInForDispatch(force); > pagedInPendingDispatchLock.writeLock().lock(); > @@ -1875,7 +1871,7 @@ public class Queue extends BaseDestinati > consumersLock.writeLock().lock(); > > try { > - if (this.consumers.isEmpty() || isSlave()) { > + if (this.consumers.isEmpty()) { > // slave dispatch happens in processDispatchNotification > return list; > } > > Modified: > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java > URL: > http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1413846&r1=1413845&r2=1413846&view=diff > > ============================================================================== > --- > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java > (original) > +++ > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java > Mon Nov 26 21:13:25 2012 > @@ -695,10 +695,6 @@ public class RegionBroker extends EmptyB > } > } > > - public boolean isSlaveBroker() { > - return brokerService.isSlave(); > - } > - > @Override > public boolean isStopped() { > return !started; > > Modified: > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java > URL: > 
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff > > ============================================================================== > --- > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java > (original) > +++ > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java > Mon Nov 26 21:13:25 2012 > @@ -109,12 +109,7 @@ public interface Subscription extends Su > * @throws Exception > */ > void processMessageDispatchNotification(MessageDispatchNotification > mdn) throws Exception; > - > - /** > - * @return true if the broker is currently in slave mode > - */ > - boolean isSlave(); > - > + > /** > * @return number of messages pending delivery > */ > > Modified: > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java > URL: > http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff > > ============================================================================== > --- > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java > (original) > +++ > activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java > Mon Nov 26 21:13:25 2012 > @@ -101,7 +101,7 @@ public class TopicSubscription extends A > return; > } > enqueueCounter.incrementAndGet(); > - if (!isFull() && matched.isEmpty() && !isSlave()) { > + if (!isFull() && matched.isEmpty()) { > // if maximumPendingMessages is set we will only discard > messages which > // have not been dispatched (i.e. 
we allow the prefetch > buffer to be filled) > dispatch(node); > @@ -299,7 +299,7 @@ public class TopicSubscription extends A > public Response pullMessage(ConnectionContext context, MessagePull > pull) throws Exception { > > // The slave should not deliver pull messages. > - if (getPrefetchSize() == 0 && !isSlave()) { > + if (getPrefetchSize() == 0 ) { > > prefetchWindowOpen.set(true); > dispatchMatched(); > > Modified: > activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java > URL: > http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1413846&r1=1413845&r2=1413846&view=diff > > ============================================================================== > --- > activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java > (original) > +++ > activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java > Mon Nov 26 21:13:25 2012 > @@ -338,7 +338,6 @@ public class MBeanTest extends EmbeddedB > ObjectName brokerName = assertRegisteredObjectName(domain + > ":Type=Broker,BrokerName=localhost"); > BrokerViewMBean broker = > (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, > brokerName, BrokerViewMBean.class, true); > > - assertTrue("broker is not a slave", !broker.isSlave()); > // create 2 topics > broker.addTopic(getDestinationString() + "1 "); > broker.addTopic(" " + getDestinationString() + "2"); > @@ -536,7 +535,6 @@ public class MBeanTest extends EmbeddedB > ObjectName brokerName = assertRegisteredObjectName(domain + > ":Type=Broker,BrokerName=localhost"); > BrokerViewMBean broker = > (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, > brokerName, BrokerViewMBean.class, true); > > - assertTrue("broker is not a slave", !broker.isSlave()); > // create 2 topics > broker.addTopic(getDestinationString() + "1"); > broker.addTopic(getDestinationString() + 
"2"); > @@ -588,7 +586,6 @@ public class MBeanTest extends EmbeddedB > ObjectName brokerName = assertRegisteredObjectName(domain + > ":Type=Broker,BrokerName=localhost"); > BrokerViewMBean broker = > (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, > brokerName, BrokerViewMBean.class, true); > > - assertTrue("broker is not a slave", !broker.isSlave()); > // create 2 topics > broker.addTopic(getDestinationString() + "1"); > broker.addTopic(getDestinationString() + "2"); > @@ -797,7 +794,6 @@ public class MBeanTest extends EmbeddedB > ObjectName brokerName = assertRegisteredObjectName(domain + > ":Type=Broker,BrokerName=localhost"); > BrokerViewMBean broker = > (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, > brokerName, BrokerViewMBean.class, true); > > - assertTrue("broker is not a slave", !broker.isSlave()); > assertEquals(0, broker.getDynamicDestinationProducers().length); > > Session session = connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); > > Modified: > activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java > URL: > http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=1413846&r1=1413845&r2=1413846&view=diff > > ============================================================================== > --- > activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java > (original) > +++ > activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java > Mon Nov 26 21:13:25 2012 > @@ -234,10 +234,6 @@ public class QueueDuplicatesFromStoreTes > return false; > } > > - public boolean isSlave() { > - return false; > - } > - > public boolean matches(MessageReference node, > MessageEvaluationContext context) throws IOException { > return true; > > Modified: > 
activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java > URL: > http://svn.apache.org/viewvc/activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java?rev=1413846&r1=1413845&r2=1413846&view=diff > > ============================================================================== > --- > activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java > (original) > +++ > activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java > Mon Nov 26 21:13:25 2012 > @@ -66,7 +66,6 @@ public class ApplicationContextFilter im > private String applicationContextName = "applicationContext"; > private String requestContextName = "requestContext"; > private String requestName = "request"; > - private final String slavePage = "slave.jsp"; > > public void init(FilterConfig config) throws ServletException { > this.servletContext = config.getServletContext(); > @@ -85,19 +84,19 @@ public class ApplicationContextFilter im > Map requestContextWrapper = createRequestContextWrapper(request); > String path = ((HttpServletRequest)request).getRequestURI(); > // handle slave brokers > - try { > - if ( !(path.endsWith("css") || path.endsWith("png") || > path.endsWith("ico") || path.endsWith(slavePage)) > - && > ((BrokerFacade)requestContextWrapper.get("brokerQuery")).isSlave()) { > - ((HttpServletResponse)response).sendRedirect(slavePage); > - return; > - } > - } catch (Exception e) { > - LOG.warn(path + ", failed to access BrokerFacade: reason: " + > e.getLocalizedMessage()); > - if (LOG.isDebugEnabled()) { > - LOG.debug(request.toString(), e); > - } > - throw new IOException(e); > - } > +// try { > +// if ( !(path.endsWith("css") || path.endsWith("png") || > path.endsWith("ico") || path.endsWith(slavePage)) > +// && > ((BrokerFacade)requestContextWrapper.get("brokerQuery")).isSlave()) { > 
+// ((HttpServletResponse)response).sendRedirect(slavePage); > +// return; > +// } > +// } catch (Exception e) { > +// LOG.warn(path + ", failed to access BrokerFacade: reason: " > + e.getLocalizedMessage()); > +// if (LOG.isDebugEnabled()) { > +// LOG.debug(request.toString(), e); > +// } > +// throw new IOException(e); > +// } > request.setAttribute(requestContextName, requestContextWrapper); > request.setAttribute(requestName, request); > chain.doFilter(request, response); > > Modified: > activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java > URL: > http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java?rev=1413846&r1=1413845&r2=1413846&view=diff > > ============================================================================== > --- > activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java > (original) > +++ > activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java > Mon Nov 26 21:13:25 2012 > @@ -209,6 +209,4 @@ public interface BrokerFacade { > > boolean isJobSchedulerStarted(); > > - boolean isSlave() throws Exception; > - > } > \ No newline at end of file > > Modified: > activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java > URL: > http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java?rev=1413846&r1=1413845&r2=1413846&view=diff > > ============================================================================== > --- > activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java > (original) > +++ > activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java > Mon Nov 26 21:13:25 2012 > @@ -226,8 +226,4 @@ public abstract class BrokerFacadeSuppor > return false; > } > } > - > - public boolean isSlave() throws Exception { > - return 
getBrokerAdmin().isSlave(); > - } > } > > > -- http://redhat.com http://blog.garytully.com
