This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new d9eb6b110e HDDS-7117. Consider reading chunk files using MappedByteBuffer. (#3674)
d9eb6b110e is described below

commit d9eb6b110e2c645309c8900367f44dfeeacb2d55
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Dec 6 07:36:49 2023 -0800

    HDDS-7117. Consider reading chunk files using MappedByteBuffer. (#3674)
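At its core, the change routes large chunk reads through FileChannel.map(MapMode.READ_ONLY, ...) instead of copying file contents into heap buffers. A minimal, self-contained sketch of the technique (the file path is illustrative; this is not the patch's exact code):

    import java.io.IOException;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public final class MappedReadSketch {
      public static void main(String[] args) throws IOException {
        try (FileChannel channel = FileChannel.open(
            Paths.get("/tmp/chunk.data"), StandardOpenOption.READ)) {
          // Map the whole file read-only; bytes are paged in on demand,
          // with no copy into a heap buffer.
          final MappedByteBuffer mapped = channel.map(
              FileChannel.MapMode.READ_ONLY, 0, channel.size());
          System.out.println("remaining = " + mapped.remaining());
        }
      }
    }
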
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |   6 +-
 .../apache/hadoop/ozone/common/ChunkBuffer.java    |   9 +-
 .../common/ChunkBufferImplWithByteBufferList.java  |   4 +-
 .../common/src/main/resources/ozone-default.xml    |  10 +-
 .../TestChunkBufferImplWithByteBufferList.java     |   2 +-
 .../hadoop/hdds/conf/ConfigurationSource.java      |  11 ++
 .../container/keyvalue/helpers/ChunkUtils.java     |  68 +++++++++----
 .../container/keyvalue/impl/BlockManagerImpl.java  |  23 ++---
 .../keyvalue/impl/FilePerBlockStrategy.java        |   8 +-
 .../keyvalue/impl/FilePerChunkStrategy.java        |  14 +--
 .../keyvalue/interfaces/BlockManager.java          |   3 +
 .../container/keyvalue/helpers/TestChunkUtils.java | 112 ++++++++++++++++++---
 12 files changed, 204 insertions(+), 66 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 1eb15b2848..7e01afd559 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -139,7 +139,11 @@ public final class ScmConfigKeys {
   public static final String OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY =
       "ozone.chunk.read.buffer.default.size";
   public static final String OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT =
-      "64KB";
+      "1MB";
+  public static final String OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_KEY =
+      "ozone.chunk.read.mapped.buffer.threshold";
+  public static final String OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_DEFAULT =
+      "32KB";
 
   public static final String OZONE_SCM_CONTAINER_LAYOUT_KEY =
       "ozone.scm.container.layout";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
index 7d069cddc6..3948b5f04f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
 import java.util.List;
+import java.util.Objects;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -58,6 +59,10 @@ public interface ChunkBuffer {
 
   /** Wrap the given list of {@link ByteBuffer}s as a {@link ChunkBuffer}. */
   static ChunkBuffer wrap(List<ByteBuffer> buffers) {
+    Objects.requireNonNull(buffers, "buffers == null");
+    if (buffers.size() == 1) {
+      return wrap(buffers.get(0));
+    }
     return new ChunkBufferImplWithByteBufferList(buffers);
   }
 
@@ -91,8 +96,7 @@ public interface ChunkBuffer {
 
   /** Similar to {@link ByteBuffer#put(byte[])}. */
   default ChunkBuffer put(byte b) {
-    byte[] buf = new byte[1];
-    buf[0] = (byte) b;
+    final byte[] buf = {b};
     return put(buf, 0, 1);
   }
 
@@ -116,7 +120,6 @@ public interface ChunkBuffer {
 
   /**
    * Iterate the buffer from the current position to the current limit.
-   *
    * Upon the iteration complete,
    * the buffer's position will be equal to its limit.
    *
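
The wrap(List) change above means a one-element list is now unwrapped to the single-buffer implementation, and a null list fails fast with NullPointerException (the test change further down tracks this). A hedged illustration (class name invented; assumes the Ozone common module on the classpath):

    import java.nio.ByteBuffer;
    import java.util.Collections;
    import org.apache.hadoop.ozone.common.ChunkBuffer;

    public final class WrapSketch {
      public static void main(String[] args) {
        final ByteBuffer single = ByteBuffer.wrap(new byte[] {1, 2, 3});
        // A singleton list takes the same path as wrap(ByteBuffer).
        final ChunkBuffer chunk =
            ChunkBuffer.wrap(Collections.singletonList(single));
        System.out.println(chunk.remaining()); // prints 3
      }
    }
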
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
index f6a7f60b0a..7c3a0c7d2d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -31,6 +31,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.function.Function;
 
 /**
@@ -50,8 +51,7 @@ public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
   private int currentIndex;
 
   ChunkBufferImplWithByteBufferList(List<ByteBuffer> buffers) {
-    Preconditions.checkArgument(buffers != null, "buffer == null");
-
+    Objects.requireNonNull(buffers, "buffers == null");
     this.buffers = !buffers.isEmpty() ? ImmutableList.copyOf(buffers) :
         EMPTY_BUFFER;
     this.limit = buffers.stream().mapToInt(ByteBuffer::limit).sum();
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index bf9a2f511b..db16381634 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -836,7 +836,7 @@
   </property>
   <property>
     <name>ozone.chunk.read.buffer.default.size</name>
-    <value>64KB</value>
+    <value>1MB</value>
     <tag>OZONE, SCM, CONTAINER, PERFORMANCE</tag>
     <description>
       The default read buffer size during read chunk operations when checksum
@@ -847,6 +847,14 @@
       (ozone.client.bytes.per.checksum) corresponding to the chunk.
     </description>
   </property>
+  <property>
+    <name>ozone.chunk.read.mapped.buffer.threshold</name>
+    <value>32KB</value>
+    <tag>OZONE, SCM, CONTAINER, PERFORMANCE</tag>
+    <description>
+      The threshold above which chunk reads use memory-mapped buffers.
+    </description>
+  </property>
   <property>
     <name>ozone.scm.container.layout</name>
     <value>FILE_PER_BLOCK</value>
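
For operators: both knobs are plain storage-size settings and can be overridden in ozone-site.xml or programmatically. A sketch of the latter (values are illustrative, not tuning advice):

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;

    public final class ConfigSketch {
      public static void main(String[] args) {
        final OzoneConfiguration conf = new OzoneConfiguration();
        // With this override, reads larger than 64KB use memory-mapped buffers.
        conf.set("ozone.chunk.read.mapped.buffer.threshold", "64KB");
        conf.set("ozone.chunk.read.buffer.default.size", "1MB");
      }
    }
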
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
index 3da43166e7..072c07be64 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
@@ -36,7 +36,7 @@ public class TestChunkBufferImplWithByteBufferList {
   @Test
   public void rejectsNullList() {
     List<ByteBuffer> list = null;
-    assertThrows(IllegalArgumentException.class, () -> ChunkBuffer.wrap(list));
+    assertThrows(NullPointerException.class, () -> ChunkBuffer.wrap(list));
   }
 
   @Test
diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
index dae095a193..b1a20c9aec 100644
--- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
+++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
@@ -285,6 +285,17 @@ public interface ConfigurationSource {
     }
   }
 
+  default int getBufferSize(String name, String defaultValue) {
+    final double size = getStorageSize(name, defaultValue, StorageUnit.BYTES);
+    if (size <= 0) {
+      throw new IllegalArgumentException(name + " <= 0");
+    } else if (size > Integer.MAX_VALUE) {
+      throw new IllegalArgumentException(
+          name + " > Integer.MAX_VALUE = " + Integer.MAX_VALUE);
+    }
+    return (int) size;
+  }
+
   default double getStorageSize(String name, String defaultValue,
       StorageUnit targetUnit) {
     String vString = get(name);
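
The new getBufferSize default method centralizes the range checks that BlockManagerImpl used to inline (see its diff below). A usage sketch (helper class invented for illustration; the int cast inside getBufferSize is safe because oversized values are rejected):

    import org.apache.hadoop.hdds.conf.ConfigurationSource;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;

    final class BufferSizeSketch {
      /** Throws IllegalArgumentException if the configured size is
       * <= 0 or exceeds Integer.MAX_VALUE bytes. */
      static int mappedBufferThreshold(ConfigurationSource conf) {
        return conf.getBufferSize(
            ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_KEY,
            ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_DEFAULT);
      }
    }
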
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
index 85f3e21422..7266904139 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 import java.nio.file.NoSuchFileException;
@@ -32,8 +33,11 @@ import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.nio.file.attribute.FileAttribute;
 import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
@@ -61,7 +65,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
 import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure;
 
-import org.apache.ratis.util.function.CheckedConsumer;
+import org.apache.ratis.util.function.CheckedFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -184,26 +188,24 @@ public final class ChunkUtils {
   }
 
   public static ChunkBuffer readData(long len, int bufferCapacity,
-      CheckedConsumer<ByteBuffer[], StorageContainerException> readMethod)
+      File file, long off, HddsVolume volume, int readMappedBufferThreshold)
       throws StorageContainerException {
+    if (len > readMappedBufferThreshold) {
+      return readData(file, bufferCapacity, off, len, volume);
+    } else if (len == 0) {
+      return ChunkBuffer.wrap(Collections.emptyList());
+    }
+
     final ByteBuffer[] buffers = BufferUtils.assignByteBuffers(len,
         bufferCapacity);
-    readMethod.accept(buffers);
+    readData(file, off, len, c -> c.position(off).read(buffers), volume);
+    Arrays.stream(buffers).forEach(ByteBuffer::flip);
     return ChunkBuffer.wrap(Arrays.asList(buffers));
   }
 
-  /**
-   * Reads data from an existing chunk file into a list of ByteBuffers.
-   *
-   * @param file file where data lives
-   * @param buffers
-   * @param offset
-   * @param len
-   * @param volume for statistics and checker
-   */
-  public static void readData(File file, ByteBuffer[] buffers,
-      long offset, long len, HddsVolume volume)
-      throws StorageContainerException {
+  private static void readData(File file, long offset, long len,
+      CheckedFunction<FileChannel, Long, IOException> readMethod,
+      HddsVolume volume) throws StorageContainerException {
 
     final Path path = file.toPath();
     final long startTime = Time.monotonicNow();
@@ -213,8 +215,7 @@ public final class ChunkUtils {
       bytesRead = processFileExclusively(path, () -> {
         try (FileChannel channel = open(path, READ_OPTIONS, NO_ATTRIBUTES);
              FileLock ignored = channel.lock(offset, len, true)) {
-
-          return channel.position(offset).read(buffers);
+          return readMethod.apply(channel);
         } catch (IOException e) {
           onFailure(volume);
           throw new UncheckedIOException(e);
@@ -239,10 +240,37 @@ public final class ChunkUtils {
         bytesRead, offset, file);
 
     validateReadSize(len, bytesRead);
+  }
 
-    for (ByteBuffer buf : buffers) {
-      buf.flip();
-    }
+  /**
+   * Read data from the given file using
+   * {@link FileChannel#map(FileChannel.MapMode, long, long)},
+   * whose javadoc recommends that it is generally only worth mapping
+   * relatively large files (larger than a few tens of kilobytes)
+   * into memory from the standpoint of performance.
+   *
+   * @return a list of {@link MappedByteBuffer} containing the data.
+   */
+  private static ChunkBuffer readData(File file, int chunkSize,
+      long offset, long length, HddsVolume volume)
+      throws StorageContainerException {
+
+    final List<ByteBuffer> buffers = new ArrayList<>(
+        Math.toIntExact((length - 1) / chunkSize) + 1);
+    readData(file, offset, length, channel -> {
+      long readLen = 0;
+      while (readLen < length) {
+        final int n = Math.toIntExact(Math.min(length - readLen, chunkSize));
+        final ByteBuffer mapped = channel.map(
+            FileChannel.MapMode.READ_ONLY, offset + readLen, n);
+        LOG.debug("mapped: offset={}, readLen={}, n={}, {}",
+            offset, readLen, n, mapped.getClass());
+        readLen += mapped.remaining();
+        buffers.add(mapped);
+      }
+      return readLen;
+    }, volume);
+    return ChunkBuffer.wrap(buffers);
   }
 
   /**
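
To make the dispatch above concrete: reads of at most the threshold keep the copy-into-ByteBuffer path, while larger reads yield one MappedByteBuffer per bufferCapacity-sized slice. A simplified, standalone sketch of the slicing loop (no file locking, volume statistics, or error translation):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;
    import java.util.ArrayList;
    import java.util.List;

    final class MappedSliceSketch {
      /** Map [offset, offset + length) as READ_ONLY slices of at most
       * chunkSize bytes each. */
      static List<ByteBuffer> mapSlices(Path path, long offset, long length,
          int chunkSize) throws IOException {
        final List<ByteBuffer> buffers = new ArrayList<>(
            Math.toIntExact((length - 1) / chunkSize) + 1);
        try (FileChannel channel =
            FileChannel.open(path, StandardOpenOption.READ)) {
          for (long readLen = 0; readLen < length;) {
            final int n = Math.toIntExact(Math.min(length - readLen, chunkSize));
            buffers.add(channel.map(
                FileChannel.MapMode.READ_ONLY, offset + readLen, n));
            readLen += n;
          }
        }
        return buffers;
      }
    }
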
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 449fe46ae0..62896561f2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -24,7 +24,6 @@ import java.util.List;
 
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
@@ -60,6 +59,7 @@ public class BlockManagerImpl implements BlockManager {
 
   // Default Read Buffer capacity when Checksum is not present
   private final int defaultReadBufferCapacity;
+  private final int readMappedBufferThreshold;
 
   /**
    * Constructs a Block Manager.
@@ -69,19 +69,12 @@ public class BlockManagerImpl implements BlockManager {
   public BlockManagerImpl(ConfigurationSource conf) {
     Preconditions.checkNotNull(conf, "Config cannot be null");
     this.config = conf;
-    final double size = config.getStorageSize(
+    this.defaultReadBufferCapacity = config.getBufferSize(
         ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY,
-        ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT,
-        StorageUnit.BYTES);
-    if (size <= 0) {
-      throw new IllegalArgumentException(
-          ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY + " <= 0");
-    } else if (size > Integer.MAX_VALUE) {
-      throw new IllegalArgumentException(
-          ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY
-              + " > Integer.MAX_VALUE = " + Integer.MAX_VALUE);
-    }
-    this.defaultReadBufferCapacity = (int) size;
+        ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT);
+    this.readMappedBufferThreshold = config.getBufferSize(
+        ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_KEY,
+        ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_DEFAULT);
   }
 
   @Override
@@ -267,6 +260,10 @@ public class BlockManagerImpl implements BlockManager {
     return defaultReadBufferCapacity;
   }
 
+  public int getReadMappedBufferThreshold() {
+    return readMappedBufferThreshold;
+  }
+
   /**
    * Deletes an existing block.
    * As Deletion is handled by BlockDeletingService,
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index ccab7f35e8..040b03c3dc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -60,7 +60,6 @@ import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersi
 import static org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage.COMMIT_DATA;
 import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure;
 import static org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils.limitReadSize;
-import static org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils.readData;
 import static org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils.validateChunkForOverwrite;
 import static org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils.verifyChunkFileExists;
 
@@ -75,6 +74,7 @@ public class FilePerBlockStrategy implements ChunkManager {
   private final boolean doSyncWrite;
   private final OpenFiles files = new OpenFiles();
   private final int defaultReadBufferCapacity;
+  private final int readMappedBufferThreshold;
   private final VolumeSet volumeSet;
 
   public FilePerBlockStrategy(boolean sync, BlockManager manager,
@@ -82,6 +82,8 @@ public class FilePerBlockStrategy implements ChunkManager {
     doSyncWrite = sync;
     this.defaultReadBufferCapacity = manager == null ? 0 :
         manager.getDefaultReadBufferCapacity();
+    this.readMappedBufferThreshold = manager == null ? 0
+        : manager.getReadMappedBufferThreshold();
     this.volumeSet = volSet;
   }
 
@@ -192,8 +194,8 @@ public class FilePerBlockStrategy implements ChunkManager {
     long offset = info.getOffset();
     int bufferCapacity =  ChunkManager.getBufferCapacityForChunkRead(info,
         defaultReadBufferCapacity);
-    return readData(len, bufferCapacity,
-        array -> readData(chunkFile, array, offset, len, volume));
+    return ChunkUtils.readData(len, bufferCapacity, chunkFile, offset, volume,
+        readMappedBufferThreshold);
   }
 
   @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
index 13aa9c50f7..31a340f310 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
@@ -20,14 +20,12 @@ package org.apache.hadoop.ozone.container.keyvalue.impl;
 
 import com.google.common.base.Preconditions;
 
-import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
-import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
@@ -48,7 +46,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
@@ -69,6 +66,7 @@ public class FilePerChunkStrategy implements ChunkManager {
   private final boolean doSyncWrite;
   private final BlockManager blockManager;
   private final int defaultReadBufferCapacity;
+  private final int readMappedBufferThreshold;
   private final VolumeSet volumeSet;
 
   public FilePerChunkStrategy(boolean sync, BlockManager manager,
@@ -77,6 +75,8 @@ public class FilePerChunkStrategy implements ChunkManager {
     blockManager = manager;
     this.defaultReadBufferCapacity = manager == null ? 0 :
         manager.getDefaultReadBufferCapacity();
+    this.readMappedBufferThreshold = manager == null ? 0
+        : manager.getReadMappedBufferThreshold();
     this.volumeSet = volSet;
   }
 
@@ -233,9 +233,6 @@ public class FilePerChunkStrategy implements ChunkManager {
     int bufferCapacity = ChunkManager.getBufferCapacityForChunkRead(info,
         defaultReadBufferCapacity);
 
-    ByteBuffer[] dataBuffers = BufferUtils.assignByteBuffers(len,
-        bufferCapacity);
-
     long chunkFileOffset = 0;
     if (info.getOffset() != 0) {
       try {
@@ -267,8 +264,8 @@ public class FilePerChunkStrategy implements ChunkManager {
         if (file.exists()) {
           long offset = info.getOffset() - chunkFileOffset;
           Preconditions.checkState(offset >= 0);
-          ChunkUtils.readData(file, dataBuffers, offset, len, volume);
-          return ChunkBuffer.wrap(Lists.newArrayList(dataBuffers));
+          return ChunkUtils.readData(len, bufferCapacity, file, offset, volume,
+              readMappedBufferThreshold);
         }
       } catch (StorageContainerException ex) {
         //UNABLE TO FIND chunk is not a problem as we will try with the
@@ -276,7 +273,6 @@ public class FilePerChunkStrategy implements ChunkManager {
         if (ex.getResult() != UNABLE_TO_FIND_CHUNK) {
           throw ex;
         }
-        BufferUtils.clearBuffers(dataBuffers);
       }
     }
     throw new StorageContainerException(
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
index aa7285a232..02b7e93d50 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
@@ -93,6 +93,9 @@ public interface BlockManager {
 
   int getDefaultReadBufferCapacity();
 
+  /** @return the threshold above which reads use memory-mapped buffers. */
+  int getReadMappedBufferThreshold();
+
   /**
    * Shutdown ContainerManager.
    */
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java
index 554265688d..037de863c0 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java
@@ -17,9 +17,12 @@
  */
 package org.apache.hadoop.ozone.container.keyvalue.helpers;
 
+import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -27,6 +30,7 @@ import java.nio.file.StandardOpenOption;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -36,7 +40,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
-import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.ozone.test.GenericTestUtils;
 
@@ -64,6 +67,16 @@ public class TestChunkUtils {
       LoggerFactory.getLogger(TestChunkUtils.class);
 
   private static final String PREFIX = TestChunkUtils.class.getSimpleName();
+  private static final int BUFFER_CAPACITY = 1 << 20;
+  private static final int MAPPED_BUFFER_THRESHOLD = 32 << 10;
+  private static final Random RANDOM = new Random();
+
+  static ChunkBuffer readData(File file, long off, long len)
+      throws StorageContainerException {
+    LOG.info("off={}, len={}", off, len);
+    return ChunkUtils.readData(len, BUFFER_CAPACITY, file, off, null,
+        MAPPED_BUFFER_THRESHOLD);
+  }
 
   @Test
   public void concurrentReadOfSameFile() throws Exception {
@@ -85,12 +98,11 @@ public class TestChunkUtils {
         final int threadNumber = i;
         executor.execute(() -> {
           try {
-            ByteBuffer[] readBuffers = BufferUtils.assignByteBuffers(len, len);
-            ChunkUtils.readData(file, readBuffers, offset, len, null);
-
+            final ChunkBuffer chunk = readData(file, offset, len);
             // There should be only one element in readBuffers
-            Assertions.assertEquals(1, readBuffers.length);
-            ByteBuffer readBuffer = readBuffers[0];
+            final List<ByteBuffer> buffers = chunk.asByteBufferList();
+            Assertions.assertEquals(1, buffers.size());
+            final ByteBuffer readBuffer = buffers.get(0);
 
             LOG.info("Read data ({}): {}", threadNumber,
                 new String(readBuffer.array(), UTF_8));
@@ -172,12 +184,11 @@ public class TestChunkUtils {
       int offset = 0;
       ChunkUtils.writeData(file, data, offset, len, null, true);
 
-      ByteBuffer[] readBuffers = BufferUtils.assignByteBuffers(len, len);
-      ChunkUtils.readData(file, readBuffers, offset, len, null);
-
+      final ChunkBuffer chunk = readData(file, offset, len);
       // There should be only one element in readBuffers
-      Assertions.assertEquals(1, readBuffers.length);
-      ByteBuffer readBuffer = readBuffers[0];
+      final List<ByteBuffer> buffers = chunk.asByteBufferList();
+      Assertions.assertEquals(1, buffers.size());
+      final ByteBuffer readBuffer = buffers.get(0);
 
       assertArrayEquals(array, readBuffer.array());
       assertEquals(len, readBuffer.remaining());
@@ -220,15 +231,90 @@ public class TestChunkUtils {
     int len = 123;
     int offset = 0;
     File nonExistentFile = new File("nosuchfile");
-    ByteBuffer[] bufs = BufferUtils.assignByteBuffers(len, len);
 
     // when
     StorageContainerException e = assertThrows(
         StorageContainerException.class,
-        () -> ChunkUtils.readData(nonExistentFile, bufs, offset, len, null));
+        () -> readData(nonExistentFile, offset, len));
 
     // then
     Assertions.assertEquals(UNABLE_TO_FIND_CHUNK, e.getResult());
   }
 
+  @Test
+  public void testReadData() throws Exception {
+    final File dir = GenericTestUtils.getTestDir("testReadData");
+    try {
+      Assertions.assertTrue(dir.mkdirs());
+
+      // large file
+      final int large = 10 << 20; // 10MB
+      Assertions.assertTrue(large > MAPPED_BUFFER_THRESHOLD);
+      runTestReadFile(large, dir, true);
+
+      // small file
+      final int small = 30 << 10; // 30KB
+      Assertions.assertTrue(small <= MAPPED_BUFFER_THRESHOLD);
+      runTestReadFile(small, dir, false);
+
+      // boundary case
+      runTestReadFile(MAPPED_BUFFER_THRESHOLD, dir, false);
+
+      // empty file
+      runTestReadFile(0, dir, false);
+
+      for (int i = 0; i < 10; i++) {
+        final int length = RANDOM.nextInt(2 * MAPPED_BUFFER_THRESHOLD) + 1;
+        runTestReadFile(length, dir, length > MAPPED_BUFFER_THRESHOLD);
+      }
+    } finally {
+      FileUtils.deleteDirectory(dir);
+    }
+  }
+
+  void runTestReadFile(int length, File dir, boolean isMapped)
+      throws Exception {
+    final File file;
+    for (int i = length; ; i++) {
+      final File f = new File(dir, "file_" + i);
+      if (!f.exists()) {
+        file = f;
+        break;
+      }
+    }
+    LOG.info("file: {}", file);
+
+    // write a file
+    final byte[] array = new byte[BUFFER_CAPACITY];
+    final long seed = System.nanoTime();
+    LOG.info("seed: {}", seed);
+    RANDOM.setSeed(seed);
+    try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(
+        file.toPath(), StandardOpenOption.CREATE_NEW))) {
+      for (int written = 0; written < length;) {
+        RANDOM.nextBytes(array);
+        final int remaining = length - written;
+        final int toWrite = Math.min(remaining, array.length);
+        out.write(array, 0, toWrite);
+        written += toWrite;
+      }
+    }
+    Assertions.assertEquals(length, file.length());
+
+    // read the file back
+    final ChunkBuffer chunk = readData(file, 0, length);
+    Assertions.assertEquals(length, chunk.remaining());
+
+    final List<ByteBuffer> buffers = chunk.asByteBufferList();
+    LOG.info("buffers.size(): {}", buffers.size());
+    Assertions.assertEquals((length - 1) / BUFFER_CAPACITY + 1, buffers.size());
+    LOG.info("buffer class: {}", buffers.get(0).getClass());
+
+    RANDOM.setSeed(seed);
+    for (ByteBuffer b : buffers) {
+      Assertions.assertEquals(isMapped, b instanceof MappedByteBuffer);
+      RANDOM.nextBytes(array);
+      Assertions.assertEquals(ByteBuffer.wrap(array, 0, b.remaining()), b);
+    }
+  }
 }

