Repository: activemq-artemis
Updated Branches:
  refs/heads/master 861c23155 -> 224d78062


ARTEMIS-1045 Performance improvements on AMQP


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/291a4719
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/291a4719
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/291a4719

Branch: refs/heads/master
Commit: 291a4719b6b114b1452a272fd13393262f736a05
Parents: 861c231
Author: Clebert Suconic <[email protected]>
Authored: Fri Mar 17 11:14:18 2017 -0400
Committer: Clebert Suconic <[email protected]>
Committed: Fri Mar 17 16:11:14 2017 -0400

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPMessage.java       | 68 +++++++++++---------
 .../amqp/proton/ProtonServerSenderContext.java  | 13 ----
 2 files changed, 39 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/291a4719/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 60aae4c..653ee5f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -72,6 +72,7 @@ public class AMQPMessage extends RefCountMessage {
    private DeliveryAnnotations _deliveryAnnotations;
    private MessageAnnotations _messageAnnotations;
    private Properties _properties;
+   private int appLocation = -1;
    private ApplicationProperties applicationProperties;
    private long scheduledTime = -1;
    private String connectionID;
@@ -93,7 +94,7 @@ public class AMQPMessage extends RefCountMessage {
 
    public AMQPMessage(long messageFormat, Message message) {
       this.messageFormat = messageFormat;
-      this.protonMessage = (MessageImpl)message;
+      this.protonMessage = (MessageImpl) message;
 
    }
 
@@ -124,7 +125,7 @@ public class AMQPMessage extends RefCountMessage {
             _deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
             _properties = new Properties();
             this.applicationProperties = new ApplicationProperties(new 
HashMap<>());
-            this.protonMessage = (MessageImpl)Message.Factory.create();
+            this.protonMessage = (MessageImpl) Message.Factory.create();
             this.protonMessage.setApplicationProperties(applicationProperties);
             this.protonMessage.setDeliveryAnnotations(_deliveryAnnotations);
          }
@@ -148,6 +149,20 @@ public class AMQPMessage extends RefCountMessage {
 
    private ApplicationProperties getApplicationProperties() {
       parseHeaders();
+
+      if (applicationProperties == null && appLocation >= 0) {
+         ByteBuffer buffer = getBuffer().nioBuffer();
+         buffer.position(appLocation);
+         TLSEncode.getDecoder().setByteBuffer(buffer);
+         Object section = TLSEncode.getDecoder().readObject();
+         if (section instanceof ApplicationProperties) {
+            this.applicationProperties = (ApplicationProperties) section;
+         }
+         this.appLocation = -1;
+         TLSEncode.getDecoder().setByteBuffer(null);
+
+      }
+
       return applicationProperties;
    }
 
@@ -161,6 +176,7 @@ public class AMQPMessage extends RefCountMessage {
          parsedHeaders = true;
       }
    }
+
    @Override
    public org.apache.activemq.artemis.api.core.Message setConnectionID(String 
connectionID) {
       this.connectionID = connectionID;
@@ -172,7 +188,6 @@ public class AMQPMessage extends RefCountMessage {
       return connectionID;
    }
 
-
    public MessageAnnotations getMessageAnnotations() {
       parseHeaders();
       return _messageAnnotations;
@@ -202,7 +217,6 @@ public class AMQPMessage extends RefCountMessage {
       return null;
    }
 
-
    private void setSymbol(String symbol, Object value) {
       setSymbol(Symbol.getSymbol(symbol), value);
    }
@@ -231,11 +245,9 @@ public class AMQPMessage extends RefCountMessage {
             return null;
       } */
 
-
       return null;
    }
 
-
    @Override
    public SimpleString getGroupID() {
       parseHeaders();
@@ -247,7 +259,6 @@ public class AMQPMessage extends RefCountMessage {
       }
    }
 
-
    @Override
    public Long getScheduledDeliveryTime() {
 
@@ -339,15 +350,19 @@ public class AMQPMessage extends RefCountMessage {
                this.expiration = _properties.getAbsoluteExpiryTime().getTime();
             }
 
-            if (buffer.hasRemaining()) {
-               section = (Section) decoder.readObject();
-            } else {
-               section = null;
-            }
+            // We don't read the next section on purpose, as we will parse ApplicationProperties
+            // lazily
+            section = null;
          }
 
          if (section instanceof ApplicationProperties) {
             applicationProperties = (ApplicationProperties) section;
+         } else {
+            if (buffer.hasRemaining()) {
+               this.appLocation = buffer.position();
+            } else {
+               this.appLocation = -1;
+            }
          }
       } finally {
          decoder.setByteBuffer(null);
@@ -446,13 +461,11 @@ public class AMQPMessage extends RefCountMessage {
       }
    }
 
-
    @Override
    public Object getDuplicateProperty() {
       return null;
    }
 
-
    @Override
    public org.apache.activemq.artemis.api.core.Message setDurable(boolean 
durable) {
       return null;
@@ -463,7 +476,7 @@ public class AMQPMessage extends RefCountMessage {
       if (address == null) {
          Properties properties = getProtonMessage().getProperties();
          if (properties != null) {
-            return  properties.getTo();
+            return properties.getTo();
          } else {
             return null;
          }
@@ -539,7 +552,7 @@ public class AMQPMessage extends RefCountMessage {
             header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount - 
1));
             TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buffer));
             TLSEncode.getEncoder().writeObject(header);
-            TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
+            TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
          }
       }
       buffer.writeBytes(data, sendFrom, data.writerIndex() - sendFrom);
