Author: philharveyonline
Date: Thu Dec 20 09:48:35 2012
New Revision: 1424382

URL: http://svn.apache.org/viewvc?rev=1424382&view=rev
Log:
QPID-4515: improved broker logging, particularly when receiving/sending AMQP 
0-8/0-9 frames and when committing transactions.

Work done by Keith (kwall) and myself.

Modified:
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
    
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1424382&r1=1424381&r2=1424382&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
 Thu Dec 20 09:48:35 2012
@@ -902,7 +902,7 @@ public abstract class AbstractBDBMessage
             {
                 LOGGER.debug("Enqueuing message " + messageId + " on queue "
                         + (queue instanceof AMQQueue ? ((AMQQueue) 
queue).getName() + " with id " : "") + queue.getId()
-                        + " [Transaction" + tx + "]");
+                        + " in transaction " + tx);
             }
             _deliveryDb.put(tx, key, value);
         }
@@ -1056,7 +1056,8 @@ public abstract class AbstractBDBMessage
 
             if (LOGGER.isDebugEnabled())
             {
-                LOGGER.debug("commitTranImpl completed for [Transaction:" + tx 
+ "]");
+                String transactionType = syncCommit ? "synchronous" : 
"asynchronous";
+                LOGGER.debug("commitTranImpl completed " + transactionType + " 
transaction " + tx);
             }
         }
         catch (DatabaseException e)
