Author: chirino
Date: Thu Mar 12 21:50:42 2009
New Revision: 753030
URL: http://svn.apache.org/viewvc?rev=753030&view=rev
Log:
Better flow names to make debugging easier.
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java?rev=753030&r1=753029&r2=753030&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/broker/openwire/OpenwireBrokerConnection.java
Thu Mar 12 21:50:42 2009
@@ -308,18 +308,16 @@
class ProducerContext {
- private final ProducerInfo info;
private IFlowController<MessageDelivery> controller;
private String name;
public ProducerContext(final ProducerInfo info) {
- this.info = info;
this.name = info.getProducerId().toString();
// Openwire only uses credit windows at the producer level for
// producers that request the feature.
if (info.getWindowSize() > 0) {
- Flow flow = new Flow(info.getProducerId().toString(), false);
+ final Flow flow = new Flow("broker-"+name+"-inbound", false);
WindowLimiter<MessageDelivery> limiter = new
WindowLimiter<MessageDelivery>(false, flow, info.getWindowSize(),
info.getWindowSize() / 2) {
@Override
protected void sendCredit(int credit) {
@@ -334,7 +332,7 @@
}
public String toString() {
- return name;
+ return flow.getFlowName();
}
}, flow, limiter, inboundMutex);
} else {
@@ -357,13 +355,13 @@
this.name = info.getConsumerId().toString();
selector = parseSelector(info);
- Flow flow = new Flow(name, false);
+ Flow flow = new Flow("broker-"+name+"-outbound", false);
limiter = new WindowLimiter<MessageDelivery>(true, flow,
info.getPrefetchSize(), info.getPrefetchSize() / 2) {
public int getElementSize(MessageDelivery m) {
return 1;
}
};
- queue = new SingleFlowRelay<MessageDelivery>(flow, name +
"-outbound", limiter);
+ queue = new SingleFlowRelay<MessageDelivery>(flow,
flow.getFlowName(), limiter);
queue.setDrain(new IFlowDrain<MessageDelivery>() {
public void drain(final MessageDelivery message,
ISourceController<MessageDelivery> controller) {
Message msg = message.asType(Message.class);
@@ -450,7 +448,7 @@
protected void initialize() {
// Setup the inbound processing..
- Flow flow = new Flow(name, false);
+ final Flow flow = new Flow("broker-"+name+"-inbound", false);
SizeLimiter<MessageDelivery> limiter = new
SizeLimiter<MessageDelivery>(inputWindowSize, inputResumeThreshold);
inboundController = new FlowController<MessageDelivery>(new
FlowControllableAdapter() {
public void flowElemAccepted(ISourceController<MessageDelivery>
controller, MessageDelivery elem) {
@@ -458,7 +456,7 @@
}
public String toString() {
- return name;
+ return flow.getFlowName();
}
}, flow, limiter, inboundMutex);
Modified:
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java?rev=753030&r1=753029&r2=753030&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
(original)
+++
activemq/sandbox/activemq-flow/src/main/java/org/apache/activemq/flow/FlowController.java
Thu Mar 12 21:50:42 2009
@@ -325,8 +325,7 @@
setUnThrottleListener();
if (!blockedSources.contains(source)) {
- // System.out.println("BLOCKING : SINK[" + this + "], SOURCE[" +
- // source + "]");
+// System.out.println("BLOCKING : SINK[" + this + "], SOURCE[" +
source + "]");
blockedSources.add(source);
source.onFlowBlock(this);
}
@@ -391,9 +390,7 @@
String was = Thread.currentThread().getName();
try {
for (ISourceController<E> source : blockedSources) {
- // System.out.println("UNBLOCKING: SINK[" +
- // FlowController.this + "], SOURCE[" + source +
- // "]");
+// System.out.println("UNBLOCKING: SINK[" +
FlowController.this + "], SOURCE[" + source + "]");
source.onFlowResume(FlowController.this);
}
for (FlowUnblockListener<E> listener :
unblockListeners) {
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java?rev=753030&r1=753029&r2=753030&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/OpenwireSupport.java
Thu Mar 12 21:50:42 2009
@@ -42,9 +42,14 @@
return info;
}
+
public static ConnectionInfo createConnectionInfo() throws Exception {
+ return createConnectionInfo("connection:"+ (++idGenerator));
+ }
+
+ public static ConnectionInfo createConnectionInfo(String name) throws
Exception {
ConnectionInfo info = new ConnectionInfo();
- info.setConnectionId(new ConnectionId("connection:" +
(++idGenerator)));
+ info.setConnectionId(new ConnectionId(name));
info.setClientId(info.getConnectionId().getValue());
return info;
}
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java?rev=753030&r1=753029&r2=753030&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteConsumer.java
Thu Mar 12 21:50:42 2009
@@ -13,7 +13,6 @@
import org.apache.activemq.broker.Router;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
@@ -74,7 +73,7 @@
activemqDestination = new
ActiveMQTopic(destination.getName().toString());
}
- connectionInfo = createConnectionInfo();
+ connectionInfo = createConnectionInfo(name);
transport.oneway(connectionInfo);
sessionInfo = createSessionInfo(connectionInfo);
transport.oneway(sessionInfo);
@@ -87,7 +86,7 @@
protected void initialize() {
// Setup the input processing..
- Flow flow = new Flow(name, false);
+ final Flow flow = new Flow("client-"+name+"-inbound", false);
WindowLimiter<MessageDelivery> limiter = new
WindowLimiter<MessageDelivery>(false, flow, inputWindowSize,
inputResumeThreshold) {
protected void sendCredit(int credit) {
MessageAck ack = OpenwireSupport.createAck(consumerInfo,
lastMessage, credit, MessageAck.STANDARD_ACK_TYPE);
@@ -99,7 +98,7 @@
messageReceived(controller, elem);
}
public String toString() {
- return name;
+ return flow.getFlowName();
}
public IFlowSink<MessageDelivery> getFlowSink() {
return null;
Modified:
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
URL:
http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java?rev=753030&r1=753029&r2=753030&view=diff
==============================================================================
---
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
(original)
+++
activemq/sandbox/activemq-flow/src/test/java/org/apache/activemq/broker/openwire/RemoteProducer.java
Thu Mar 12 21:50:42 2009
@@ -88,7 +88,7 @@
activemqDestination = new
ActiveMQTopic(destination.getName().toString());
}
- connectionInfo = createConnectionInfo();
+ connectionInfo = createConnectionInfo(name);
transport.oneway(connectionInfo);
sessionInfo = createSessionInfo(connectionInfo);
transport.oneway(sessionInfo);
@@ -101,10 +101,10 @@
}
protected void initialize() {
- Flow flow = new Flow(name, false);
+ Flow flow = new Flow("client-"+name+"-outbound", false);
outputResumeThreshold = outputWindowSize/2;
outboundLimiter = new WindowLimiter<MessageDelivery>(true, flow,
outputWindowSize, outputResumeThreshold);
- outboundQueue = new SingleFlowRelay<MessageDelivery>(flow, name +
"-outbound", outboundLimiter);
+ outboundQueue = new SingleFlowRelay<MessageDelivery>(flow,
flow.getFlowName(), outboundLimiter);
outboundController = outboundQueue.getFlowController(flow);
outboundQueue.setDrain(new IFlowDrain<MessageDelivery>() {