This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 881143252cd426c816930a2564dcd88bdf3c91d0
Author: Francesco Nigro <[email protected]>
AuthorDate: Wed Apr 17 19:56:36 2019 +0200

    ARTEMIS-2317 Avoid long TTSP caused by Page::read using mmap read
    
    It implements Page::read using chunked reading of SequentialFile
    instead of using memory mapped files causing long time to safepoint
    issues.
---
 .../activemq/artemis/core/paging/impl/Page.java    | 196 +++++++++++----------
 .../impl/fakes/FakeSequentialFileFactory.java      |   2 +-
 2 files changed, 108 insertions(+), 90 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index c8fbb3d..03d0e67 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -16,27 +16,20 @@
  */
 package org.apache.activemq.artemis.core.paging.impl;
 
-import java.io.File;
-import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import io.netty.buffer.Unpooled;
-import io.netty.util.internal.PlatformDependent;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.io.SequentialFile;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
-import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
-import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
@@ -45,6 +38,7 @@ import 
org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.Env;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.jboss.logging.Logger;
 
@@ -87,8 +81,6 @@ public final class Page implements Comparable<Page> {
     */
    private Set<PageSubscriptionCounter> pendingCounters;
 
-   private boolean canBeMapped;
-
    public Page(final SimpleString storeName,
                final StorageManager storageManager,
                final SequentialFileFactory factory,
@@ -99,8 +91,6 @@ public final class Page implements Comparable<Page> {
       fileFactory = factory;
       this.storageManager = storageManager;
       this.storeName = storeName;
-      this.canBeMapped = fileFactory instanceof NIOSequentialFileFactory || 
fileFactory instanceof MappedSequentialFileFactory;
-      //pooled buffers to avoid allocations on hot paths
    }
 
    public int getPageId() {
@@ -120,105 +110,133 @@ public final class Page implements Comparable<Page> {
          throw ActiveMQMessageBundle.BUNDLE.invalidPageIO();
       }
 
-      final List<PagedMessage> messages = new ArrayList<>();
-
       size.lazySet((int) file.size());
 
-      if (this.canBeMapped) {
-         readFromMapped(storage, messages);
-         // if the file is open to be written
-         // it needs to updated the position
-         file.position(file.size());
-      } else {
-         readFromSequentialFile(storage, messages);
-      }
+      final List<PagedMessage> messages = readFromSequentialFile(storage);
 
       numberOfMessages.lazySet(messages.size());
 
       return messages;
    }
 
-   private void readFromSequentialFile(StorageManager storage, 
List<PagedMessage> messages) throws Exception {
-      final int fileSize = (int) file.size();
-      //doesn't need to be a direct buffer: that case is covered using the 
MMAP read
-      final ByteBuffer buffer = this.fileFactory.newBuffer(fileSize);
-      try {
-         file.position(0);
-         file.read(buffer);
-         buffer.rewind();
-         assert (buffer.limit() == fileSize) : "buffer doesn't contains the 
whole file";
-         ChannelBufferWrapper activeMQBuffer = wrapBuffer(fileSize, buffer);
-         read(storage, activeMQBuffer, messages);
-      } finally {
-         this.fileFactory.releaseBuffer(buffer);
-      }
+   private static void decodeInto(ByteBuffer fileBuffer, int encodedSize, 
PagedMessageImpl msg) {
+      final ActiveMQBuffer wrappedBuffer = 
ActiveMQBuffers.wrappedBuffer(fileBuffer);
+      wrappedBuffer.writerIndex(encodedSize);
+      msg.decode(wrappedBuffer);
    }
 
-   private ChannelBufferWrapper wrapBuffer(int fileSize, ByteBuffer buffer) {
-      ChannelBufferWrapper activeMQBuffer = new 
ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer));
-      return activeMQBuffer;
+   private ByteBuffer allocateAndReadIntoFileBuffer(ByteBuffer fileBuffer, int 
requiredBytes) throws Exception {
+      final ByteBuffer newFileBuffer = 
fileFactory.newBuffer(Math.max(requiredBytes, MIN_CHUNK_SIZE));
+      newFileBuffer.put(fileBuffer);
+      fileFactory.releaseBuffer(fileBuffer);
+      fileBuffer = newFileBuffer;
+      //move the limit to allow reading as much as possible from the file
+      fileBuffer.limit(fileBuffer.capacity());
+      file.read(fileBuffer);
+      fileBuffer.position(0);
+      return fileBuffer;
    }
 
-   private static MappedByteBuffer mapFileForRead(File file, int fileSize) {
-      try (FileChannel channel = FileChannel.open(file.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
-         return channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize);
-      } catch (Exception e) {
-         throw new IllegalStateException(e);
+   /**
+    * It returns a {@link ByteBuffer} that has {@link ByteBuffer#remaining()} 
bytes >= {@code requiredBytes}
+    * of valid data from {@link #file}.
+    */
+   private ByteBuffer readIntoFileBufferIfNecessary(ByteBuffer fileBuffer, int 
requiredBytes) throws Exception {
+      final int remaining = fileBuffer.remaining();
+      //fileBuffer::remaining is the current size of valid data
+      final int bytesToBeRead = requiredBytes - remaining;
+      if (bytesToBeRead > 0) {
+         final int capacity = fileBuffer.capacity();
+         //fileBuffer has enough overall capacity to hold all the required 
bytes?
+         if (capacity >= requiredBytes) {
+            //we do not care to use the free space between
+            //fileBuffer::limit and fileBuffer::capacity
+            //to save compactions, because fileBuffer
+            //is very unlikely to not be completely full
+            //after each file::read
+            if (fileBuffer.limit() > 0) {
+               //the previous check avoid compact
+               //to attempt a copy of 0 bytes
+               fileBuffer.compact();
+            } else {
+               //compact already set the limit == capacity
+               fileBuffer.limit(capacity);
+            }
+            file.read(fileBuffer);
+            fileBuffer.position(0);
+         } else {
+            fileBuffer = allocateAndReadIntoFileBuffer(fileBuffer, 
requiredBytes);
+         }
       }
+      return fileBuffer;
    }
 
-   private int readFromMapped(StorageManager storage, List<PagedMessage> 
messages) throws IOException {
+   //sizeOf(START_BYTE) + sizeOf(MESSAGE LENGTH) + sizeOf(END_BYTE)
+   private static final int HEADER_AND_TRAILER_SIZE = DataConstants.SIZE_INT + 
2;
+   private static final int MINIMUM_MSG_PERSISTENT_SIZE = 
HEADER_AND_TRAILER_SIZE;
+   private static final int MIN_CHUNK_SIZE = Env.osPageSize();
+
+   private List<PagedMessage> readFromSequentialFile(StorageManager storage) 
throws Exception {
+      final List<PagedMessage> messages = new ArrayList<>();
+      final int fileSize = (int) file.size();
       file.position(0);
-      //use a readonly mapped view of the file
-      final int mappedSize = size.get();
-      final MappedByteBuffer mappedByteBuffer = 
mapFileForRead(this.file.getJavaFile(), mappedSize);
-      ChannelBufferWrapper activeMQBuffer = wrapBuffer(mappedSize, 
mappedByteBuffer);
+      int processedBytes = 0;
+      ByteBuffer fileBuffer = null;
       try {
-         return read(storage, activeMQBuffer, messages);
-      } finally {
-         //unmap the file after read it to avoid GC to take care of it
-         PlatformDependent.freeDirectBuffer(mappedByteBuffer);
-      }
-   }
-
-   private int read(StorageManager storage, ActiveMQBuffer fileBuffer, 
List<PagedMessage> messages) {
-      int readMessages = 0;
-      while (fileBuffer.readable()) {
-         final int position = fileBuffer.readerIndex();
-
-         byte byteRead = fileBuffer.readByte();
-
-         if (byteRead == Page.START_BYTE) {
-            if (fileBuffer.readerIndex() + DataConstants.SIZE_INT < 
fileBuffer.capacity()) {
-               int messageSize = fileBuffer.readInt();
-               int oldPos = fileBuffer.readerIndex();
-               if (fileBuffer.readerIndex() + messageSize < 
fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == 
Page.END_BYTE) {
-                  PagedMessage msg = new PagedMessageImpl(storageManager);
-                  msg.decode(fileBuffer);
-                  byte b = fileBuffer.readByte();
-                  if (b != Page.END_BYTE) {
-                     // Sanity Check: This would only happen if there is a bug 
on decode or any internal code, as
-                     // this
-                     // constraint was already checked
-                     throw new IllegalStateException("Internal error, it 
wasn't possible to locate END_BYTE " + b);
+         int remainingBytes = fileSize - processedBytes;
+         if (remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE) {
+            fileBuffer = fileFactory.newBuffer(Math.min(remainingBytes, 
MIN_CHUNK_SIZE));
+            fileBuffer.limit(0);
+            do {
+               fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, 
MINIMUM_MSG_PERSISTENT_SIZE);
+               final byte startByte = fileBuffer.get();
+               if (startByte == Page.START_BYTE) {
+                  final int encodedSize = fileBuffer.getInt();
+                  final int nextPosition = processedBytes + 
HEADER_AND_TRAILER_SIZE + encodedSize;
+                  if (nextPosition <= fileSize) {
+                     fileBuffer = readIntoFileBufferIfNecessary(fileBuffer, 
encodedSize + 1);
+                     final int endPosition = fileBuffer.position() + 
encodedSize;
+                     //this check must be performed upfront decoding
+                     if (fileBuffer.remaining() >= (encodedSize + 1) && 
fileBuffer.get(endPosition) == Page.END_BYTE) {
+                        final PagedMessageImpl msg = new 
PagedMessageImpl(storageManager);
+                        decodeInto(fileBuffer, encodedSize, msg);
+                        fileBuffer.position(endPosition + 1);
+                        assert fileBuffer.get(endPosition) == Page.END_BYTE : 
"decoding cannot change end byte";
+                        msg.initMessage(storage);
+                        if (logger.isTraceEnabled()) {
+                           logger.tracef("Reading message %s on pageId=%d for 
address=%s", msg, pageId, storeName);
+                        }
+                        messages.add(msg);
+                        processedBytes = nextPosition;
+                     } else {
+                        markFileAsSuspect(file.getFileName(), processedBytes, 
messages.size());
+                        return messages;
+                     }
+                  } else {
+                     markFileAsSuspect(file.getFileName(), processedBytes, 
messages.size());
+                     return messages;
                   }
-                  msg.initMessage(storage);
-                  if (logger.isTraceEnabled()) {
-                     logger.trace("Reading message " + msg + " on pageId=" + 
this.pageId + " for address=" + storeName);
-                  }
-                  readMessages++;
-                  messages.add(msg);
                } else {
-                  markFileAsSuspect(file.getFileName(), position, 
messages.size());
-                  break;
+                  markFileAsSuspect(file.getFileName(), processedBytes, 
messages.size());
+                  return messages;
                }
+               remainingBytes = fileSize - processedBytes;
             }
-         } else {
-            markFileAsSuspect(file.getFileName(), position, messages.size());
-            break;
+            while (remainingBytes >= MINIMUM_MSG_PERSISTENT_SIZE);
+         }
+         //ignore incomplete messages at the end of the file
+         if (logger.isTraceEnabled()) {
+            logger.tracef("%s has %d bytes of unknown data at position = %d", 
file.getFileName(), remainingBytes, processedBytes);
+         }
+         return messages;
+      } finally {
+         if (fileBuffer != null) {
+            fileFactory.releaseBuffer(fileBuffer);
+         }
+         if (file.position() != fileSize) {
+            file.position(fileSize);
          }
       }
-      return readMessages;
    }
 
    public synchronized void write(final PagedMessage message) throws Exception 
{
@@ -228,7 +246,7 @@ public final class Page implements Comparable<Page> {
       final int messageEncodedSize = message.getEncodeSize();
       final int bufferSize = messageEncodedSize + Page.SIZE_RECORD;
       final ByteBuffer buffer = fileFactory.newBuffer(bufferSize);
-      ChannelBufferWrapper activeMQBuffer = wrapBuffer(bufferSize, buffer);
+      ChannelBufferWrapper activeMQBuffer = new 
ChannelBufferWrapper(Unpooled.wrappedBuffer(buffer));
       activeMQBuffer.clear();
       activeMQBuffer.writeByte(Page.START_BYTE);
       activeMQBuffer.writeInt(messageEncodedSize);
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
index 155831f..8368177 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
@@ -359,7 +359,7 @@ public class FakeSequentialFileFactory implements 
SequentialFileFactory {
             throw new IllegalStateException("Is closed");
          }
 
-         byte[] bytesRead = new byte[bytes.limit()];
+         byte[] bytesRead = new byte[Math.min(bytes.remaining(), 
data.remaining())];
 
          data.get(bytesRead);
 

Reply via email to