This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 05c55d3  ARTEMIS-3133 Just Encapsulating ObjectPool into a small utility
     new d0cbd08  This closes #3475
05c55d3 is described below

commit 05c55d382c3310cb8f98593889074e6f194c3d44
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Mar 3 15:46:14 2021 -0500

    ARTEMIS-3133 Just Encapsulating ObjectPool into a small utility
---
 .../activemq/artemis/utils/pools/MpscPool.java     | 51 +++++++++++++++++
 .../apache/activemq/artemis/utils/pools/Pool.java  | 66 ++++++++++++++++++++++
 .../protocol/core/ServerSessionPacketHandler.java  | 53 +++++------------
 3 files changed, 132 insertions(+), 38 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/MpscPool.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/MpscPool.java
new file mode 100644
index 0000000..86e0163
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/MpscPool.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.pools;
+
+import java.util.Queue;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import io.netty.util.internal.PlatformDependent;
+
+
+/**
+ * A simple encapsulation of Netty MpscQueue to provide a pool of objects.
+ * Use this pool only when the borrowing of objects (the consuming side of the queue) is done on a single thread.
+ * The backing queue is a Multi Producer Single Consumer (MPSC) queue, created by {@code createQueue(int)}.
+ * If you need other uses you may create different strategies for object pooling by extending {@code Pool}.
+ * @param <T> the type of the objects held by the pool
+ */
+public class MpscPool<T> extends Pool<T> {
+
+   public MpscPool(int maxSize, Consumer<T> cleaner, Supplier<T> supplier) {
+      super(maxSize, cleaner, supplier); // all three arguments are handed to Pool unchanged
+   }
+
+   @Override
+   protected Queue<T> createQueue(int maxSize) {
+      final Queue<T> internalPool;
+      if (maxSize > 0) {
+         internalPool = PlatformDependent.newFixedMpscQueue(maxSize); // Netty MPSC queue sized from maxSize
+      } else {
+         internalPool = null; // maxSize <= 0: no queue at all; Pool checks for null and then skips pooling
+      }
+      return internalPool;
+   }
+
+}
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/Pool.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/Pool.java
new file mode 100644
index 0000000..8f9340e
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/pools/Pool.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.pools;
+
+import java.util.Queue;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * A simple encapsulation to provide a pool of objects, stored in the queue returned by {@code createQueue(int)}.
+ * @param <T> the type of the objects held by the pool
+ */
+public abstract class Pool<T> {
+
+   private final Queue<T> internalPool; // null when createQueue returned null, in which case nothing is pooled
+
+   private final Consumer<T> cleaner; // applied to a pooled object right before it is handed out again
+   private final Supplier<T> supplier; // creates a new object whenever the pool has none to hand out
+
+   public Pool(int maxSize, Consumer<T> cleaner, Supplier<T> supplier) {
+      internalPool = createQueue(maxSize); // NOTE(review): overridable call from a constructor; it runs before subclass field initialisers and constructor bodies
+      this.cleaner = cleaner;
+      this.supplier = supplier;
+   }
+
+   abstract Queue<T> createQueue(int maxSize); // may return null to disable pooling; NOTE(review): package-private, so only same-package subclasses can implement it
+
+   /** Returns a pooled object after running the cleaner on it, or a new one from the supplier if the pool is empty or disabled. */
+   public final T borrow() {
+      if (internalPool == null) {
+         return supplier.get(); // no queue: every borrow creates a new object
+      }
+
+      T returnObject = internalPool.poll();
+
+      if (returnObject == null) {
+         returnObject = supplier.get(); // queue had nothing to reuse
+      } else {
+         cleaner.accept(returnObject); // reused objects are cleaned here, on borrow, not on release
+      }
+
+      return returnObject;
+   }
+
+   /** Hands an object back to the pool; the result of offer() is ignored, so an object the queue rejects is simply dropped. */
+   public final void release(T object) {
+      if (internalPool != null) {
+         internalPool.offer(object);
+      }
+   }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index ce996de..5ec11c5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -20,9 +20,7 @@ import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.util.List;
 import java.util.Objects;
-import java.util.Queue;
 
-import io.netty.util.internal.PlatformDependent;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
@@ -99,6 +97,8 @@ import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.logs.AuditLogger;
 import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.pools.MpscPool;
+import org.apache.activemq.artemis.utils.pools.Pool;
 import org.apache.activemq.artemis.utils.SimpleFuture;
 import org.apache.activemq.artemis.utils.SimpleFutureImpl;
 import org.apache.activemq.artemis.utils.actors.Actor;
@@ -168,9 +168,9 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
    private final Object largeMessageLock = new Object();
 
-   private final Queue<NullResponseMessage> cachedNullRes;
+   private final Pool<NullResponseMessage> poolNullResponse;
 
-   private final Queue<NullResponseMessage_V2> cachedNullRes_V2;
+   private final Pool<NullResponseMessage_V2> poolNullResponseV2;
 
    public ServerSessionPacketHandler(final ActiveMQServer server,
                                      final ServerSession session,
@@ -199,13 +199,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
       // no confirmation window size means no resend cache hence 
NullResponsePackets
       // won't get cached on it because need confirmation
-      if (this.channel.getConfirmationWindowSize() == -1) {
-         cachedNullRes = PlatformDependent.newFixedMpscQueue(MAX_CACHED_NULL_RESPONSES);
-         cachedNullRes_V2 = PlatformDependent.newFixedMpscQueue(MAX_CACHED_NULL_RESPONSES);
-      } else {
-         cachedNullRes = null;
-         cachedNullRes_V2 = null;
-      }
+      poolNullResponse = new MpscPool<>(this.channel.getConfirmationWindowSize() == -1 ? MAX_CACHED_NULL_RESPONSES : 0, NullResponseMessage::reset, () -> new NullResponseMessage());
+      poolNullResponseV2 = new MpscPool<>(this.channel.getConfirmationWindowSize() == -1 ? MAX_CACHED_NULL_RESPONSES : 0, NullResponseMessage_V2::reset, () -> new NullResponseMessage_V2());
    }
 
    private void clearLargeMessage() {
@@ -670,35 +665,17 @@ public class ServerSessionPacketHandler implements ChannelHandler {
 
    private NullResponseMessage createNullResponseMessage_V1(Packet packet) {
       assert requireNullResponseMessage_V1(packet);
-      NullResponseMessage response;
-      if (cachedNullRes != null) {
-         response = cachedNullRes.poll();
-         if (response == null) {
-            response = new NullResponseMessage();
-         } else {
-            response.reset();
-         }
-      } else {
-         response = new NullResponseMessage();
-      }
-      return response;
+      return  poolNullResponse.borrow();
    }
 
    private NullResponseMessage_V2 createNullResponseMessage_V2(Packet packet) {
       assert !requireNullResponseMessage_V1(packet);
       NullResponseMessage_V2 response;
-      if (cachedNullRes_V2 != null) {
-         response = cachedNullRes_V2.poll();
-         if (response == null) {
-            response = new NullResponseMessage_V2(packet.getCorrelationID());
-         } else {
-            response.reset();
-            // this should be already set by the channel too, but let's do it just in case
-            response.setCorrelationID(packet.getCorrelationID());
-         }
-      } else {
-         response = new NullResponseMessage_V2(packet.getCorrelationID());
-      }
+      response = poolNullResponseV2.borrow();
+
+      // this should be already set by the channel too, but let's do it just in case
+      response.setCorrelationID(packet.getCorrelationID());
+
       return response;
    }
 
@@ -720,15 +697,15 @@ public class ServerSessionPacketHandler implements ChannelHandler {
    }
 
    private void releaseResponse(Packet packet) {
-      if (cachedNullRes == null || cachedNullRes_V2 == null) {
+      if (poolNullResponse == null || poolNullResponseV2 == null) {
          return;
       }
       if (packet instanceof NullResponseMessage) {
-         cachedNullRes.offer((NullResponseMessage) packet);
+         poolNullResponse.release((NullResponseMessage) packet);
          return;
       }
       if (packet instanceof NullResponseMessage_V2) {
-         cachedNullRes_V2.offer((NullResponseMessage_V2) packet);
+         poolNullResponseV2.release((NullResponseMessage_V2) packet);
       }
    }
 

Reply via email to