This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new bc83b31  ARTEMIS-3133 Reduce memory usage for Null and Ping packets
     new 4d934e3  This closes #3462
bc83b31 is described below

commit bc83b3123277ce3bc2d2a030ae55770497ec3b51
Author: franz1981 <[email protected]>
AuthorDate: Sun Feb 21 21:00:26 2021 +0100

    ARTEMIS-3133 Reduce memory usage for Null and Ping packets
---
 .../core/protocol/core/impl/PacketImpl.java        |  1 -
 .../core/impl/wireformat/NullResponseMessage.java  | 10 +++
 .../impl/wireformat/NullResponseMessage_V2.java    | 16 ++++
 .../core/protocol/core/impl/wireformat/Ping.java   |  6 ++
 .../protocol/core/ServerSessionPacketHandler.java  | 86 +++++++++++++++++++---
 .../protocol/core/impl/ActiveMQPacketHandler.java  |  2 +-
 6 files changed, 107 insertions(+), 14 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index aea8b39..e4a759b 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -470,5 +470,4 @@ public class PacketImpl implements Packet {
    public void setCorrelationID(long correlationID) {
    }
 
-
 }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage.java
index b979495..6a0b560 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage.java
@@ -28,4 +28,14 @@ public class NullResponseMessage extends PacketImpl {
    public boolean isResponse() {
       return true;
    }
+
+   @Override
+   public int expectedEncodeSize() {
+      return PACKET_HEADERS_SIZE;
+   }
+
+   public void reset() {
+      size = 0;
+      channelID = 0;
+   }
 }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
index e3453af..16b6d19 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
@@ -40,6 +40,11 @@ public class NullResponseMessage_V2 extends 
NullResponseMessage {
    }
 
    @Override
+   public void setCorrelationID(long correlationID) {
+      this.correlationID = correlationID;
+   }
+
+   @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       super.encodeRest(buffer);
       buffer.writeLong(correlationID);
@@ -54,6 +59,11 @@ public class NullResponseMessage_V2 extends 
NullResponseMessage {
    }
 
    @Override
+   public int expectedEncodeSize() {
+      return super.expectedEncodeSize() + DataConstants.SIZE_LONG;
+   }
+
+   @Override
    public final boolean isResponse() {
       return true;
    }
@@ -93,4 +103,10 @@ public class NullResponseMessage_V2 extends 
NullResponseMessage {
       }
       return true;
    }
+
+   @Override
+   public void reset() {
+      super.reset();
+      correlationID = 0;
+   }
 }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/Ping.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/Ping.java