@@ -676,27 +689,27 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public Boolean getBooleanProperty(String key) throws 
ActiveMQPropertyConversionException {
-      return (Boolean)getApplicationPropertiesMap().get(key);
+      return (Boolean) getApplicationPropertiesMap().get(key);
    }
 
    @Override
    public Byte getByteProperty(String key) throws 
ActiveMQPropertyConversionException {
-      return (Byte)getApplicationPropertiesMap().get(key);
+      return (Byte) getApplicationPropertiesMap().get(key);
    }
 
    @Override
    public Double getDoubleProperty(String key) throws 
ActiveMQPropertyConversionException {
-      return (Double)getApplicationPropertiesMap().get(key);
+      return (Double) getApplicationPropertiesMap().get(key);
    }
 
    @Override
    public Integer getIntProperty(String key) throws 
ActiveMQPropertyConversionException {
-      return (Integer)getApplicationPropertiesMap().get(key);
+      return (Integer) getApplicationPropertiesMap().get(key);
    }
 
    @Override
    public Long getLongProperty(String key) throws 
ActiveMQPropertyConversionException {
-      return (Long)getApplicationPropertiesMap().get(key);
+      return (Long) getApplicationPropertiesMap().get(key);
    }
 
    @Override
@@ -712,12 +725,12 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public Short getShortProperty(String key) throws 
ActiveMQPropertyConversionException {
-      return (Short)getApplicationPropertiesMap().get(key);
+      return (Short) getApplicationPropertiesMap().get(key);
    }
 
    @Override
    public Float getFloatProperty(String key) throws 
ActiveMQPropertyConversionException {
-      return (Float)getApplicationPropertiesMap().get(key);
+      return (Float) getApplicationPropertiesMap().get(key);
    }
 
    @Override
@@ -727,7 +740,7 @@ public class AMQPMessage extends RefCountMessage {
       } else if 
(key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) {
          return getConnectionID();
       } else {
-         return (String)getApplicationPropertiesMap().get(key);
+         return (String) getApplicationPropertiesMap().get(key);
       }
    }
 
@@ -747,7 +760,7 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public SimpleString getSimpleStringProperty(String key) throws 
ActiveMQPropertyConversionException {
-      return 
SimpleString.toSimpleString((String)getApplicationPropertiesMap().get(key));
+      return SimpleString.toSimpleString((String) 
getApplicationPropertiesMap().get(key));
    }
 
    @Override
@@ -842,8 +855,7 @@ public class AMQPMessage extends RefCountMessage {
    @Override
    public int getMemoryEstimate() {
       if (memoryEstimate == -1) {
-         memoryEstimate = memoryOffset +
-            (data != null ? data.capacity() : 0);
+         memoryEstimate = memoryOffset + (data != null ? data.capacity() : 0);
       }
 
       return memoryEstimate;
@@ -858,7 +870,6 @@ public class AMQPMessage extends RefCountMessage {
       }
    }
 
-
    @Override
    public SimpleString getReplyTo() {
       if (getProperties() != null) {
@@ -877,7 +888,6 @@ public class AMQPMessage extends RefCountMessage {
       return this;
    }
 
-
    @Override
    public int getPersistSize() {
       checkBuffer();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/291a4719/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 962110e..0e0447f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -42,7 +42,6 @@ import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFound
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import 
org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
-import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.selector.filter.FilterException;
 import org.apache.activemq.artemis.selector.impl.SelectorParser;
@@ -89,7 +88,6 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
    private boolean multicast;
    //todo get this from somewhere
    private RoutingType defaultRoutingType = RoutingType.ANYCAST;
-   protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0);
    private RoutingType routingTypeToUse = defaultRoutingType;
    private boolean shared = false;
    private boolean global = false;
@@ -110,7 +108,6 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
 
    @Override
    public void onFlow(int currentCredits, boolean drain) {
-      this.creditsSemaphore.setCredits(currentCredits);
       sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain);
    }
 
@@ -590,16 +587,6 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
          return 0;
       }
 
-      if (!creditsSemaphore.tryAcquire()) {
-         try {
-            creditsSemaphore.acquire();
-         } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            // nothing to be done here.. we just keep going
-            throw new IllegalStateException(e.getMessage(), e);
-         }
-      }
-
      // presettle means we can settle the message on the dealer side before we send it, i.e.
      // for browsers
       boolean preSettle = sender.getRemoteSenderSettleMode() == 
SenderSettleMode.SETTLED;

Reply via email to