This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new bc83b31 ARTEMIS-3133 Reduce memory usage for Null an Ping packets
new 4d934e3 This closes #3462
bc83b31 is described below
commit bc83b3123277ce3bc2d2a030ae55770497ec3b51
Author: franz1981 <[email protected]>
AuthorDate: Sun Feb 21 21:00:26 2021 +0100
ARTEMIS-3133 Reduce memory usage for Null an Ping packets
---
.../core/protocol/core/impl/PacketImpl.java | 1 -
.../core/impl/wireformat/NullResponseMessage.java | 10 +++
.../impl/wireformat/NullResponseMessage_V2.java | 16 ++++
.../core/protocol/core/impl/wireformat/Ping.java | 6 ++
.../protocol/core/ServerSessionPacketHandler.java | 86 +++++++++++++++++++---
.../protocol/core/impl/ActiveMQPacketHandler.java | 2 +-
6 files changed, 107 insertions(+), 14 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index aea8b39..e4a759b 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -470,5 +470,4 @@ public class PacketImpl implements Packet {
public void setCorrelationID(long correlationID) {
}
-
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage.java
index b979495..6a0b560 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage.java
@@ -28,4 +28,14 @@ public class NullResponseMessage extends PacketImpl {
public boolean isResponse() {
return true;
}
+
+ @Override
+ public int expectedEncodeSize() {
+ return PACKET_HEADERS_SIZE;
+ }
+
+ public void reset() {
+ size = 0;
+ channelID = 0;
+ }
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
index e3453af..16b6d19 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java
@@ -40,6 +40,11 @@ public class NullResponseMessage_V2 extends
NullResponseMessage {
}
@Override
+ public void setCorrelationID(long correlationID) {
+ this.correlationID = correlationID;
+ }
+
+ @Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
buffer.writeLong(correlationID);
@@ -54,6 +59,11 @@ public class NullResponseMessage_V2 extends
NullResponseMessage {
}
@Override
+ public int expectedEncodeSize() {
+ return super.expectedEncodeSize() + DataConstants.SIZE_LONG;
+ }
+
+ @Override
public final boolean isResponse() {
return true;
}
@@ -93,4 +103,10 @@ public class NullResponseMessage_V2 extends
NullResponseMessage {
}
return true;
}
+
+ @Override
+ public void reset() {
+ super.reset();
+ correlationID = 0;
+ }
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/Ping.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/Ping.java
index 2656294..ed6b3e2 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/Ping.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/Ping.java
@@ -18,6 +18,7 @@ package
org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.utils.DataConstants;
/**
* Ping is sent on the client side by {@link
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl}. At the
server's
@@ -57,6 +58,11 @@ public final class Ping extends PacketImpl {
}
@Override
+ public int expectedEncodeSize() {
+ return PACKET_HEADERS_SIZE + DataConstants.SIZE_LONG;
+ }
+
+ @Override
public String toString() {
StringBuffer buf = new StringBuffer(getParentString());
buf.append(", connectionTTL=" + connectionTTL);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index aea354a..ce996de 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -20,7 +20,9 @@ import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.List;
import java.util.Objects;
+import java.util.Queue;
+import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
@@ -35,7 +37,6 @@ import
org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManager;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
import
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage_V2;
@@ -144,6 +145,8 @@ import static
org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
public class ServerSessionPacketHandler implements ChannelHandler {
+ private static final int MAX_CACHED_NULL_RESPONSES = 32;
+
private static final Logger logger =
Logger.getLogger(ServerSessionPacketHandler.class);
private final ServerSession session;
@@ -158,8 +161,6 @@ public class ServerSessionPacketHandler implements
ChannelHandler {
private final ArtemisExecutor callExecutor;
- private final CoreProtocolManager manager;
-
// The current currentLargeMessage being processed
private volatile LargeServerMessage currentLargeMessage;
@@ -167,18 +168,18 @@ public class ServerSessionPacketHandler implements
ChannelHandler {
private final Object largeMessageLock = new Object();
+ private final Queue<NullResponseMessage> cachedNullRes;
+
+ private final Queue<NullResponseMessage_V2> cachedNullRes_V2;
+
public ServerSessionPacketHandler(final ActiveMQServer server,
- final CoreProtocolManager manager,
final ServerSession session,
- final StorageManager storageManager,
final Channel channel) {
- this.manager = manager;
-
this.session = session;
session.addCloseable((boolean failed) -> clearLargeMessage());
- this.storageManager = storageManager;
+ this.storageManager = server.getStorageManager();
this.channel = channel;
@@ -195,6 +196,16 @@ public class ServerSessionPacketHandler implements
ChannelHandler {
this.packetActor = new Actor<>(callExecutor, this::onMessagePacket);
this.direct = conn.isDirectDeliver();
+
+ // no confirmation window size means no resend cache hence
NullResponsePackets
+ // won't get cached on it because need confirmation
+ if (this.channel.getConfirmationWindowSize() == -1) {
+ cachedNullRes =
PlatformDependent.newFixedMpscQueue(MAX_CACHED_NULL_RESPONSES);
+ cachedNullRes_V2 =
PlatformDependent.newFixedMpscQueue(MAX_CACHED_NULL_RESPONSES);
+ } else {
+ cachedNullRes = null;
+ cachedNullRes_V2 = null;
+ }
}
private void clearLargeMessage() {
@@ -653,17 +664,51 @@ public class ServerSessionPacketHandler implements
ChannelHandler {
return RoutingType.MULTICAST;
}
+ private boolean requireNullResponseMessage_V1(Packet packet) {
+ return !packet.isResponseAsync() ||
channel.getConnection().isVersionBeforeAsyncResponseChange();
+ }
- private Packet createNullResponseMessage(Packet packet) {
- final Packet response;
- if (!packet.isResponseAsync() ||
channel.getConnection().isVersionBeforeAsyncResponseChange()) {
+ private NullResponseMessage createNullResponseMessage_V1(Packet packet) {
+ assert requireNullResponseMessage_V1(packet);
+ NullResponseMessage response;
+ if (cachedNullRes != null) {
+ response = cachedNullRes.poll();
+ if (response == null) {
+ response = new NullResponseMessage();
+ } else {
+ response.reset();
+ }
+ } else {
response = new NullResponseMessage();
+ }
+ return response;
+ }
+
+ private NullResponseMessage_V2 createNullResponseMessage_V2(Packet packet) {
+ assert !requireNullResponseMessage_V1(packet);
+ NullResponseMessage_V2 response;
+ if (cachedNullRes_V2 != null) {
+ response = cachedNullRes_V2.poll();
+ if (response == null) {
+ response = new NullResponseMessage_V2(packet.getCorrelationID());
+ } else {
+ response.reset();
+ // this should be already set by the channel too, but let's do it
just in case
+ response.setCorrelationID(packet.getCorrelationID());
+ }
} else {
response = new NullResponseMessage_V2(packet.getCorrelationID());
}
return response;
}
+ private Packet createNullResponseMessage(Packet packet) {
+ if (requireNullResponseMessage_V1(packet)) {
+ return createNullResponseMessage_V1(packet);
+ }
+ return createNullResponseMessage_V2(packet);
+ }
+
private Packet createSessionXAResponseMessage(Packet packet) {
Packet response;
if (packet.isResponseAsync()) {
@@ -674,6 +719,19 @@ public class ServerSessionPacketHandler implements
ChannelHandler {
return response;
}
+ private void releaseResponse(Packet packet) {
+ if (cachedNullRes == null || cachedNullRes_V2 == null) {
+ return;
+ }
+ if (packet instanceof NullResponseMessage) {
+ cachedNullRes.offer((NullResponseMessage) packet);
+ return;
+ }
+ if (packet instanceof NullResponseMessage_V2) {
+ cachedNullRes_V2.offer((NullResponseMessage_V2) packet);
+ }
+ }
+
private void onSessionAcknowledge(Packet packet) {
this.storageManager.setContext(session.getSessionContext());
try {
@@ -921,7 +979,11 @@ public class ServerSessionPacketHandler implements
ChannelHandler {
}
if (response != null) {
- channel.send(response);
+ try {
+ channel.send(response);
+ } finally {
+ releaseResponse(response);
+ }
}
if (closeChannel) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
index 2eb7bde..e90bcb2 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -173,7 +173,7 @@ public class ActiveMQPacketHandler implements
ChannelHandler {
ServerSession session = server.createSession(request.getName(),
activeMQPrincipal == null ? request.getUsername() :
activeMQPrincipal.getUserName(), activeMQPrincipal == null ?
request.getPassword() : activeMQPrincipal.getPassword(),
request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(),
request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(),
request.getDefaultAddress(), sessionCallback, true, sessionOperationContext,
routingTypeMap, protocolM [...]
ServerProducer serverProducer = new
ServerProducerImpl(session.getName(), "CORE", request.getDefaultAddress());
session.addProducer(serverProducer);
- ServerSessionPacketHandler handler = new
ServerSessionPacketHandler(server, protocolManager, session,
server.getStorageManager(), channel);
+ ServerSessionPacketHandler handler = new
ServerSessionPacketHandler(server, session, channel);
channel.setHandler(handler);
sessionCallback.setSessionHandler(handler);