index 2656294..ed6b3e2 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/Ping.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/Ping.java
@@ -18,6 +18,7 @@ package 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
 
 /**
  * Ping is sent on the client side by {@link 
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl}. At the 
server's
@@ -57,6 +58,11 @@ public final class Ping extends PacketImpl {
    }
 
    @Override
+   public int expectedEncodeSize() {
+      return PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG;
+   }
+
+   @Override
    public String toString() {
       StringBuffer buf = new StringBuffer(getParentString());
       buf.append(", connectionTTL=" + connectionTTL);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index aea354a..ce996de 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -20,7 +20,9 @@ import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.util.List;
 import java.util.Objects;
+import java.util.Queue;
 
+import io.netty.util.internal.PlatformDependent;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
@@ -35,7 +37,6 @@ import 
org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager;
 import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
@@ -144,6 +145,8 @@ import static 
org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
 
 public class ServerSessionPacketHandler implements ChannelHandler {
 
+   private static final int MAX_CACHED_NULL_RESPONSES = 32;
+
    private static final Logger logger = 
Logger.getLogger(ServerSessionPacketHandler.class);
 
    private final ServerSession session;
@@ -158,8 +161,6 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
 
    private final ArtemisExecutor callExecutor;
 
-   private final CoreProtocolManager manager;
-
    // The current currentLargeMessage being processed
    private volatile LargeServerMessage currentLargeMessage;
 
@@ -167,18 +168,18 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
 
    private final Object largeMessageLock = new Object();
 
+   private final Queue<NullResponseMessage> cachedNullRes;
+
+   private final Queue<NullResponseMessage_V2> cachedNullRes_V2;
+
    public ServerSessionPacketHandler(final ActiveMQServer server,
-                                     final CoreProtocolManager manager,
                                      final ServerSession session,
-                                     final StorageManager storageManager,
                                      final Channel channel) {
-      this.manager = manager;
-
       this.session = session;
 
       session.addCloseable((boolean failed) -> clearLargeMessage());
 
-      this.storageManager = storageManager;
+      this.storageManager = server.getStorageManager();
 
       this.channel = channel;
 
@@ -195,6 +196,16 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
       this.packetActor = new Actor<>(callExecutor, this::onMessagePacket);
 
       this.direct = conn.isDirectDeliver();
+
+      // A confirmation window size of -1 means the channel has no resend cache,
+      // so sent NullResponseMessages are not retained there and can be reused.
+      if (this.channel.getConfirmationWindowSize() == -1) {
+         cachedNullRes = 
PlatformDependent.newFixedMpscQueue(MAX_CACHED_NULL_RESPONSES);
+         cachedNullRes_V2 = 
PlatformDependent.newFixedMpscQueue(MAX_CACHED_NULL_RESPONSES);
+      } else {
+         cachedNullRes = null;
+         cachedNullRes_V2 = null;
+      }
    }
 
    private void clearLargeMessage() {
@@ -653,17 +664,51 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
       return RoutingType.MULTICAST;
    }
 
+   private boolean requireNullResponseMessage_V1(Packet packet) {
+      return !packet.isResponseAsync() || 
channel.getConnection().isVersionBeforeAsyncResponseChange();
+   }
 
-   private Packet createNullResponseMessage(Packet packet) {
-      final Packet response;
-      if (!packet.isResponseAsync() || 
channel.getConnection().isVersionBeforeAsyncResponseChange()) {
+   private NullResponseMessage createNullResponseMessage_V1(Packet packet) {
+      assert requireNullResponseMessage_V1(packet);
+      NullResponseMessage response;
+      if (cachedNullRes != null) {
+         response = cachedNullRes.poll();
+         if (response == null) {
+            response = new NullResponseMessage();
+         } else {
+            response.reset();
+         }
+      } else {
          response = new NullResponseMessage();
+      }
+      return response;
+   }
+
+   private NullResponseMessage_V2 createNullResponseMessage_V2(Packet packet) {
+      assert !requireNullResponseMessage_V1(packet);
+      NullResponseMessage_V2 response;
+      if (cachedNullRes_V2 != null) {
+         response = cachedNullRes_V2.poll();
+         if (response == null) {
+            response = new NullResponseMessage_V2(packet.getCorrelationID());
+         } else {
+            response.reset();
+            // this should be already set by the channel too, but let's do it 
just in case
+            response.setCorrelationID(packet.getCorrelationID());
+         }
       } else {
          response = new NullResponseMessage_V2(packet.getCorrelationID());
       }
       return response;
    }
 
+   private Packet createNullResponseMessage(Packet packet) {
+      if (requireNullResponseMessage_V1(packet)) {
+         return createNullResponseMessage_V1(packet);
+      }
+      return createNullResponseMessage_V2(packet);
+   }
+
    private Packet createSessionXAResponseMessage(Packet packet) {
       Packet response;
       if (packet.isResponseAsync()) {
@@ -674,6 +719,19 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
       return response;
    }
 
+   // Hands a sent null response back to its cache for reuse; no-op when caching is disabled.
+   private void releaseResponse(Packet packet) {
+      if (cachedNullRes == null || cachedNullRes_V2 == null) {
+         return;
+      }
+      // V2 extends NullResponseMessage: test the subclass first or it lands in the V1 cache
+      if (packet instanceof NullResponseMessage_V2) {
+         cachedNullRes_V2.offer((NullResponseMessage_V2) packet);
+      } else if (packet instanceof NullResponseMessage) {
+         cachedNullRes.offer((NullResponseMessage) packet);
+      }
+   }
+
    private void onSessionAcknowledge(Packet packet) {
       this.storageManager.setContext(session.getSessionContext());
       try {
@@ -921,7 +979,11 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
       }
 
       if (response != null) {
-         channel.send(response);
+         try {
+            channel.send(response);
+         } finally {
+            releaseResponse(response);
+         }
       }
 
       if (closeChannel) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index 2eb7bde..e90bcb2 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -173,7 +173,7 @@ public class ActiveMQPacketHandler implements 
ChannelHandler {
          ServerSession session = server.createSession(request.getName(), 
activeMQPrincipal == null ? request.getUsername() : 
activeMQPrincipal.getUserName(), activeMQPrincipal == null ? 
request.getPassword() : activeMQPrincipal.getPassword(), 
request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), 
request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), 
request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, 
routingTypeMap, protocolM [...]
          ServerProducer serverProducer = new 
ServerProducerImpl(session.getName(), "CORE", request.getDefaultAddress());
          session.addProducer(serverProducer);
-         ServerSessionPacketHandler handler = new 
ServerSessionPacketHandler(server, protocolManager, session, 
server.getStorageManager(), channel);
+         ServerSessionPacketHandler handler = new 
ServerSessionPacketHandler(server, session, channel);
          channel.setHandler(handler);
          sessionCallback.setSessionHandler(handler);
 

Reply via email to