Author: chirino
Date: Tue Nov 13 20:52:18 2012
New Revision: 1408953
URL: http://svn.apache.org/viewvc?rev=1408953&view=rev
Log:
Improve the format of the AMQP trace messages and implement better producer
flow control.
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1408953&r1=1408952&r2=1408953&view=diff
==============================================================================
---
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
(original)
+++
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
Tue Nov 13 20:52:18 2012
@@ -94,12 +94,12 @@ class AmqpProtocolConverter {
this.protonTransport.setProtocolTracer(new ProtocolTracer() {
@Override
public void receivedFrame(TransportFrame transportFrame) {
- System.out.println(String.format("RECV: %05d | %s",
transportFrame.getChannel(), transportFrame.getBody()));
+ System.out.println(String.format("%s | RECV: %s",
amqpTransport.getRemoteAddress(), transportFrame.getBody()));
}
@Override
public void sentFrame(TransportFrame transportFrame) {
- System.out.println(String.format("SENT: %05d | %s",
transportFrame.getChannel(), transportFrame.getBody()));
+ System.out.println(String.format("%s | SENT: %s",
amqpTransport.getRemoteAddress(), transportFrame.getBody()));
}
});
@@ -474,7 +474,7 @@ class AmqpProtocolConverter {
}
@Override
- protected void onMessage(Receiver receiver, final Delivery delivery,
Buffer buffer) throws Exception {
+ protected void onMessage(final Receiver receiver, final Delivery
delivery, Buffer buffer) throws Exception {
EncodedMessage em = new
EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset,
buffer.length);
final ActiveMQMessage message = (ActiveMQMessage)
getInboundTransformer().transform(em);
current = null;
@@ -494,13 +494,11 @@ class AmqpProtocolConverter {
message.setTransactionId(new LocalTransactionId(connectionId,
txid));
}
- ResponseHandler handler = null;
- if( delivery.remotelySettled() ) {
- delivery.settle();
- } else {
- handler = new ResponseHandler() {
- @Override
- public void onResponse(AmqpProtocolConverter converter,
Response response) throws IOException {
+ message.onSend();
+ sendToActiveMQ(message, new ResponseHandler() {
+ @Override
+ public void onResponse(AmqpProtocolConverter converter,
Response response) throws IOException {
+ if( !delivery.remotelySettled() ) {
if( response.isException() ) {
ExceptionResponse er = (ExceptionResponse)response;
Rejected rejected = new Rejected();
@@ -509,14 +507,12 @@ class AmqpProtocolConverter {
rejected.setError(errors);
delivery.disposition(rejected);
}
- delivery.settle();
- pumpProtonToSocket();
}
- };
- }
-
- message.onSend();
- sendToActiveMQ(message, handler);
+ receiver.flow(1);
+ delivery.settle();
+ pumpProtonToSocket();
+ }
+ });
}
}
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java?rev=1408953&r1=1408952&r2=1408953&view=diff
==============================================================================
---
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
(original)
+++
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
Tue Nov 13 20:52:18 2012
@@ -42,4 +42,7 @@ public interface AmqpTransport {
public void stop() throws Exception;
public String getTransformer();
+
+ public String getRemoteAddress();
+
}