Author: rgodfrey
Date: Tue Mar 20 22:59:56 2012
New Revision: 1303192
URL: http://svn.apache.org/viewvc?rev=1303192&view=rev
Log:
NO-JIRA : [Java Config] fix unique id generation for consumers in adaption layer
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java?rev=1303192&r1=1303191&r2=1303192&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
Tue Mar 20 22:59:56 2012
@@ -36,7 +36,12 @@ public class ConsumerAdapter extends Abs
public ConsumerAdapter(final QueueAdapter queueAdapter, final Subscription subscription)
{
- super(queueAdapter.getVirtualHost().getName(), queueAdapter.getName(), subscription.getConsumerTag().asString() );
+ super(queueAdapter.getVirtualHost().getName(),
+ queueAdapter.getName(),
+ subscription.getSession().getConnectionModel().getRemoteAddressString(),
+ String.valueOf(subscription.getSession().getChannelId()),
+ subscription.getConsumerTag().asString() );
+
_subscription = subscription;
_queue = queueAdapter;
//TODO
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=1303192&r1=1303191&r2=1303192&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
Tue Mar 20 22:59:56 2012
@@ -23,6 +23,7 @@ package org.apache.qpid.server.subscript
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
@@ -45,6 +46,7 @@ public interface Subscription
}
AMQQueue getQueue();
+ AMQSessionModel getSession();
QueueEntry.SubscriptionAcquiredState getOwningState();
QueueEntry.SubscriptionAssignedState getAssignedState();
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1303192&r1=1303191&r2=1303192&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
Tue Mar 20 22:59:56 2012
@@ -44,6 +44,7 @@ import org.apache.qpid.server.logging.su
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
@@ -371,11 +372,16 @@ public abstract class SubscriptionImpl i
}
+ public AMQSessionModel getSession()
+ {
+ return _channel;
+ }
+
public ConfigStore getConfigStore()
{
return getQueue().getConfigStore();
}
-
+
public Long getDelivered()
{
return _deliveredCount.get();
@@ -810,7 +816,7 @@ public abstract class SubscriptionImpl i
{
return _channel.isTransactional();
}
-
+
public long getCreateTime()
{
return _createTime;
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1303192&r1=1303191&r2=1303192&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
Tue Mar 20 22:59:56 2012
@@ -109,7 +109,7 @@ public class Subscription_0_10 implement
public void stateChange(Subscription sub, State oldState, State newState)
{
- CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
+ CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
}
};
private AMQQueue _queue;
@@ -199,7 +199,7 @@ public class Subscription_0_10 implement
CurrentActor.get().message(this,
SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
filterLogString.length() > 0));
}
-
+
}
public AMQShortString getConsumerTag()
@@ -302,7 +302,7 @@ public class Subscription_0_10 implement
{
return getQueue().getConfigStore();
}
-
+
public Long getDelivered()
{
return _deliveredCount.get();
@@ -664,7 +664,7 @@ public class Subscription_0_10 implement
private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
{
- AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
+ AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(),
new ServerTransaction.Action()
{
@@ -960,7 +960,7 @@ public class Subscription_0_10 implement
return false;
}
- ServerSession getSession()
+ public ServerSession getSession()
{
return _session;
}
@@ -1033,7 +1033,7 @@ public class Subscription_0_10 implement
public String toLogString()
{
- String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(),
+ String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(),
_queue.getNameShortString());
String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "("
// queueString is "vh(/{0})/qu({1}) " so need to trim
Modified:
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL:
http://svn.apache.org/viewvc/qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1303192&r1=1303191&r2=1303192&view=diff
==============================================================================
---
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
(original)
+++
qpid/branches/java-config-and-management/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
Tue Mar 20 22:59:56 2012
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntry.SubscriptionAcquiredState;
@@ -126,6 +127,11 @@ public class MockSubscription implements
return queue;
}
+ public AMQSessionModel getSession()
+ {
+ return null;
+ }
+
public boolean trySendLock()
{
return _stateChangeLock.tryLock();
@@ -232,7 +238,6 @@ public class MockSubscription implements
messages.add(entry);
}
- @Override
public void flushBatched()
{
@@ -249,7 +254,7 @@ public class MockSubscription implements
}
public void setNoLocal(boolean noLocal)
- {
+ {
}
public void setStateListener(StateListener listener)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]