@@ -1078,7 +1079,7 @@ public abstract class AbstractBDBMessage
     {
         if (LOGGER.isDebugEnabled())
         {
-            LOGGER.debug("abortTran called for [Transaction:" + tx + "]");
+            LOGGER.debug("abortTran called for transaction " + tx);
         }
 
         try
@@ -1190,7 +1191,7 @@ public abstract class AbstractBDBMessage
 
             if (LOGGER.isDebugEnabled())
             {
-                LOGGER.debug("Storing content for message " + messageId + 
"[Transaction" + tx + "]");
+                LOGGER.debug("Storing content for message " + messageId + " in 
transaction " + tx);
 
             }
         }
@@ -1215,8 +1216,9 @@ public abstract class AbstractBDBMessage
     {
         if (LOGGER.isDebugEnabled())
         {
-            LOGGER.debug("public void storeMetaData(Txn tx = " + tx + ", Long 
messageId = "
-                       + messageId + ", MessageMetaData messageMetaData = " + 
messageMetaData + "): called");
+            LOGGER.debug("storeMetaData called for transaction " + tx
+                    + ", messageId " + messageId
+                    + ", messageMetaData " + messageMetaData);
         }
 
         DatabaseEntry key = new DatabaseEntry();
@@ -1230,7 +1232,7 @@ public abstract class AbstractBDBMessage
             _messageMetaDataDb.put(tx, key, value);
             if (LOGGER.isDebugEnabled())
             {
-                LOGGER.debug("Storing message metadata for message id " + 
messageId + "[Transaction" + tx + "]");
+                LOGGER.debug("Storing message metadata for message id " + 
messageId + " in transaction " + tx);
             }
         }
         catch (DatabaseException e)

Modified: 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java?rev=1424382&r1=1424381&r2=1424382&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
 (original)
+++ 
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
 Thu Dec 20 09:48:35 2012
@@ -80,7 +80,7 @@ public class CommitThreadWrapper
         {
             if (LOGGER.isDebugEnabled())
             {
-                LOGGER.debug("public synchronized void complete(): called 
(Transaction = " + _tx + ")");
+                LOGGER.debug("complete() called for transaction " + _tx);
             }
             _complete = true;
 
@@ -101,7 +101,10 @@ public class CommitThreadWrapper
 
             if(!_syncCommit)
             {
-                LOGGER.debug("CommitAsync was requested, returning 
immediately.");
+                if(LOGGER.isDebugEnabled())
+                {
+                    LOGGER.debug("CommitAsync was requested, returning 
immediately.");
+                }
                 return;
             }
 
@@ -121,6 +124,12 @@ public class CommitThreadWrapper
 
         public synchronized void waitForCompletion()
         {
+            long startTime = 0;
+            if(LOGGER.isDebugEnabled())
+            {
+                startTime = System.currentTimeMillis();
+            }
+
             while (!isComplete())
             {
                 _commitThread.explicitNotify();
@@ -133,6 +142,12 @@ public class CommitThreadWrapper
                     throw new RuntimeException(e);
                 }
             }
+
+            if(LOGGER.isDebugEnabled())
+            {
+                long duration = System.currentTimeMillis() - startTime;
+                LOGGER.debug("waitForCompletion returning after " + duration + 
" ms for transaction " + _tx);
+            }
         }
     }
 
@@ -198,8 +213,20 @@ public class CommitThreadWrapper
 
             try
             {
+                long startTime = 0;
+                if(LOGGER.isDebugEnabled())
+                {
+                    startTime = System.currentTimeMillis();
+                }
+
                 _environment.flushLog(true);
 
+                if(LOGGER.isDebugEnabled())
+                {
+                    long duration = System.currentTimeMillis() - startTime;
+                    LOGGER.debug("flushLog completed in " + duration  + " ms");
+                }
+
                 for(int i = 0; i < size; i++)
                 {
                     BDBCommitFuture commit = _jobQueue.poll();

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1424382&r1=1424381&r2=1424382&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Thu Dec 20 09:48:35 2012
@@ -378,7 +378,7 @@ public class AMQChannel implements AMQSe
 
         if (_logger.isDebugEnabled())
         {
-            _logger.debug(debugIdentity() + "Content body received on channel 
" + _channelId);
+            _logger.debug(debugIdentity() + " content body received on channel 
" + _channelId);
         }
 
         try
@@ -1583,6 +1583,11 @@ public class AMQChannel implements AMQSe
 
     public void sync()
     {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("sync() called on channel " + debugIdentity());
+        }
+
         AsyncCommand cmd;
         while((cmd = _unfinishedCommandsQueue.poll()) != null)
         {

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java?rev=1424382&r1=1424381&r2=1424382&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterImpl.java
 Thu Dec 20 09:48:35 2012
@@ -218,55 +218,71 @@ class ProtocolOutputConverterImpl implem
 
         final boolean isRedelivered = entry.isRedelivered();
 
-        final AMQBody returnBlock = new AMQBody()
-        {
-
-            private AMQBody _underlyingBody;
-
-            public AMQBody createAMQBody()
-            {
-                return _methodRegistry.createBasicDeliverBody(consumerTag,
-                                                              deliveryTag,
-                                                              isRedelivered,
-                                                              exchangeName,
-                                                              routingKey);
-
+        final AMQBody returnBlock = new EncodedDeliveryBody(deliveryTag, 
routingKey, exchangeName, consumerTag, isRedelivered);
+        return returnBlock;
+    }
 
+    private class EncodedDeliveryBody implements AMQBody
+    {
+        private final long _deliveryTag;
+        private final AMQShortString _routingKey;
+        private final AMQShortString _exchangeName;
+        private final AMQShortString _consumerTag;
+        private final boolean _isRedelivered;
+        private AMQBody _underlyingBody;
 
+        private EncodedDeliveryBody(long deliveryTag, AMQShortString 
routingKey, AMQShortString exchangeName, AMQShortString consumerTag, boolean 
isRedelivered)
+        {
+            _deliveryTag = deliveryTag;
+            _routingKey = routingKey;
+            _exchangeName = exchangeName;
+            _consumerTag = consumerTag;
+            _isRedelivered = isRedelivered;
+        }
 
+        public AMQBody createAMQBody()
+        {
+            return _methodRegistry.createBasicDeliverBody(_consumerTag,
+                                                          _deliveryTag,
+                                                          _isRedelivered,
+                                                          _exchangeName,
+                                                          _routingKey);
+        }
 
-            }
+        public byte getFrameType()
+        {
+            return AMQMethodBody.TYPE;
+        }
 
-            public byte getFrameType()
+        public int getSize()
+        {
+            if(_underlyingBody == null)
             {
-                return AMQMethodBody.TYPE;
+                _underlyingBody = createAMQBody();
             }
+            return _underlyingBody.getSize();
+        }
 
-            public int getSize()
+        public void writePayload(DataOutput buffer) throws IOException
+        {
+            if(_underlyingBody == null)
             {
-                if(_underlyingBody == null)
-                {
-                    _underlyingBody = createAMQBody();
-                }
-                return _underlyingBody.getSize();
+                _underlyingBody = createAMQBody();
             }
+            _underlyingBody.writePayload(buffer);
+        }
 
-            public void writePayload(DataOutput buffer) throws IOException
-            {
-                if(_underlyingBody == null)
-                {
-                    _underlyingBody = createAMQBody();
-                }
-                _underlyingBody.writePayload(buffer);
-            }
+        public void handle(final int channelId, final 
AMQVersionAwareProtocolSession amqMinaProtocolSession)
+            throws AMQException
+        {
+            throw new AMQException("This block should never be dispatched!");
+        }
 
-            public void handle(final int channelId, final 
AMQVersionAwareProtocolSession amqMinaProtocolSession)
-                throws AMQException
-            {
-                throw new AMQException("This block should never be 
dispatched!");
-            }
-        };
-        return returnBlock;
+        @Override
+        public String toString()
+        {
+            return "[" + getClass().getSimpleName() + " underlyingBody: " + 
String.valueOf(_underlyingBody) + "]";
+        }
     }
 
     private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, 
int queueSize)
@@ -368,7 +384,6 @@ class ProtocolOutputConverterImpl implem
             _methodBody = methodBody;
             _headerBody = headerBody;
             _contentBody = contentBody;
-
         }
 
         public long getSize()
@@ -380,6 +395,19 @@ class ProtocolOutputConverterImpl implem
         {
             AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, 
_contentBody);
         }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder builder = new StringBuilder();
+            builder.append("[").append(getClass().getSimpleName())
+                .append(" methodBody=").append(_methodBody)
+                .append(", headerBody=").append(_headerBody)
+                .append(", contentBody=").append(_contentBody)
+                .append(", channel=").append(_channel).append("]");
+            return builder.toString();
+        }
+
     }
 
     public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
@@ -408,6 +436,17 @@ class ProtocolOutputConverterImpl implem
         {
             AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
         }
+
+        @Override
+        public String toString()
+        {
+            StringBuilder builder = new StringBuilder();
+            builder.append(getClass().getSimpleName())
+                .append("methodBody=").append(_methodBody)
+                .append(", headerBody=").append(_headerBody)
+                .append(", channel=").append(_channel).append("]");
+            return builder.toString();
+        }
     }
 
 }
\ No newline at end of file

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1424382&r1=1424381&r2=1424382&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
 Thu Dec 20 09:48:35 2012
@@ -303,9 +303,13 @@ public class AMQProtocolEngine implement
 
         try
         {
+            long startTime = 0;
+            String frameToString = null;
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Frame Received: " + frame);
+                startTime = System.currentTimeMillis();
+                frameToString = frame.toString();
+                _logger.debug("RECV: " + frame);
             }
 
             // Check that this channel is not closing
@@ -340,6 +344,11 @@ public class AMQProtocolEngine implement
                 closeChannel(channelId);
                 throw e;
             }
