Author: chirino
Date: Tue Nov 13 20:52:18 2012
New Revision: 1408953

URL: http://svn.apache.org/viewvc?rev=1408953&view=rev
Log:
Improve the format of the AMQP trace messages; implement better producer flow
control.

Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java

Modified: 
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1408953&r1=1408952&r2=1408953&view=diff
==============================================================================
--- 
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
 (original)
+++ 
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
 Tue Nov 13 20:52:18 2012
@@ -94,12 +94,12 @@ class AmqpProtocolConverter {
         this.protonTransport.setProtocolTracer(new ProtocolTracer() {
             @Override
             public void receivedFrame(TransportFrame transportFrame) {
-                System.out.println(String.format("RECV: %05d | %s", 
transportFrame.getChannel(), transportFrame.getBody()));
+                System.out.println(String.format("%s | RECV: %s", 
amqpTransport.getRemoteAddress(), transportFrame.getBody()));
             }
 
             @Override
             public void sentFrame(TransportFrame transportFrame) {
-                System.out.println(String.format("SENT: %05d | %s", 
transportFrame.getChannel(), transportFrame.getBody()));
+                System.out.println(String.format("%s | SENT: %s", 
amqpTransport.getRemoteAddress(), transportFrame.getBody()));
             }
         });
 
@@ -474,7 +474,7 @@ class AmqpProtocolConverter {
         }
 
         @Override
-        protected void onMessage(Receiver receiver, final Delivery delivery, 
Buffer buffer) throws Exception {
+        protected void onMessage(final Receiver receiver, final Delivery 
delivery, Buffer buffer) throws Exception {
             EncodedMessage em = new 
EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, 
buffer.length);
             final ActiveMQMessage message = (ActiveMQMessage) 
getInboundTransformer().transform(em);
             current = null;
@@ -494,13 +494,11 @@ class AmqpProtocolConverter {
                 message.setTransactionId(new LocalTransactionId(connectionId, 
txid));
             }
 
-            ResponseHandler handler = null;
-            if( delivery.remotelySettled() ) {
-                delivery.settle();
-            } else {
-                handler = new ResponseHandler() {
-                    @Override
-                    public void onResponse(AmqpProtocolConverter converter, 
Response response) throws IOException {
+            message.onSend();
+            sendToActiveMQ(message, new ResponseHandler() {
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, 
Response response) throws IOException {
+                    if( !delivery.remotelySettled()  ) {
                         if( response.isException() ) {
                             ExceptionResponse er = (ExceptionResponse)response;
                             Rejected rejected = new Rejected();
@@ -509,14 +507,12 @@ class AmqpProtocolConverter {
                             rejected.setError(errors);
                             delivery.disposition(rejected);
                         }
-                        delivery.settle();
-                        pumpProtonToSocket();
                     }
-                };
-            }
-
-            message.onSend();
-            sendToActiveMQ(message, handler);
+                    receiver.flow(1);
+                    delivery.settle();
+                    pumpProtonToSocket();
+                }
+            });
         }
 
     }

Modified: 
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java?rev=1408953&r1=1408952&r2=1408953&view=diff
==============================================================================
--- 
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
 (original)
+++ 
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
 Tue Nov 13 20:52:18 2012
@@ -42,4 +42,7 @@ public interface AmqpTransport {
     public void stop() throws Exception;
 
     public String getTransformer();
+
+    public String getRemoteAddress();
+
 }


Reply via email to