Author: rgodfrey
Date: Tue Nov  4 15:52:59 2014
New Revision: 1636617

URL: http://svn.apache.org/r1636617
Log:
QPID-6207 : [Java Broker] Flow uncommitted messages to disk if combined size 
greater than threshold

Modified:
    
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
    
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
    
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
    
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java

Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java?rev=1636617&r1=1636616&r2=1636617&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
 Tue Nov  4 15:52:59 2014
@@ -22,15 +22,14 @@ package org.apache.qpid.server.logging.m
 
 import static 
org.apache.qpid.server.logging.AbstractMessageLogger.DEFAULT_LOG_HIERARCHY_PREFIX;
 
-import java.text.MessageFormat;
-import java.util.Locale;
-import java.util.ResourceBundle;
-
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.logging.LogMessage;
 
+import java.text.MessageFormat;
+import java.util.Locale;
+import java.util.ResourceBundle;
+
 /**
  * DO NOT EDIT DIRECTLY, THIS FILE WAS GENERATED.
  *
@@ -51,6 +50,7 @@ public class ChannelMessages
     public static final String CLOSE_LOG_HIERARCHY = 
DEFAULT_LOG_HIERARCHY_PREFIX + "channel.close";
     public static final String PREFETCH_SIZE_LOG_HIERARCHY = 
DEFAULT_LOG_HIERARCHY_PREFIX + "channel.prefetch_size";
     public static final String CLOSE_FORCED_LOG_HIERARCHY = 
DEFAULT_LOG_HIERARCHY_PREFIX + "channel.close_forced";
+    public static final String LARGE_TRANSACTION_WARN_LOG_HIERARCHY = 
DEFAULT_LOG_HIERARCHY_PREFIX + "channel.large_transaction_warn";
     public static final String DEADLETTERMSG_LOG_HIERARCHY = 
DEFAULT_LOG_HIERARCHY_PREFIX + "channel.deadlettermsg";
     public static final String DISCARDMSG_NOALTEXCH_LOG_HIERARCHY = 
DEFAULT_LOG_HIERARCHY_PREFIX + "channel.discardmsg_noaltexch";
     public static final String IDLE_TXN_LOG_HIERARCHY = 
DEFAULT_LOG_HIERARCHY_PREFIX + "channel.idle_txn";
@@ -68,6 +68,7 @@ public class ChannelMessages
         Logger.getLogger(CLOSE_LOG_HIERARCHY);
         Logger.getLogger(PREFETCH_SIZE_LOG_HIERARCHY);
         Logger.getLogger(CLOSE_FORCED_LOG_HIERARCHY);
+        Logger.getLogger(LARGE_TRANSACTION_WARN_LOG_HIERARCHY);
         Logger.getLogger(DEADLETTERMSG_LOG_HIERARCHY);
         Logger.getLogger(DISCARDMSG_NOALTEXCH_LOG_HIERARCHY);
         Logger.getLogger(IDLE_TXN_LOG_HIERARCHY);
@@ -263,6 +264,38 @@ public class ChannelMessages
 
     /**
      * Log a Channel message of the Format:
+     * <pre>CHN-1013 : Uncommitted transaction contains {0,number} bytes of 
incoming message data.</pre>
+     * Optional values are contained in [square brackets] and are numbered
+     * sequentially in the method call.
+     *
+     */
+    public static LogMessage LARGE_TRANSACTION_WARN(Number param1)
+    {
+        String rawMessage = _messages.getString("LARGE_TRANSACTION_WARN");
+
+        final Object[] messageArguments = {param1};
+        // Create a new MessageFormat to ensure thread safety.
+        // Sharing a MessageFormat and using applyPattern is not thread safe
+        MessageFormat formatter = new MessageFormat(rawMessage, 
_currentLocale);
+
+        final String message = formatter.format(messageArguments);
+
+        return new LogMessage()
+        {
+            public String toString()
+            {
+                return message;
+            }
+
+            public String getLogHierarchy()
+            {
+                return LARGE_TRANSACTION_WARN_LOG_HIERARCHY;
+            }
+        };
+    }
+
+    /**
+     * Log a Channel message of the Format:
      * <pre>CHN-1011 : Message : {0,number} moved to dead letter queue : 
{1}</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.

Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties?rev=1636617&r1=1636616&r2=1636617&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
 Tue Nov  4 15:52:59 2014
@@ -40,3 +40,6 @@ DISCARDMSG_NOROUTE = CHN-1010 : Discarde
 DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue : 
{1}
 
 FLOW_CONTROL_IGNORED = CHN-1012 : Flow Control Ignored. Channel will be closed.
+
+LARGE_TRANSACTION_WARN = CHN-1013 : Uncommitted transaction contains 
{0,number} bytes of incoming message data.
+

Modified: 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1636617&r1=1636616&r2=1636617&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
 Tue Nov  4 15:52:59 2014
@@ -44,6 +44,11 @@ public interface Connection<X extends Co
     String PORT = "port";
 
 
+    String MAX_UNCOMMITTED_IN_MEMORY_SIZE = 
"connection.maxUncommittedInMemorySize";
+
+    @ManagedContextDefault(name = MAX_UNCOMMITTED_IN_MEMORY_SIZE)
+    long DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE = 10l * 1024l * 1024l;
+
     @DerivedAttribute
     String getClientId();
 

Modified: 
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1636617&r1=1636616&r2=1636617&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
 Tue Nov  4 15:52:59 2014
@@ -27,6 +27,7 @@ import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
 import java.text.MessageFormat;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -74,6 +75,7 @@ import org.apache.qpid.server.security.A
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.AlreadyKnownDtxException;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
@@ -135,7 +137,6 @@ public class ServerSession extends Sessi
     private long _blockTime;
     private long _blockingTimeout;
 
-
     public static interface MessageDispositionChangeListener
     {
         public void onAccept();
@@ -168,6 +169,11 @@ public class ServerSession extends Sessi
 
     private AtomicReference<LogMessage> _forcedCloseLogMessage = new 
AtomicReference<LogMessage>();
 
+    private volatile long _uncommittedMessageSize;
+    private final List<StoredMessage<MessageMetaData_0_10>> 
_uncommittedMessages = new ArrayList<>();
+    private long _maxUncommittedInMemorySize;
+
+
     public ServerSession(Connection connection, SessionDelegate delegate, 
Binary name, long expiry)
     {
         super(connection, delegate, name, expiry);
@@ -188,6 +194,8 @@ public class ServerSession extends Sessi
 
         _blockingTimeout = 
((ServerConnection)connection).getBroker().getContextValue(Long.class,
                                                                   
Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
+        _maxUncommittedInMemorySize = 
getVirtualHost().getContextValue(Long.class, 
org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+
     }
 
     protected void setState(final State state)
@@ -258,9 +266,46 @@ public class ServerSession extends Sessi
                                     );
         getConnectionModel().registerMessageReceived(message.getSize(), 
message.getArrivalTime());
         incrementOutstandingTxnsIfNecessary();
+        incrementUncommittedMessageSize(message.getStoredMessage());
         return enqueues;
     }
 
+    private void resetUncommittedMessages()
+    {
+        _uncommittedMessageSize = 0l;
+        _uncommittedMessages.clear();
+    }
+
+    private void incrementUncommittedMessageSize(final 
StoredMessage<MessageMetaData_0_10> handle)
+    {
+        if (isTransactional() && !(_transaction instanceof 
DistributedTransaction))
+        {
+            _uncommittedMessageSize += handle.getMetaData().getContentSize();
+            if (_uncommittedMessageSize > getMaxUncommittedInMemorySize())
+            {
+                handle.flowToDisk();
+                if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize 
== handle.getMetaData().getContentSize())
+                {
+                    getVirtualHost().getEventLogger()
+                            .message(_logSubject, 
ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
+                }
+
+                if(!_uncommittedMessages.isEmpty())
+                {
+                    for (StoredMessage<MessageMetaData_0_10> uncommittedHandle 
: _uncommittedMessages)
+                    {
+                        uncommittedHandle.flowToDisk();
+                    }
+                    _uncommittedMessages.clear();
+                }
+            }
+            else
+            {
+                _uncommittedMessages.add(handle);
+            }
+        }
+    }
+
 
     public void sendMessage(MessageTransfer xfr,
                             Runnable postIdSettingAction)
@@ -620,6 +665,7 @@ public class ServerSession extends Sessi
         _txnCommits.incrementAndGet();
         _txnStarts.incrementAndGet();
         decrementOutstandingTxnsIfNecessary();
+        resetUncommittedMessages();
     }
 
     public void rollback()
@@ -629,6 +675,7 @@ public class ServerSession extends Sessi
         _txnRejects.incrementAndGet();
         _txnStarts.incrementAndGet();
         decrementOutstandingTxnsIfNecessary();
+        resetUncommittedMessages();
     }
 
 
@@ -707,7 +754,7 @@ public class ServerSession extends Sessi
         return getVirtualHost().getMessageStore();
     }
 
-    public VirtualHostImpl getVirtualHost()
+    public VirtualHostImpl<?,?,?> getVirtualHost()
     {
         return getConnection().getVirtualHost();
     }
@@ -1082,6 +1129,12 @@ public class ServerSession extends Sessi
         }
     }
 
+
+    public final long getMaxUncommittedInMemorySize()
+    {
+        return _maxUncommittedInMemorySize;
+    }
+
     @Override
     public int compareTo(AMQSessionModel o)
     {

Modified: 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1636617&r1=1636616&r2=1636617&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 Tue Nov  4 15:52:59 2014
@@ -82,6 +82,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfigurationChangeListener;
 import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.ExclusivityPolicy;
@@ -206,6 +207,9 @@ public class AMQChannel
     private long _blockingTimeout;
     private boolean _confirmOnPublish;
     private long _confirmedMessageCounter;
+    private volatile long _uncommittedMessageSize;
+    private final List<StoredMessage<MessageMetaData>> _uncommittedMessages = 
new ArrayList<>();
+    private long _maxUncommittedInMemorySize;
 
     public AMQChannel(AMQProtocolEngine connection, int channelId, final 
MessageStore messageStore)
     {
@@ -216,6 +220,7 @@ public class AMQChannel
                                
connection.getAuthorizedSubject().getPublicCredentials(),
                                
connection.getAuthorizedSubject().getPrivateCredentials());
         _subject.getPrincipals().add(new SessionPrincipal(this));
+        _maxUncommittedInMemorySize = 
connection.getVirtualHost().getContextValue(Long.class, 
Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
         _logSubject = new ChannelLogSubject(this);
 
         _messageStore = messageStore;
@@ -481,6 +486,7 @@ public class AMQChannel
                                         
.createBasicAckBody(_confirmedMessageCounter, false);
                                 
_connection.writeFrame(responseBody.generateFrame(_channelId));
                             }
+                            incrementUncommittedMessageSize(handle);
                             incrementOutstandingTxnsIfNecessary();
                         }
                     }
@@ -506,6 +512,36 @@ public class AMQChannel
 
     }
 
+    private void incrementUncommittedMessageSize(final 
StoredMessage<MessageMetaData> handle)
+    {
+        if (isTransactional())
+        {
+            _uncommittedMessageSize += handle.getMetaData().getContentSize();
+            if (_uncommittedMessageSize > getMaxUncommittedInMemorySize())
+            {
+                handle.flowToDisk();
+                if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize 
== handle.getMetaData().getContentSize())
+                {
+                    getVirtualHost().getEventLogger()
+                            .message(_logSubject, 
ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
+                }
+
+                if(!_uncommittedMessages.isEmpty())
+                {
+                    for (StoredMessage<MessageMetaData> uncommittedHandle : 
_uncommittedMessages)
+                    {
+                        uncommittedHandle.flowToDisk();
+                    }
+                    _uncommittedMessages.clear();
+                }
+            }
+            else
+            {
+                _uncommittedMessages.add(handle);
+            }
+        }
+    }
+
     /**
      * Either throws a {@link AMQConnectionException} or returns the message
      *
@@ -1182,6 +1218,13 @@ public class AMQChannel
             _txnStarts.incrementAndGet();
             decrementOutstandingTxnsIfNecessary();
         }
+        resetUncommittedMessages();
+    }
+
+    private void resetUncommittedMessages()
+    {
+        _uncommittedMessageSize = 0l;
+        _uncommittedMessages.clear();
     }
 
     public void rollback(Runnable postRollbackTask)
@@ -1209,6 +1252,7 @@ public class AMQChannel
             _txnRejects.incrementAndGet();
             _txnStarts.incrementAndGet();
             decrementOutstandingTxnsIfNecessary();
+            resetUncommittedMessages();
         }
 
         postRollbackTask.run();
@@ -1368,6 +1412,11 @@ public class AMQChannel
         return _currentMessage != null;
     }
 
+    public long getMaxUncommittedInMemorySize()
+    {
+        return _maxUncommittedInMemorySize;
+    }
+
     private class GetDeliveryMethod implements ClientDeliveryMethod
     {
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org

Reply via email to