Author: rgodfrey
Date: Tue Nov 4 15:52:59 2014
New Revision: 1636617
URL: http://svn.apache.org/r1636617
Log:
QPID-6207 : [Java Broker] Flow uncommitted messages to disk if combined size
greater than threshold
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java?rev=1636617&r1=1636616&r2=1636617&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/ChannelMessages.java
Tue Nov 4 15:52:59 2014
@@ -22,15 +22,14 @@ package org.apache.qpid.server.logging.m
import static
org.apache.qpid.server.logging.AbstractMessageLogger.DEFAULT_LOG_HIERARCHY_PREFIX;
-import java.text.MessageFormat;
-import java.util.Locale;
-import java.util.ResourceBundle;
-
import org.apache.log4j.Logger;
-
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.logging.LogMessage;
+import java.text.MessageFormat;
+import java.util.Locale;
+import java.util.ResourceBundle;
+
/**
* DO NOT EDIT DIRECTLY, THIS FILE WAS GENERATED.
*
@@ -51,6 +50,7 @@ public class ChannelMessages
public static final String CLOSE_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "channel.close";
public static final String PREFETCH_SIZE_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "channel.prefetch_size";
public static final String CLOSE_FORCED_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "channel.close_forced";
+ public static final String LARGE_TRANSACTION_WARN_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "channel.large_transaction_warn";
public static final String DEADLETTERMSG_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "channel.deadlettermsg";
public static final String DISCARDMSG_NOALTEXCH_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "channel.discardmsg_noaltexch";
public static final String IDLE_TXN_LOG_HIERARCHY =
DEFAULT_LOG_HIERARCHY_PREFIX + "channel.idle_txn";
@@ -68,6 +68,7 @@ public class ChannelMessages
Logger.getLogger(CLOSE_LOG_HIERARCHY);
Logger.getLogger(PREFETCH_SIZE_LOG_HIERARCHY);
Logger.getLogger(CLOSE_FORCED_LOG_HIERARCHY);
+ Logger.getLogger(LARGE_TRANSACTION_WARN_LOG_HIERARCHY);
Logger.getLogger(DEADLETTERMSG_LOG_HIERARCHY);
Logger.getLogger(DISCARDMSG_NOALTEXCH_LOG_HIERARCHY);
Logger.getLogger(IDLE_TXN_LOG_HIERARCHY);
@@ -263,6 +264,38 @@ public class ChannelMessages
/**
* Log a Channel message of the Format:
+ * <pre>CHN-1013 : Uncommitted transaction contains {0,number} bytes of
incoming message data.</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage LARGE_TRANSACTION_WARN(Number param1)
+ {
+ String rawMessage = _messages.getString("LARGE_TRANSACTION_WARN");
+
+ final Object[] messageArguments = {param1};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage,
_currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return LARGE_TRANSACTION_WARN_LOG_HIERARCHY;
+ }
+ };
+ }
+
+ /**
+ * Log a Channel message of the Format:
* <pre>CHN-1011 : Message : {0,number} moved to dead letter queue :
{1}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties?rev=1636617&r1=1636616&r2=1636617&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
(original)
+++
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Channel_logmessages.properties
Tue Nov 4 15:52:59 2014
@@ -40,3 +40,6 @@ DISCARDMSG_NOROUTE = CHN-1010 : Discarde
DEADLETTERMSG = CHN-1011 : Message : {0,number} moved to dead letter queue :
{1}
FLOW_CONTROL_IGNORED = CHN-1012 : Flow Control Ignored. Channel will be closed.
+
+LARGE_TRANSACTION_WARN = CHN-1013 : Uncommitted transaction contains
{0,number} bytes of incoming message data.
+
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java?rev=1636617&r1=1636616&r2=1636617&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
(original)
+++
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
Tue Nov 4 15:52:59 2014
@@ -44,6 +44,11 @@ public interface Connection<X extends Co
String PORT = "port";
+ String MAX_UNCOMMITTED_IN_MEMORY_SIZE =
"connection.maxUncommittedInMemorySize";
+
+ @ManagedContextDefault(name = MAX_UNCOMMITTED_IN_MEMORY_SIZE)
+ long DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE = 10l * 1024l * 1024l;
+
@DerivedAttribute
String getClientId();
Modified:
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java?rev=1636617&r1=1636616&r2=1636617&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
(original)
+++
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
Tue Nov 4 15:52:59 2014
@@ -27,6 +27,7 @@ import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.text.MessageFormat;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -74,6 +75,7 @@ import org.apache.qpid.server.security.A
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
@@ -135,7 +137,6 @@ public class ServerSession extends Sessi
private long _blockTime;
private long _blockingTimeout;
-
public static interface MessageDispositionChangeListener
{
public void onAccept();
@@ -168,6 +169,11 @@ public class ServerSession extends Sessi
private AtomicReference<LogMessage> _forcedCloseLogMessage = new
AtomicReference<LogMessage>();
+ private volatile long _uncommittedMessageSize;
+ private final List<StoredMessage<MessageMetaData_0_10>>
_uncommittedMessages = new ArrayList<>();
+ private long _maxUncommittedInMemorySize;
+
+
public ServerSession(Connection connection, SessionDelegate delegate,
Binary name, long expiry)
{
super(connection, delegate, name, expiry);
@@ -188,6 +194,8 @@ public class ServerSession extends Sessi
_blockingTimeout =
((ServerConnection)connection).getBroker().getContextValue(Long.class,
Broker.CHANNEL_FLOW_CONTROL_ENFORCEMENT_TIMEOUT);
+ _maxUncommittedInMemorySize =
getVirtualHost().getContextValue(Long.class,
org.apache.qpid.server.model.Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+
}
protected void setState(final State state)
@@ -258,9 +266,46 @@ public class ServerSession extends Sessi
);
getConnectionModel().registerMessageReceived(message.getSize(),
message.getArrivalTime());
incrementOutstandingTxnsIfNecessary();
+ incrementUncommittedMessageSize(message.getStoredMessage());
return enqueues;
}
+ private void resetUncommittedMessages()
+ {
+ _uncommittedMessageSize = 0l;
+ _uncommittedMessages.clear();
+ }
+
+ private void incrementUncommittedMessageSize(final
StoredMessage<MessageMetaData_0_10> handle)
+ {
+ if (isTransactional() && !(_transaction instanceof
DistributedTransaction))
+ {
+ _uncommittedMessageSize += handle.getMetaData().getContentSize();
+ if (_uncommittedMessageSize > getMaxUncommittedInMemorySize())
+ {
+ handle.flowToDisk();
+ if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize
== handle.getMetaData().getContentSize())
+ {
+ getVirtualHost().getEventLogger()
+ .message(_logSubject,
ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
+ }
+
+ if(!_uncommittedMessages.isEmpty())
+ {
+ for (StoredMessage<MessageMetaData_0_10> uncommittedHandle
: _uncommittedMessages)
+ {
+ uncommittedHandle.flowToDisk();
+ }
+ _uncommittedMessages.clear();
+ }
+ }
+ else
+ {
+ _uncommittedMessages.add(handle);
+ }
+ }
+ }
+
public void sendMessage(MessageTransfer xfr,
Runnable postIdSettingAction)
@@ -620,6 +665,7 @@ public class ServerSession extends Sessi
_txnCommits.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
+ resetUncommittedMessages();
}
public void rollback()
@@ -629,6 +675,7 @@ public class ServerSession extends Sessi
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
+ resetUncommittedMessages();
}
@@ -707,7 +754,7 @@ public class ServerSession extends Sessi
return getVirtualHost().getMessageStore();
}
- public VirtualHostImpl getVirtualHost()
+ public VirtualHostImpl<?,?,?> getVirtualHost()
{
return getConnection().getVirtualHost();
}
@@ -1082,6 +1129,12 @@ public class ServerSession extends Sessi
}
}
+
+ public final long getMaxUncommittedInMemorySize()
+ {
+ return _maxUncommittedInMemorySize;
+ }
+
@Override
public int compareTo(AMQSessionModel o)
{
Modified:
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1636617&r1=1636616&r2=1636617&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
(original)
+++
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Tue Nov 4 15:52:59 2014
@@ -82,6 +82,7 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.ExclusivityPolicy;
@@ -206,6 +207,9 @@ public class AMQChannel
private long _blockingTimeout;
private boolean _confirmOnPublish;
private long _confirmedMessageCounter;
+ private volatile long _uncommittedMessageSize;
+ private final List<StoredMessage<MessageMetaData>> _uncommittedMessages =
new ArrayList<>();
+ private long _maxUncommittedInMemorySize;
public AMQChannel(AMQProtocolEngine connection, int channelId, final
MessageStore messageStore)
{
@@ -216,6 +220,7 @@ public class AMQChannel
connection.getAuthorizedSubject().getPublicCredentials(),
connection.getAuthorizedSubject().getPrivateCredentials());
_subject.getPrincipals().add(new SessionPrincipal(this));
+ _maxUncommittedInMemorySize =
connection.getVirtualHost().getContextValue(Long.class,
Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
_logSubject = new ChannelLogSubject(this);
_messageStore = messageStore;
@@ -481,6 +486,7 @@ public class AMQChannel
.createBasicAckBody(_confirmedMessageCounter, false);
_connection.writeFrame(responseBody.generateFrame(_channelId));
}
+ incrementUncommittedMessageSize(handle);
incrementOutstandingTxnsIfNecessary();
}
}
@@ -506,6 +512,36 @@ public class AMQChannel
}
+ private void incrementUncommittedMessageSize(final
StoredMessage<MessageMetaData> handle)
+ {
+ if (isTransactional())
+ {
+ _uncommittedMessageSize += handle.getMetaData().getContentSize();
+ if (_uncommittedMessageSize > getMaxUncommittedInMemorySize())
+ {
+ handle.flowToDisk();
+ if(!_uncommittedMessages.isEmpty() || _uncommittedMessageSize
== handle.getMetaData().getContentSize())
+ {
+ getVirtualHost().getEventLogger()
+ .message(_logSubject,
ChannelMessages.LARGE_TRANSACTION_WARN(_uncommittedMessageSize));
+ }
+
+ if(!_uncommittedMessages.isEmpty())
+ {
+ for (StoredMessage<MessageMetaData> uncommittedHandle :
_uncommittedMessages)
+ {
+ uncommittedHandle.flowToDisk();
+ }
+ _uncommittedMessages.clear();
+ }
+ }
+ else
+ {
+ _uncommittedMessages.add(handle);
+ }
+ }
+ }
+
/**
* Either throws a {@link AMQConnectionException} or returns the message
*
@@ -1182,6 +1218,13 @@ public class AMQChannel
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
}
+ resetUncommittedMessages();
+ }
+
+ private void resetUncommittedMessages()
+ {
+ _uncommittedMessageSize = 0l;
+ _uncommittedMessages.clear();
}
public void rollback(Runnable postRollbackTask)
@@ -1209,6 +1252,7 @@ public class AMQChannel
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
+ resetUncommittedMessages();
}
postRollbackTask.run();
@@ -1368,6 +1412,11 @@ public class AMQChannel
return _currentMessage != null;
}
+ public long getMaxUncommittedInMemorySize()
+ {
+ return _maxUncommittedInMemorySize;
+ }
+
private class GetDeliveryMethod implements ClientDeliveryMethod
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]