+
+            if(_logger.isDebugEnabled())
+            {
+                _logger.debug("Frame handled in " + 
(System.currentTimeMillis() - startTime) + " ms. Frame: " + frameToString);
+            }
         }
         finally
         {
@@ -543,6 +552,12 @@ public class AMQProtocolEngine implement
 
         final ByteBuffer buf = asByteBuffer(frame);
         _writtenBytes += buf.remaining();
+
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("SEND: " + frame);
+        }
+
         _sender.send(buf);
         final long time = System.currentTimeMillis();
         _lastIoTime = time;

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1424382&r1=1424381&r2=1424382&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
 Thu Dec 20 09:48:35 2012
@@ -384,10 +384,20 @@ public class LocalTransaction implements
 
     private void doPostTransactionActions()
     {
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("Beginning " + _postTransactionActions.size() + " 
post transaction actions");
+        }
+
         for(int i = 0; i < _postTransactionActions.size(); i++)
         {
             _postTransactionActions.get(i).postCommit();
         }
+
+        if(_logger.isDebugEnabled())
+        {
+            _logger.debug("Completed post transaction actions");
+        }
     }
 
     public void rollback()

Modified: 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=1424382&r1=1424381&r2=1424382&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
 (original)
+++ 
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
 Thu Dec 20 09:48:35 2012
@@ -25,7 +25,6 @@ import org.apache.qpid.configuration.Cli
 import org.apache.qpid.transport.network.Frame;
 import org.apache.qpid.transport.util.Logger;
 import org.apache.qpid.transport.util.Waiter;
-
 import static org.apache.qpid.transport.Option.COMPLETED;
 import static org.apache.qpid.transport.Option.SYNC;
 import static org.apache.qpid.transport.Option.TIMELY_REPLY;
@@ -414,7 +413,7 @@ public class Session extends SessionInvo
 
         if(log.isDebugEnabled())
         {
-            log.debug("ID: [%s] %s", this.channel, id);
+            log.debug("identify: ch=%s, commandId=%s", this.channel, id);
         }
 
         if ((id & 0xff) == 0)
@@ -443,7 +442,7 @@ public class Session extends SessionInvo
     {
         if(log.isDebugEnabled())
         {
-            log.debug("%s processed([%d,%d]) %s %s", this, lower, upper, 
syncPoint, maxProcessed);
+            log.debug("%s ch=%s processed([%d,%d]) %s %s", this, channel, 
lower, upper, syncPoint, maxProcessed);
         }
 
         boolean flush;
@@ -451,7 +450,7 @@ public class Session extends SessionInvo
         {
             if(log.isDebugEnabled())
             {
-                log.debug("%s", processed);
+                log.debug("%s processed: %s", this, processed);
             }
 
             if (ge(upper, commandsIn))



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

Reply via email to