wy96f commented on a change in pull request #2832: ARTEMIS-2482 Large messages 
could leak native ByteBuffers
URL: https://github.com/apache/activemq-artemis/pull/2832#discussion_r323071598
 
 

 ##########
 File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 ##########
 @@ -832,46 +835,117 @@ public void stopReplication() {
       }
    }
 
+   static final int LARGE_MESSAGE_CHUNK_SIZE = 100 * 1024;
+
+   private void addBytesUsingTempNativeBuffer(final SequentialFile file, final 
ActiveMQBuffer bytes) throws Exception {
+      assert file instanceof NIOSequentialFile;
+      //we can't use the actual content of it as it is and need to perform a 
copy into a direct ByteBuffer
+      int readableBytes = bytes.readableBytes();
+      final int requiredCapacity = Math.min(LARGE_MESSAGE_CHUNK_SIZE, 
readableBytes);
+      final ByteBuf tempBuffer = 
PooledByteBufAllocator.DEFAULT.directBuffer(requiredCapacity, requiredCapacity);
+      try {
+         int readerIndex = bytes.readerIndex();
+         while (readableBytes > 0) {
+            final int size = Math.min(readableBytes, LARGE_MESSAGE_CHUNK_SIZE);
+            final ByteBuffer nioBytes = tempBuffer.internalNioBuffer(0, size);
+            final int position = nioBytes.position();
+            bytes.getBytes(readerIndex, nioBytes);
+            nioBytes.position(position);
+            file.blockingWriteDirect(nioBytes, false, false);
+            readerIndex += size;
+            readableBytes -= size;
+         }
+      } finally {
+         tempBuffer.release();
+      }
+   }
+
    public final void addBytesToLargeMessage(final SequentialFile file,
                                             final long messageId,
                                             final ActiveMQBuffer bytes) throws 
Exception {
       readLock();
       try {
          file.position(file.size());
          if (bytes.byteBuf() != null && bytes.byteBuf().nioBufferCount() == 1) 
{
-            final ByteBuffer nioBytes = 
bytes.byteBuf().internalNioBuffer(bytes.readerIndex(), bytes.readableBytes());
-            file.blockingWriteDirect(nioBytes, false, false);
-
-            if (isReplicated()) {
-               //copy defensively bytes
-               final byte[] bytesCopy = new byte[bytes.readableBytes()];
-               bytes.getBytes(bytes.readerIndex(), bytesCopy);
-               replicator.largeMessageWrite(messageId, bytesCopy);
+            //NIO -> need direct ByteBuffers, while JDBC the opposite
+            if (file instanceof NIOSequentialFile) {
+               if (bytes.byteBuf().isDirect()) {
+                  final ByteBuffer nioBytes = 
bytes.byteBuf().internalNioBuffer(bytes.readerIndex(), bytes.readableBytes());
 
 Review comment:
   Do we need to check nioBufferCount() == 1 first?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to