Author: chirino
Date: Mon Nov 26 21:13:25 2012
New Revision: 1413846
URL: http://svn.apache.org/viewvc?rev=1413846&view=rev
Log:
Changes for https://issues.apache.org/jira/browse/AMQ-4165 : Remove pure
master/slave functionality
Removed:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/MasterSlaveDiscoveryTest.java
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
---
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
Mon Nov 26 21:13:25 2012
@@ -168,7 +168,6 @@ public class BrokerService implements Se
private File schedulerDirectoryFile;
private Scheduler scheduler;
private ThreadPoolExecutor executor;
- private boolean slave = true;
private int schedulePeriodForDestinationPurge= 0;
private int maxPurgedDestinationsPerSweep = 0;
private BrokerContext brokerContext;
@@ -392,13 +391,6 @@ public class BrokerService implements Se
return null;
}
- /**
- * @return true if this Broker is a slave to a Master
- */
- public boolean isSlave() {
- return slave;
- }
-
public void masterFailed() {
if (shutdownOnMasterFailure) {
LOG.error("The Master has failed ... shutting down");
@@ -578,7 +570,6 @@ public class BrokerService implements Se
if (startException != null) {
return;
}
- slave = false;
startDestinations();
addShutdownHook();
@@ -604,9 +595,7 @@ public class BrokerService implements Se
adminView.setBroker(managedBroker);
}
- if (!isSlave()) {
- startAllConnectors();
- }
+ startAllConnectors();
if (ioExceptionHandler == null) {
setIoExceptionHandler(new DefaultIOExceptionHandler());
@@ -680,7 +669,6 @@ public class BrokerService implements Se
try {
stopper.stop(persistenceAdapter);
persistenceAdapter = null;
- slave = true;
if (isUseJmx()) {
stopper.stop(getManagementContext());
managementContext = null;
@@ -1227,8 +1215,7 @@ public class BrokerService implements Se
}
/**
- * Sets the services associated with this broker such as a
- * {@link MasterConnector}
+ * Sets the services associated with this broker.
*/
public void setServices(Service[] services) {
this.services.clear();
@@ -2246,82 +2233,80 @@ public class BrokerService implements Se
* @throws Exception
*/
public void startAllConnectors() throws Exception {
- if (!isSlave()) {
- Set<ActiveMQDestination> durableDestinations =
getBroker().getDurableDestinations();
- List<TransportConnector> al = new ArrayList<TransportConnector>();
- for (Iterator<TransportConnector> iter =
getTransportConnectors().iterator(); iter.hasNext();) {
- TransportConnector connector = iter.next();
- connector.setBrokerService(this);
- al.add(startTransportConnector(connector));
- }
- if (al.size() > 0) {
- // let's clear the transportConnectors list and replace it with
- // the started transportConnector instances
- this.transportConnectors.clear();
- setTransportConnectors(al);
- }
- URI uri = getVmConnectorURI();
- Map<String, String> map = new HashMap<String,
String>(URISupport.parseParameters(uri));
- map.put("network", "true");
- map.put("async", "false");
- uri = URISupport.createURIWithQuery(uri,
URISupport.createQueryString(map));
-
- if (!stopped.get()) {
- ThreadPoolExecutor networkConnectorStartExecutor = null;
- if (isNetworkConnectorStartAsync()) {
- // spin up as many threads as needed
- networkConnectorStartExecutor = new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
- 10, TimeUnit.SECONDS, new
SynchronousQueue<Runnable>(),
- new ThreadFactory() {
- int count=0;
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable,
"NetworkConnector Start Thread-" +(count++));
- thread.setDaemon(true);
- return thread;
- }
- });
- }
+ Set<ActiveMQDestination> durableDestinations =
getBroker().getDurableDestinations();
+ List<TransportConnector> al = new ArrayList<TransportConnector>();
+ for (Iterator<TransportConnector> iter =
getTransportConnectors().iterator(); iter.hasNext();) {
+ TransportConnector connector = iter.next();
+ connector.setBrokerService(this);
+ al.add(startTransportConnector(connector));
+ }
+ if (al.size() > 0) {
+ // let's clear the transportConnectors list and replace it with
+ // the started transportConnector instances
+ this.transportConnectors.clear();
+ setTransportConnectors(al);
+ }
+ URI uri = getVmConnectorURI();
+ Map<String, String> map = new HashMap<String,
String>(URISupport.parseParameters(uri));
+ map.put("network", "true");
+ map.put("async", "false");
+ uri = URISupport.createURIWithQuery(uri,
URISupport.createQueryString(map));
- for (Iterator<NetworkConnector> iter =
getNetworkConnectors().iterator(); iter.hasNext();) {
- final NetworkConnector connector = iter.next();
- connector.setLocalUri(uri);
- connector.setBrokerName(getBrokerName());
- connector.setDurableDestinations(durableDestinations);
- if (getDefaultSocketURIString() != null) {
- connector.setBrokerURL(getDefaultSocketURIString());
- }
- if (networkConnectorStartExecutor != null) {
- networkConnectorStartExecutor.execute(new Runnable() {
- public void run() {
- try {
- LOG.info("Async start of " + connector);
- connector.start();
- } catch(Exception e) {
- LOG.error("Async start of network
connector: " + connector + " failed", e);
- }
+ if (!stopped.get()) {
+ ThreadPoolExecutor networkConnectorStartExecutor = null;
+ if (isNetworkConnectorStartAsync()) {
+ // spin up as many threads as needed
+ networkConnectorStartExecutor = new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
+ 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new ThreadFactory() {
+ int count=0;
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable,
"NetworkConnector Start Thread-" +(count++));
+ thread.setDaemon(true);
+ return thread;
}
});
- } else {
- connector.start();
- }
- }
- if (networkConnectorStartExecutor != null) {
- // executor done when enqueued tasks are complete
- ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
- }
+ }
- for (Iterator<ProxyConnector> iter =
getProxyConnectors().iterator(); iter.hasNext();) {
- ProxyConnector connector = iter.next();
- connector.start();
+ for (Iterator<NetworkConnector> iter =
getNetworkConnectors().iterator(); iter.hasNext();) {
+ final NetworkConnector connector = iter.next();
+ connector.setLocalUri(uri);
+ connector.setBrokerName(getBrokerName());
+ connector.setDurableDestinations(durableDestinations);
+ if (getDefaultSocketURIString() != null) {
+ connector.setBrokerURL(getDefaultSocketURIString());
}
- for (Iterator<JmsConnector> iter = jmsConnectors.iterator();
iter.hasNext();) {
- JmsConnector connector = iter.next();
+ if (networkConnectorStartExecutor != null) {
+ networkConnectorStartExecutor.execute(new Runnable() {
+ public void run() {
+ try {
+ LOG.info("Async start of " + connector);
+ connector.start();
+ } catch(Exception e) {
+ LOG.error("Async start of network connector: "
+ connector + " failed", e);
+ }
+ }
+ });
+ } else {
connector.start();
}
- for (Service service : services) {
- configureService(service);
- service.start();
- }
+ }
+ if (networkConnectorStartExecutor != null) {
+ // executor done when enqueued tasks are complete
+ ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
+ }
+
+ for (Iterator<ProxyConnector> iter =
getProxyConnectors().iterator(); iter.hasNext();) {
+ ProxyConnector connector = iter.next();
+ connector.start();
+ }
+ for (Iterator<JmsConnector> iter = jmsConnectors.iterator();
iter.hasNext();) {
+ JmsConnector connector = iter.next();
+ connector.start();
+ }
+ for (Service service : services) {
+ configureService(service);
+ service.start();
}
}
}
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
---
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
(original)
+++
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
Mon Nov 26 21:13:25 2012
@@ -296,13 +296,6 @@ public class ConnectionContext {
}
/**
- * @return the slave
- */
- public boolean isSlave() {
- return (this.broker != null &&
this.broker.getBrokerService().isSlave()) || !this.clientMaster;
- }
-
- /**
* @return the clientMaster
*/
public boolean isClientMaster() {
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
---
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
(original)
+++
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
Mon Nov 26 21:13:25 2012
@@ -186,10 +186,6 @@ public class BrokerView implements Broke
return brokerService.isPersistent();
}
- public boolean isSlave() {
- return brokerService.isSlave();
- }
-
public void terminateJVM(int exitCode) {
System.exit(exitCode);
}
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
---
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
(original)
+++
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
Mon Nov 26 21:13:25 2012
@@ -115,9 +115,6 @@ public interface BrokerViewMBean extends
@MBeanInfo("Messages are synchronized to disk.")
boolean isPersistent();
- @MBeanInfo("Slave broker.")
- boolean isSlave();
-
/**
* Shuts down the JVM.
*
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
---
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
(original)
+++
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
Mon Nov 26 21:13:25 2012
@@ -116,10 +116,6 @@ public abstract class AbstractSubscripti
public void gc() {
}
- public boolean isSlave() {
- return broker.getBrokerService().isSlave();
- }
-
public ConnectionContext getContext() {
return context;
}
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
---
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Mon Nov 26 21:13:25 2012
@@ -92,7 +92,7 @@ public abstract class PrefetchSubscripti
// The slave should not deliver pull messages.
// TODO: when the slave becomes a master, He should send a NULL
message to all the
// consumers to 'wake them up' in case they were waiting for a message.
- if (getPrefetchSize() == 0 && !isSlave()) {
+ if (getPrefetchSize() == 0) {
prefetchExtension.incrementAndGet();
final long dispatchCounterBeforePull = dispatchCounter;
@@ -194,13 +194,12 @@ public abstract class PrefetchSubscripti
boolean callDispatchMatched = false;
Destination destination = null;
- if (!isSlave()) {
- if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
- // suppress unexpected ack exception in this expected case
- LOG.warn("Ignoring ack received before dispatch; result of
failover with an outstanding ack. Acked messages will be replayed if present on
this broker. Ignored ack: " + ack);
- return;
- }
+ if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
+ // suppress unexpected ack exception in this expected case
+ LOG.warn("Ignoring ack received before dispatch; result of
failover with an outstanding ack. Acked messages will be replayed if present on
this broker. Ignored ack: " + ack);
+ return;
}
+
if (LOG.isTraceEnabled()) {
LOG.trace("ack:" + ack);
}
@@ -413,15 +412,8 @@ public abstract class PrefetchSubscripti
destination.wakeup();
dispatchPending();
} else {
- if (isSlave()) {
- throw new JMSException(
- "Slave broker out of sync with master: Acknowledgment
("
- + ack + ") was not in the dispatch list: "
- + dispatched);
- } else {
- LOG.debug("Acknowledgment out of sync (Normally occurs when
failover connection reconnects): "
- + ack);
- }
+ LOG.debug("Acknowledgment out of sync (Normally occurs when
failover connection reconnects): "
+ + ack);
}
}
@@ -447,11 +439,7 @@ public abstract class PrefetchSubscripti
@Override
public void afterRollback() throws Exception {
synchronized(dispatchLock) {
- if (isSlave()) {
-
((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
- } else {
- // poisionAck will decrement - otherwise still
inflight on client
- }
+ // poisionAck will decrement - otherwise still
inflight on client
}
}
});
@@ -617,53 +605,51 @@ public abstract class PrefetchSubscripti
}
protected void dispatchPending() throws IOException {
- if (!isSlave()) {
- synchronized(pendingLock) {
- try {
- int numberToDispatch = countBeforeFull();
- if (numberToDispatch > 0) {
- setSlowConsumer(false);
- setPendingBatchSize(pending, numberToDispatch);
- int count = 0;
- pending.reset();
- while (pending.hasNext() && !isFull()
- && count < numberToDispatch) {
- MessageReference node = pending.next();
- if (node == null) {
- break;
- }
+ synchronized(pendingLock) {
+ try {
+ int numberToDispatch = countBeforeFull();
+ if (numberToDispatch > 0) {
+ setSlowConsumer(false);
+ setPendingBatchSize(pending, numberToDispatch);
+ int count = 0;
+ pending.reset();
+ while (pending.hasNext() && !isFull()
+ && count < numberToDispatch) {
+ MessageReference node = pending.next();
+ if (node == null) {
+ break;
+ }
+
+ // Synchronize between dispatched list and remove of
message from pending list
+ // related to remove subscription action
+ synchronized(dispatchLock) {
+ pending.remove();
+ node.decrementReferenceCount();
+ if( !isDropped(node) && canDispatch(node)) {
- // Synchronize between dispatched list and remove
of message from pending list
- // related to remove subscription action
- synchronized(dispatchLock) {
- pending.remove();
- node.decrementReferenceCount();
- if( !isDropped(node) && canDispatch(node)) {
-
- // Message may have been sitting in the
pending
- // list a while waiting for the consumer
to ak the message.
- if
(node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
- //increment number to dispatch
- numberToDispatch++;
- if (broker.isExpired(node)) {
-
((Destination)node.getRegionDestination()).messageExpired(context, this, node);
- }
- continue;
+ // Message may have been sitting in the pending
+ // list a while waiting for the consumer to ak
the message.
+ if (node!=QueueMessageReference.NULL_MESSAGE
&& node.isExpired()) {
+ //increment number to dispatch
+ numberToDispatch++;
+ if (broker.isExpired(node)) {
+
((Destination)node.getRegionDestination()).messageExpired(context, this, node);
}
- dispatch(node);
- count++;
+ continue;
}
+ dispatch(node);
+ count++;
}
}
- } else if (!isSlowConsumer()) {
- setSlowConsumer(true);
- for (Destination dest :destinations) {
- dest.slowConsumer(context, this);
- }
}
- } finally {
- pending.release();
+ } else if (!isSlowConsumer()) {
+ setSlowConsumer(true);
+ for (Destination dest :destinations) {
+ dest.slowConsumer(context, this);
+ }
}
+ } finally {
+ pending.release();
}
}
}
@@ -682,42 +668,37 @@ public abstract class PrefetchSubscripti
okForAckAsDispatchDone.countDown();
// No reentrant lock - Patch needed to IndirectMessageReference on
method lock
- if (!isSlave()) {
-
- MessageDispatch md = createMessageDispatch(node, message);
- // NULL messages don't count... they don't get Acked.
- if (node != QueueMessageReference.NULL_MESSAGE) {
- dispatchCounter++;
- dispatched.add(node);
- } else {
- while (true) {
- int currentExtension = prefetchExtension.get();
- int newExtension = Math.max(0, currentExtension - 1);
- if (prefetchExtension.compareAndSet(currentExtension,
newExtension)) {
- break;
- }
+ MessageDispatch md = createMessageDispatch(node, message);
+ // NULL messages don't count... they don't get Acked.
+ if (node != QueueMessageReference.NULL_MESSAGE) {
+ dispatchCounter++;
+ dispatched.add(node);
+ } else {
+ while (true) {
+ int currentExtension = prefetchExtension.get();
+ int newExtension = Math.max(0, currentExtension - 1);
+ if (prefetchExtension.compareAndSet(currentExtension,
newExtension)) {
+ break;
}
}
- if (info.isDispatchAsync()) {
- md.setTransmitCallback(new Runnable() {
+ }
+ if (info.isDispatchAsync()) {
+ md.setTransmitCallback(new Runnable() {
- public void run() {
- // Since the message gets queued up in async dispatch,
- // we don't want to
- // decrease the reference count until it gets put on
the
- // wire.
- onDispatch(node, message);
- }
- });
- context.getConnection().dispatchAsync(md);
- } else {
- context.getConnection().dispatchSync(md);
- onDispatch(node, message);
- }
- return true;
+ public void run() {
+ // Since the message gets queued up in async dispatch,
+ // we don't want to
+ // decrease the reference count until it gets put on the
+ // wire.
+ onDispatch(node, message);
+ }
+ });
+ context.getConnection().dispatchAsync(md);
} else {
- return false;
+ context.getConnection().dispatchSync(md);
+ onDispatch(node, message);
}
+ return true;
}
protected void onDispatch(final MessageReference node, final Message
message) {
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
---
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
Mon Nov 26 21:13:25 2012
@@ -471,13 +471,13 @@ public class Queue extends BaseDestinati
browserDispatches.add(browserDispatch);
}
- if (!(this.optimizedDispatch || isSlave())) {
+ if (!this.optimizedDispatch) {
wakeup();
}
}finally {
pagedInPendingDispatchLock.writeLock().unlock();
}
- if (this.optimizedDispatch || isSlave()) {
+ if (this.optimizedDispatch) {
// Outside of dispatchLock() to maintain the lock hierarchy of
// iteratingMutex -> dispatchLock. - see
// https://issues.apache.org/activemq/browse/AMQ-1878
@@ -578,13 +578,13 @@ public class Queue extends BaseDestinati
}finally {
consumersLock.writeLock().unlock();
}
- if (!(this.optimizedDispatch || isSlave())) {
+ if (!this.optimizedDispatch) {
wakeup();
}
}finally {
pagedInPendingDispatchLock.writeLock().unlock();
}
- if (this.optimizedDispatch || isSlave()) {
+ if (this.optimizedDispatch) {
// Outside of dispatchLock() to maintain the lock hierarchy of
// iteratingMutex -> dispatchLock. - see
// https://issues.apache.org/activemq/browse/AMQ-1878
@@ -1704,7 +1704,7 @@ public class Queue extends BaseDestinati
}
public void wakeup() {
- if ((optimizedDispatch || isSlave()) && !iterationRunning) {
+ if (optimizedDispatch && !iterationRunning) {
iterate();
pendingWakeups.incrementAndGet();
} else {
@@ -1721,10 +1721,6 @@ public class Queue extends BaseDestinati
}
}
- private boolean isSlave() {
- return broker.getBrokerService().isSlave();
- }
-
private void doPageIn(boolean force) throws Exception {
PendingList newlyPaged = doPageInForDispatch(force);
pagedInPendingDispatchLock.writeLock().lock();
@@ -1875,7 +1871,7 @@ public class Queue extends BaseDestinati
consumersLock.writeLock().lock();
try {
- if (this.consumers.isEmpty() || isSlave()) {
+ if (this.consumers.isEmpty()) {
// slave dispatch happens in processDispatchNotification
return list;
}
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
---
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Mon Nov 26 21:13:25 2012
@@ -695,10 +695,6 @@ public class RegionBroker extends EmptyB
}
}
- public boolean isSlaveBroker() {
- return brokerService.isSlave();
- }
-
@Override
public boolean isStopped() {
return !started;
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
---
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
(original)
+++
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
Mon Nov 26 21:13:25 2012
@@ -109,12 +109,7 @@ public interface Subscription extends Su
* @throws Exception
*/
void processMessageDispatchNotification(MessageDispatchNotification mdn)
throws Exception;
-
- /**
- * @return true if the broker is currently in slave mode
- */
- boolean isSlave();
-
+
/**
* @return number of messages pending delivery
*/
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
---
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Mon Nov 26 21:13:25 2012
@@ -101,7 +101,7 @@ public class TopicSubscription extends A
return;
}
enqueueCounter.incrementAndGet();
- if (!isFull() && matched.isEmpty() && !isSlave()) {
+ if (!isFull() && matched.isEmpty()) {
// if maximumPendingMessages is set we will only discard messages
which
// have not been dispatched (i.e. we allow the prefetch buffer to
be filled)
dispatch(node);
@@ -299,7 +299,7 @@ public class TopicSubscription extends A
public Response pullMessage(ConnectionContext context, MessagePull pull)
throws Exception {
// The slave should not deliver pull messages.
- if (getPrefetchSize() == 0 && !isSlave()) {
+ if (getPrefetchSize() == 0 ) {
prefetchWindowOpen.set(true);
dispatchMatched();
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Mon Nov 26 21:13:25 2012
@@ -338,7 +338,6 @@ public class MBeanTest extends EmbeddedB
ObjectName brokerName = assertRegisteredObjectName(domain +
":Type=Broker,BrokerName=localhost");
BrokerViewMBean broker =
(BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
- assertTrue("broker is not a slave", !broker.isSlave());
// create 2 topics
broker.addTopic(getDestinationString() + "1 ");
broker.addTopic(" " + getDestinationString() + "2");
@@ -536,7 +535,6 @@ public class MBeanTest extends EmbeddedB
ObjectName brokerName = assertRegisteredObjectName(domain +
":Type=Broker,BrokerName=localhost");
BrokerViewMBean broker =
(BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
- assertTrue("broker is not a slave", !broker.isSlave());
// create 2 topics
broker.addTopic(getDestinationString() + "1");
broker.addTopic(getDestinationString() + "2");
@@ -588,7 +586,6 @@ public class MBeanTest extends EmbeddedB
ObjectName brokerName = assertRegisteredObjectName(domain +
":Type=Broker,BrokerName=localhost");
BrokerViewMBean broker =
(BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
- assertTrue("broker is not a slave", !broker.isSlave());
// create 2 topics
broker.addTopic(getDestinationString() + "1");
broker.addTopic(getDestinationString() + "2");
@@ -797,7 +794,6 @@ public class MBeanTest extends EmbeddedB
ObjectName brokerName = assertRegisteredObjectName(domain +
":Type=Broker,BrokerName=localhost");
BrokerViewMBean broker =
(BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
brokerName, BrokerViewMBean.class, true);
- assertTrue("broker is not a slave", !broker.isSlave());
assertEquals(0, broker.getDynamicDestinationProducers().length);
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
Mon Nov 26 21:13:25 2012
@@ -234,10 +234,6 @@ public class QueueDuplicatesFromStoreTes
return false;
}
- public boolean isSlave() {
- return false;
- }
-
public boolean matches(MessageReference node,
MessageEvaluationContext context) throws IOException {
return true;
Modified:
activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
---
activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java
(original)
+++
activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java
Mon Nov 26 21:13:25 2012
@@ -66,7 +66,6 @@ public class ApplicationContextFilter im
private String applicationContextName = "applicationContext";
private String requestContextName = "requestContext";
private String requestName = "request";
- private final String slavePage = "slave.jsp";
public void init(FilterConfig config) throws ServletException {
this.servletContext = config.getServletContext();
@@ -85,19 +84,19 @@ public class ApplicationContextFilter im
Map requestContextWrapper = createRequestContextWrapper(request);
String path = ((HttpServletRequest)request).getRequestURI();
// handle slave brokers
- try {
- if ( !(path.endsWith("css") || path.endsWith("png") ||
path.endsWith("ico") || path.endsWith(slavePage))
- &&
((BrokerFacade)requestContextWrapper.get("brokerQuery")).isSlave()) {
- ((HttpServletResponse)response).sendRedirect(slavePage);
- return;
- }
- } catch (Exception e) {
- LOG.warn(path + ", failed to access BrokerFacade: reason: " +
e.getLocalizedMessage());
- if (LOG.isDebugEnabled()) {
- LOG.debug(request.toString(), e);
- }
- throw new IOException(e);
- }
+// try {
+// if ( !(path.endsWith("css") || path.endsWith("png") ||
path.endsWith("ico") || path.endsWith(slavePage))
+// &&
((BrokerFacade)requestContextWrapper.get("brokerQuery")).isSlave()) {
+// ((HttpServletResponse)response).sendRedirect(slavePage);
+// return;
+// }
+// } catch (Exception e) {
+// LOG.warn(path + ", failed to access BrokerFacade: reason: " +
e.getLocalizedMessage());
+// if (LOG.isDebugEnabled()) {
+// LOG.debug(request.toString(), e);
+// }
+// throw new IOException(e);
+// }
request.setAttribute(requestContextName, requestContextWrapper);
request.setAttribute(requestName, request);
chain.doFilter(request, response);
Modified:
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
---
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
(original)
+++
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
Mon Nov 26 21:13:25 2012
@@ -209,6 +209,4 @@ public interface BrokerFacade {
boolean isJobSchedulerStarted();
- boolean isSlave() throws Exception;
-
}
\ No newline at end of file
Modified:
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java?rev=1413846&r1=1413845&r2=1413846&view=diff
==============================================================================
---
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
(original)
+++
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
Mon Nov 26 21:13:25 2012
@@ -226,8 +226,4 @@ public abstract class BrokerFacadeSuppor
return false;
}
}
-
- public boolean isSlave() throws Exception {
- return getBrokerAdmin().isSlave();
- }
}