Repository: activemq-artemis Updated Branches: refs/heads/master 62096f975 -> f282dff57
ARTEMIS-1110 Cleanup Transaction Coordinator buffer handling Resuse a single small buffer for all txn commands (declare / dischare) to avoid creating lots of small arrays and ByteBuffer wrappers for txn operations. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d2731fa0 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d2731fa0 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d2731fa0 Branch: refs/heads/master Commit: d2731fa0e1d66831ad95011ff00a76727bd7d5e8 Parents: 62096f9 Author: Timothy Bish <[email protected]> Authored: Tue Apr 11 11:16:26 2017 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue Apr 11 11:33:58 2017 -0400 ---------------------------------------------------------------------- .../transaction/ProtonTransactionHandler.java | 53 +++++++++++++------- .../protocol/amqp/util/DeliveryUtil.java | 30 ----------- 2 files changed, 35 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d2731fa0/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java index a3dae25..f817ed4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java @@ -16,11 +16,12 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton.transaction; +import java.nio.ByteBuffer; + import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler; -import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; @@ -32,6 +33,7 @@ import org.apache.qpid.proton.amqp.transaction.Discharge; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; import org.jboss.logging.Logger; @@ -48,6 +50,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { final AMQPSessionCallback sessionSPI; final AMQPConnectionContext connection; + private final ByteBuffer DECODE_BUFFER = ByteBuffer.allocate(64); + public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection) { this.sessionSPI = sessionSPI; this.connection = connection; @@ -65,7 +69,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { return; } - byte[] buffer; + ByteBuffer buffer; + MessageImpl msg; synchronized (connection.getLock()) { // Replenish coordinator receiver credit on exhaustion so sender can continue @@ -74,14 +79,22 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { receiver.flow(amqpCredit); } - buffer = new byte[delivery.available()]; - receiver.recv(buffer, 0, buffer.length); - receiver.advance(); - } + // Declare is generally 7 bytes and discharge is around 48 depending on the + // encoded size of the TXN ID. Decode buffer has a bit of extra space but if + // the incoming request is to big just use a scratch buffer. + if (delivery.available() > DECODE_BUFFER.capacity()) { + buffer = ByteBuffer.allocate(delivery.available()); + } else { + buffer = (ByteBuffer) DECODE_BUFFER.clear(); + } + // Update Buffer for the next incoming command. + buffer.limit(receiver.recv(buffer.array(), buffer.arrayOffset(), buffer.capacity())); + receiver.advance(); - MessageImpl msg = DeliveryUtil.decodeMessageImpl(buffer); + msg = decodeMessage(buffer); + } Object action = ((AmqpValue) msg.getBody()).getValue(); @@ -133,26 +146,30 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { } } - private Rejected createRejected(Symbol amqpError, String message) { - Rejected rejected = new Rejected(); - ErrorCondition condition = new ErrorCondition(); - condition.setCondition(amqpError); - condition.setDescription(message); - rejected.setError(condition); - return rejected; - } - @Override public void onFlow(int credits, boolean drain) { } @Override public void close(boolean linkRemoteClose) throws ActiveMQAMQPException { - // no op } @Override public void close(ErrorCondition condition) throws ActiveMQAMQPException { - // no op + } + + private Rejected createRejected(Symbol amqpError, String message) { + Rejected rejected = new Rejected(); + ErrorCondition condition = new ErrorCondition(); + condition.setCondition(amqpError); + condition.setDescription(message); + rejected.setError(condition); + return rejected; + } + + private MessageImpl decodeMessage(ByteBuffer encoded) { + MessageImpl message = (MessageImpl) Message.Factory.create(); + message.decode(encoded); + return message; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d2731fa0/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java deleted file mode 100644 index 4267b85..0000000 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.protocol.amqp.util; - -import org.apache.qpid.proton.message.Message; -import org.apache.qpid.proton.message.impl.MessageImpl; - -public class DeliveryUtil { - - public static MessageImpl decodeMessageImpl(byte[] data) { - MessageImpl message = (MessageImpl) Message.Factory.create(); - message.decode(data, 0, data.length); - return message; - } - -}
