This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 05c55d3 ARTEMIS-3133 Just Encapsulating ObjectPool into a small
utility
new d0cbd08 This closes #3475
05c55d3 is described below
commit 05c55d382c3310cb8f98593889074e6f194c3d44
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Mar 3 15:46:14 2021 -0500
ARTEMIS-3133 Just Encapsulating ObjectPool into a small utility
---
.../activemq/artemis/utils/pools/MpscPool.java | 51 +++++++++++++++++
.../apache/activemq/artemis/utils/pools/Pool.java | 66 ++++++++++++++++++++++
.../protocol/core/ServerSessionPacketHandler.java | 53 +++++------------
3 files changed, 132 insertions(+), 38 deletions(-)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/MpscPool.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/MpscPool.java
new file mode 100644
index 0000000..86e0163
--- /dev/null
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/MpscPool.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.pools;
+
+import java.util.Queue;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import io.netty.util.internal.PlatformDependent;
+
+
+/**
+ * A simple encapsulation of Netty MpscQueue to provide a pool of objects.
+ * Use this pool only when the borrowing of object (consume) is done on a
single thread.
+ * This is using a Multi Producer Single Consumer queue (MPSC).
+ * If you need other uses you may create different strategies for
ObjectPooling.
+ * @param <T>
+ */
+public class MpscPool<T> extends Pool<T> {
+
+ public MpscPool(int maxSize, Consumer<T> cleaner, Supplier<T> supplier) {
+ super(maxSize, cleaner, supplier);
+ }
+
+ @Override
+ protected Queue<T> createQueue(int maxSize) {
+ final Queue<T> internalPool;
+ if (maxSize > 0) {
+ internalPool = PlatformDependent.newFixedMpscQueue(maxSize);
+ } else {
+ internalPool = null;
+ }
+ return internalPool;
+ }
+
+}
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/Pool.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/Pool.java
new file mode 100644
index 0000000..8f9340e
--- /dev/null
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/Pool.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.pools;
+
+import java.util.Queue;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * A simple encapsulation to provide a pool of objects.
+ * @param <T>
+ */
+public abstract class Pool<T> {
+
+ private final Queue<T> internalPool;
+
+ private final Consumer<T> cleaner;
+ private final Supplier<T> supplier;
+
+ public Pool(int maxSize, Consumer<T> cleaner, Supplier<T> supplier) {
+ internalPool = createQueue(maxSize);
+ this.cleaner = cleaner;
+ this.supplier = supplier;
+ }
+
+ abstract Queue<T> createQueue(int maxSize);
+
+ /** Use this to instantiate or return objects from the pool */
+ public final T borrow() {
+ if (internalPool == null) {
+ return supplier.get();
+ }
+
+ T returnObject = internalPool.poll();
+
+ if (returnObject == null) {
+ returnObject = supplier.get();
+ } else {
+ cleaner.accept(returnObject);
+ }
+
+ return returnObject;
+ }
+
+ /** Return objects to the pool, they will be either reused or ignored by
the max size */
+ public final void release(T object) {
+ if (internalPool != null) {
+ internalPool.offer(object);
+ }
+ }
+}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index ce996de..5ec11c5 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -20,9 +20,7 @@ import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.List;
import java.util.Objects;
-import java.util.Queue;
-import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
@@ -99,6 +97,8 @@ import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.pools.MpscPool;
+import org.apache.activemq.artemis.utils.pools.Pool;
import org.apache.activemq.artemis.utils.SimpleFuture;
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.apache.activemq.artemis.utils.actors.Actor;
@@ -168,9 +168,9 @@ public class ServerSessionPacketHandler implements
ChannelHandler {
private final Object largeMessageLock = new Object();
- private final Queue<NullResponseMessage> cachedNullRes;
+ private final Pool<NullResponseMessage> poolNullResponse;
- private final Queue<NullResponseMessage_V2> cachedNullRes_V2;
+ private final Pool<NullResponseMessage_V2> poolNullResponseV2;
public ServerSessionPacketHandler(final ActiveMQServer server,
final ServerSession session,
@@ -199,13 +199,8 @@ public class ServerSessionPacketHandler implements
ChannelHandler {
// no confirmation window size means no resend cache hence
NullResponsePackets
// won't get cached on it because need confirmation
- if (this.channel.getConfirmationWindowSize() == -1) {
- cachedNullRes =
PlatformDependent.newFixedMpscQueue(MAX_CACHED_NULL_RESPONSES);
- cachedNullRes_V2 =
PlatformDependent.newFixedMpscQueue(MAX_CACHED_NULL_RESPONSES);
- } else {
- cachedNullRes = null;
- cachedNullRes_V2 = null;
- }
+ poolNullResponse = new
MpscPool<>(this.channel.getConfirmationWindowSize() == -1 ?
MAX_CACHED_NULL_RESPONSES : 0, NullResponseMessage::reset, () -> new
NullResponseMessage());
+ poolNullResponseV2 = new
MpscPool<>(this.channel.getConfirmationWindowSize() == -1 ?
MAX_CACHED_NULL_RESPONSES : 0, NullResponseMessage_V2::reset, () -> new
NullResponseMessage_V2());
}
private void clearLargeMessage() {
@@ -670,35 +665,17 @@ public class ServerSessionPacketHandler implements
ChannelHandler {
private NullResponseMessage createNullResponseMessage_V1(Packet packet) {
assert requireNullResponseMessage_V1(packet);
- NullResponseMessage response;
- if (cachedNullRes != null) {
- response = cachedNullRes.poll();
- if (response == null) {
- response = new NullResponseMessage();
- } else {
- response.reset();
- }
- } else {
- response = new NullResponseMessage();
- }
- return response;
+ return poolNullResponse.borrow();
}
private NullResponseMessage_V2 createNullResponseMessage_V2(Packet packet) {
assert !requireNullResponseMessage_V1(packet);
NullResponseMessage_V2 response;
- if (cachedNullRes_V2 != null) {
- response = cachedNullRes_V2.poll();
- if (response == null) {
- response = new NullResponseMessage_V2(packet.getCorrelationID());
- } else {
- response.reset();
- // this should be already set by the channel too, but let's do it
just in case
- response.setCorrelationID(packet.getCorrelationID());
- }
- } else {
- response = new NullResponseMessage_V2(packet.getCorrelationID());
- }
+ response = poolNullResponseV2.borrow();
+
+ // this should be already set by the channel too, but let's do it just
in case
+ response.setCorrelationID(packet.getCorrelationID());
+
return response;
}
@@ -720,15 +697,15 @@ public class ServerSessionPacketHandler implements
ChannelHandler {
}
private void releaseResponse(Packet packet) {
- if (cachedNullRes == null || cachedNullRes_V2 == null) {
+ if (poolNullResponse == null || poolNullResponseV2 == null) {
return;
}
if (packet instanceof NullResponseMessage) {
- cachedNullRes.offer((NullResponseMessage) packet);
+ poolNullResponse.release((NullResponseMessage) packet);
return;
}
if (packet instanceof NullResponseMessage_V2) {
- cachedNullRes_V2.offer((NullResponseMessage_V2) packet);
+ poolNullResponseV2.release((NullResponseMessage_V2) packet);
}
}