Author: rajdavies
Date: Thu Jul 19 12:24:31 2007
New Revision: 557748
URL: http://svn.apache.org/viewvc?view=rev&rev=557748
Log:
move decision about being a slave from the Broker to the ConnectionContext - so
can be done on a Connection basis if required
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
Thu Jul 19 12:24:31 2007
@@ -191,12 +191,7 @@
* @param messageDispatch
*/
public void processDispatch(MessageDispatch messageDispatch);
-
- /**
- * @return true if the broker is running as a slave
- */
- public boolean isSlaveBroker();
-
+
/**
* @return true if the broker has stopped
*/
@@ -229,7 +224,7 @@
* @return true if fault tolerant
*/
public boolean isFaultTolerantConfiguration();
-
+
/**
* @return the connection context used to make administration operations
on startup or via JMX MBeans
*/
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
Thu Jul 19 12:24:31 2007
@@ -200,11 +200,7 @@
public void processDispatchNotification(MessageDispatchNotification
messageDispatchNotification) throws Exception{
next.processDispatchNotification(messageDispatchNotification);
}
-
- public boolean isSlaveBroker(){
- return next.isSlaveBroker();
- }
-
+
public boolean isStopped(){
return next.isStopped();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Thu Jul 19 12:24:31 2007
@@ -154,6 +154,7 @@
private boolean useLocalHostBrokerName = false;
private CountDownLatch stoppedLatch = new CountDownLatch(1);
private boolean supportFailOver = false;
+ private boolean clustered = false;
static{
String localHostName = "localhost";
@@ -1120,6 +1121,20 @@
public void setSupportFailOver(boolean supportFailOver){
this.supportFailOver=supportFailOver;
}
+
+ /**
+ * @return the clustered
+ */
+ public boolean isClustered(){
+ return this.clustered;
+ }
+
+ /**
+ * @param clustered the clustered to set
+ */
+ public void setClustered(boolean clustered){
+ this.clustered=clustered;
+ }
// Implementation methods
//
-------------------------------------------------------------------------
@@ -1697,6 +1712,5 @@
broker.addDestination(adminConnectionContext, destination);
}
}
- }
-
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ConnectionContext.java
Thu Jul 19 12:24:31 2007
@@ -59,6 +59,7 @@
private final AtomicBoolean stopping = new AtomicBoolean();
private final MessageEvaluationContext messageEvaluationContext = new
MessageEvaluationContext();
private boolean dontSendReponse;
+ private boolean clientMaster=true;
public ConnectionContext() {
}
@@ -267,6 +268,29 @@
public boolean isDontSendReponse() {
return dontSendReponse;
- }
-
+ }
+
+
+ /**
+ * @return the slave
+ */
+ public boolean isSlave(){
+ return
(this.broker!=null&&this.broker.getBrokerService().isSlave())||!this.clientMaster;
+ }
+
+
+ /**
+ * @return the clientMaster
+ */
+ public boolean isClientMaster(){
+ return this.clientMaster;
+ }
+
+
+ /**
+ * @param clientMaster the clientMaster to set
+ */
+ public void setClientMaster(boolean clientMaster){
+ this.clientMaster=clientMaster;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
Thu Jul 19 12:24:31 2007
@@ -199,10 +199,7 @@
}
- public boolean isSlaveBroker() {
- return false;
- }
-
+
public boolean isStopped() {
return false;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
Thu Jul 19 12:24:31 2007
@@ -197,10 +197,7 @@
throw new BrokerStoppedException(this.message);
}
- public boolean isSlaveBroker() {
- throw new BrokerStoppedException(this.message);
- }
-
+
public boolean isStopped() {
return true;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
Thu Jul 19 12:24:31 2007
@@ -209,10 +209,7 @@
getNext().processDispatchNotification(messageDispatchNotification);
}
- public boolean isSlaveBroker(){
- return getNext().isSlaveBroker();
- }
-
+
public boolean isStopped(){
return getNext().isStopped();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Thu Jul 19 12:24:31 2007
@@ -658,6 +658,7 @@
context.setClientId(clientId);
context.setUserName(info.getUserName());
context.setConnectionId(info.getConnectionId());
+ context.setClientMaster(info.isClientMaster());
context.setWireFormatInfo(wireFormatInfo);
context.setNetworkConnection(networkConnection);
context.incrementReference();
@@ -1199,18 +1200,19 @@
}
}
- protected void disposeTransport() {
- if( transportDisposed.compareAndSet(false, true) ) {
- try {
- transport.stop();
- active = false;
- log.debug("Stopped connection:
"+transport.getRemoteAddress());
- } catch (Exception e) {
- log.debug("Could not stop transport: "+e,e);
- }
- }
- }
-
+ protected void disposeTransport(){
+ if(transportDisposed.compareAndSet(false,true)){
+ try{
+ transport.stop();
+ active=false;
+ log.debug("Stopped connection: "+transport.getRemoteAddress());
+ }catch(Exception e){
+ log.debug("Could not stop transport: "+e,e);
+ }
+ }
+ }
+
+
public int getProtocolVersion() {
return protocolVersion.get();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
Thu Jul 19 12:24:31 2007
@@ -115,8 +115,8 @@
public void gc() {
}
- public boolean isSlaveBroker(){
- return broker.isSlaveBroker();
+ public boolean isSlave(){
+ return getContext().isSlave();
}
public ConnectionContext getContext() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Thu Jul 19 12:24:31 2007
@@ -74,7 +74,7 @@
// The slave should not deliver pull messages. TODO: when the slave
becomes a master,
// He should send a NULL message to all the consumers to 'wake them
up' in case
// they were waiting for a message.
- if(getPrefetchSize()==0&&!isSlaveBroker()){
+ if(getPrefetchSize()==0&&!isSlave()){
prefetchExtension++;
final long dispatchCounterBeforePull=dispatchCounter;
dispatchMatched();
@@ -119,7 +119,7 @@
pendingEmpty=pending.isEmpty();
enqueueCounter++;
- if(!isFull()&&pendingEmpty&&!broker.isSlaveBroker()){
+ if(!isFull()&&pendingEmpty&&!isSlave()){
dispatch(node);
}else{
optimizePrefetch();
@@ -260,7 +260,7 @@
if(callDispatchMatched){
dispatchMatched();
}else{
- if(isSlaveBroker()){
+ if(isSlave()){
throw new JMSException("Slave broker out of sync with master:
Acknowledgment ("+ack
+") was not in the dispatch list: "+dispatched);
}else{
@@ -295,7 +295,7 @@
* @return
*/
protected synchronized boolean isFull(){
- return
isSlaveBroker()||dispatched.size()-prefetchExtension>=info.getPrefetchSize();
+ return
isSlave()||dispatched.size()-prefetchExtension>=info.getPrefetchSize();
}
/**
@@ -377,7 +377,7 @@
}
protected synchronized void dispatchMatched() throws IOException{
- if(!broker.isSlaveBroker()){
+ if(!isSlave()){
try{
int numberToDispatch=countBeforeFull();
if(numberToDispatch>0){
@@ -412,7 +412,7 @@
return false;
}
// Make sure we can dispatch a message.
- if(canDispatch(node)&&!isSlaveBroker()){
+ if(canDispatch(node)&&!isSlave()){
MessageDispatch md=createMessageDispatch(node,message);
// NULL messages don't count... they don't get Acked.
if(node!=QueueMessageReference.NULL_MESSAGE){
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
Thu Jul 19 12:24:31 2007
@@ -111,7 +111,7 @@
/**
* @return true if the broker is currently in slave mode
*/
- boolean isSlaveBroker();
+ boolean isSlave();
/**
* @return number of messages pending delivery
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Thu Jul 19 12:24:31 2007
@@ -74,7 +74,7 @@
public void add(MessageReference node) throws Exception{
enqueueCounter.incrementAndGet();
node.incrementReferenceCount();
- if(!isFull()&&!isSlaveBroker()){
+ if(!isFull()&&!isSlave()){
optimizePrefetch();
// if maximumPendingMessages is set we will only discard messages
which
// have not been dispatched (i.e. we allow the prefetch buffer to
be filled)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java?view=diff&rev=557748&r1=557747&r2=557748
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
Thu Jul 19 12:24:31 2007
@@ -36,7 +36,7 @@
protected BrokerId[] brokerPath;
protected boolean brokerMasterConnector;
protected boolean manageable;
- protected boolean clientMaster;
+ protected boolean clientMaster=true;
protected transient Object transportContext;
public ConnectionInfo() {