HDDS-799. Avoid ByteString to byte array conversion cost by using ByteBuffer in Datanode. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/942693bd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/942693bd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/942693bd

Branch: refs/heads/HDDS-4
Commit: 942693bddd5fba51b85a5f677e3496a41817cff3
Parents: c8ca174
Author: Shashikant Banerjee <shashik...@apache.org>
Authored: Mon Nov 5 23:43:22 2018 +0530
Committer: Shashikant Banerjee <shashik...@apache.org>
Committed: Mon Nov 5 23:43:22 2018 +0530

----------------------------------------------------------------------
 .../container/keyvalue/KeyValueHandler.java     | 11 +++---
 .../container/keyvalue/helpers/ChunkUtils.java  | 28 ++++++++-------
 .../keyvalue/impl/ChunkManagerImpl.java         |  2 +-
 .../keyvalue/interfaces/ChunkManager.java       |  3 +-
 .../keyvalue/TestChunkManagerImpl.java          | 37 ++++++++++----------
 .../common/impl/TestContainerPersistence.java   | 28 ++++++++++-----
 6 files changed, 62 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/942693bd/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 4cb23ed..1271d99 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.keyvalue;
 
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -76,7 +77,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import static org.apache.hadoop.hdds.HddsConfigKeys
     .HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
@@ -652,10 +653,10 @@ public class KeyValueHandler extends Handler {
       ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(chunkInfoProto);
       Preconditions.checkNotNull(chunkInfo);
 
-      byte[] data = null;
+      ByteBuffer data = null;
       if (request.getWriteChunk().getStage() == Stage.WRITE_DATA ||
           request.getWriteChunk().getStage() == Stage.COMBINED) {
-        data = request.getWriteChunk().getData().toByteArray();
+        data = request.getWriteChunk().getData().asReadOnlyByteBuffer();
       }
 
       chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data,
@@ -713,7 +714,7 @@ public class KeyValueHandler extends Handler {
       ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(
           putSmallFileReq.getChunkInfo());
       Preconditions.checkNotNull(chunkInfo);
-      byte[] data = putSmallFileReq.getData().toByteArray();
+      ByteBuffer data = putSmallFileReq.getData().asReadOnlyByteBuffer();
       // chunks will be committed as a part of handling putSmallFile
       // here. There is no need to maintain this info in openContainerBlockMap.
       chunkManager.writeChunk(
@@ -724,7 +725,7 @@ public class KeyValueHandler extends Handler {
       blockData.setChunks(chunks);
       // TODO: add bcsId as a part of putSmallFile transaction
       blockManager.putBlock(kvContainer, blockData);
-      metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
+      metrics.incContainerBytesStats(Type.PutSmallFile, data.capacity());
 
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);

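The core of the change is visible in the hunks above: ByteString.toByteArray() allocates a fresh byte[] and copies the whole chunk payload on every WriteChunk and PutSmallFile request, while asReadOnlyByteBuffer() wraps the existing backing data in a read-only java.nio.ByteBuffer with no copy. A minimal standalone sketch of the difference (illustrative only, not part of the patch; the class and variable names are hypothetical):

    import java.nio.ByteBuffer;
    import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

    public final class ByteStringCopyDemo {
      public static void main(String[] args) {
        ByteString payload = ByteString.copyFromUtf8("chunk bytes");
        byte[] copied = payload.toByteArray();            // O(n): allocates a new array and copies
        ByteBuffer view = payload.asReadOnlyByteBuffer(); // O(1): read-only view, no copy
        // view.put(...) would throw ReadOnlyBufferException, which is what
        // makes handing the view to lower layers safe.
        System.out.println(copied.length + " bytes copied vs " + view.remaining() + " viewed");
      }
    }
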
http://git-wip-us.apache.org/repos/asf/hadoop/blob/942693bd/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
index 20598d9..718f5de 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
@@ -73,15 +73,15 @@ public final class ChunkUtils {
    * @throws StorageContainerException
    */
   public static void writeData(File chunkFile, ChunkInfo chunkInfo,
-                               byte[] data, VolumeIOStats volumeIOStats) throws
-      StorageContainerException, ExecutionException, InterruptedException,
-      NoSuchAlgorithmException {
-
+                               ByteBuffer data, VolumeIOStats volumeIOStats)
+      throws StorageContainerException, ExecutionException,
+      InterruptedException, NoSuchAlgorithmException {
+    int bufferSize = data.capacity();
     Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
-    if (data.length != chunkInfo.getLen()) {
+    if (bufferSize != chunkInfo.getLen()) {
       String err = String.format("data array does not match the length " +
               "specified. DataLen: %d Byte Array: %d",
-          chunkInfo.getLen(), data.length);
+          chunkInfo.getLen(), bufferSize);
       log.error(err);
       throw new StorageContainerException(err, INVALID_WRITE_SIZE);
     }
@@ -103,16 +103,16 @@ public final class ChunkUtils {
               StandardOpenOption.SPARSE,
               StandardOpenOption.SYNC);
       lock = file.lock().get();
-      int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get();
+      int size = file.write(data, chunkInfo.getOffset()).get();
       // Increment volumeIO stats here.
       volumeIOStats.incWriteTime(Time.monotonicNow() - writeTimeStart);
       volumeIOStats.incWriteOpCount();
       volumeIOStats.incWriteBytes(size);
-      if (size != data.length) {
+      if (size != bufferSize) {
         log.error("Invalid write size found. Size:{}  Expected: {} ", size,
-            data.length);
+            bufferSize);
         throw new StorageContainerException("Invalid write size found. " +
-            "Size: " + size + " Expected: " + data.length, INVALID_WRITE_SIZE);
+            "Size: " + size + " Expected: " + bufferSize, INVALID_WRITE_SIZE);
       }
     } catch (StorageContainerException ex) {
       throw ex;
@@ -183,7 +183,8 @@ public final class ChunkUtils {
       volumeIOStats.incReadOpCount();
       volumeIOStats.incReadBytes(data.getLen());
       if (data.getChecksum() != null && !data.getChecksum().isEmpty()) {
-        verifyChecksum(data, buf.array(), log);
+        buf.rewind();
+        verifyChecksum(data, buf, log);
       }
       return buf;
     } catch (IOException e) {
@@ -211,10 +212,11 @@ public final class ChunkUtils {
    * @throws NoSuchAlgorithmException
    * @throws StorageContainerException
    */
-  private static void verifyChecksum(ChunkInfo chunkInfo, byte[] data, Logger
-      log) throws NoSuchAlgorithmException, StorageContainerException {
+  private static void verifyChecksum(ChunkInfo chunkInfo, ByteBuffer data,
+      Logger log) throws NoSuchAlgorithmException, StorageContainerException {
     MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
     sha.update(data);
+    data.rewind();
     if (!Hex.encodeHexString(sha.digest()).equals(
         chunkInfo.getChecksum())) {
       log.error("Checksum mismatch. Provided: {} , computed: {}",

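One subtlety behind the rewind() calls above: MessageDigest.update(ByteBuffer) is a relative bulk operation, so it advances the buffer's position to its limit. Without the rewind, any later reader of the same buffer would see it as empty. A standalone sketch (not part of the patch), using SHA-256 as a stand-in for OzoneConsts.FILE_HASH:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;

    public final class DigestRewindDemo {
      public static void main(String[] args) throws Exception {
        ByteBuffer data = ByteBuffer.wrap("chunk bytes".getBytes(StandardCharsets.UTF_8));
        MessageDigest sha = MessageDigest.getInstance("SHA-256"); // stand-in for FILE_HASH
        sha.update(data);                      // relative read: position == limit afterwards
        System.out.println(data.remaining()); // 0 -- the buffer looks "empty"
        data.rewind();                         // reset position to 0 for the next reader
        System.out.println(data.remaining()); // full payload length again
      }
    }

The same reasoning explains the added buf.rewind() on the read path: the chunk was just read from disk into buf, so its position sits at the end of the data and must be reset before the checksum pass.
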
http://git-wip-us.apache.org/repos/asf/hadoop/blob/942693bd/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
index 6fd8d5f..c630e19 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
@@ -66,7 +66,7 @@ public class ChunkManagerImpl implements ChunkManager {
    * @throws StorageContainerException
    */
   public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
-      byte[] data, ContainerProtos.Stage stage)
+      ByteBuffer data, ContainerProtos.Stage stage)
       throws StorageContainerException {
 
     try {

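Note that ChunkUtils.writeData() above validates length with data.capacity() rather than data.remaining(). That holds here because every caller passes a buffer spanning exactly the chunk payload (asReadOnlyByteBuffer() over the ByteString, or ByteBuffer.wrap() over a full array in the tests), but the two differ once a buffer has been sliced or partially consumed. A quick standalone illustration (not part of the patch):

    import java.nio.ByteBuffer;

    public final class CapacityVsRemainingDemo {
      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[1024]);
        buf.get(new byte[100]);              // consume 100 bytes
        System.out.println(buf.capacity());  // 1024 -- fixed at creation
        System.out.println(buf.remaining()); // 924  -- limit minus position
      }
    }
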
http://git-wip-us.apache.org/repos/asf/hadoop/blob/942693bd/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
index 7134be1..3b06585 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import java.nio.ByteBuffer;
 
 /**
  * Chunk Manager allows read, write, delete and listing of chunks in
@@ -41,7 +42,7 @@ public interface ChunkManager {
    * @throws StorageContainerException
    */
   void writeChunk(Container container, BlockID blockID, ChunkInfo info,
-                  byte[] data, ContainerProtos.Stage stage)
+                  ByteBuffer data, ContainerProtos.Stage stage)
       throws StorageContainerException;
 
   /**

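Because ByteBuffer carries mutable cursor state (position/limit), the new signature shares that cursor between the caller and the ChunkManager implementation; the rewind() calls above are one way to manage it. A caller that must not observe position changes could instead hand the callee a duplicate(), which shares the bytes but not the cursor. This patch does not need that, but as a standalone sketch:

    import java.nio.ByteBuffer;

    public final class DuplicateDemo {
      public static void main(String[] args) {
        ByteBuffer original = ByteBuffer.wrap(new byte[]{1, 2, 3, 4});
        ByteBuffer dup = original.duplicate();    // same backing array, independent position
        dup.get(new byte[4]);                     // exhaust the duplicate
        System.out.println(original.remaining()); // still 4: original cursor untouched
      }
    }
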
http://git-wip-us.apache.org/repos/asf/hadoop/blob/942693bd/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
index 3c0876b..9e3edf7 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
@@ -39,6 +39,7 @@ import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 
 import java.io.File;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.UUID;
 
@@ -109,8 +110,8 @@ public class TestChunkManagerImpl {
 
     // As no chunks are written to the volume writeBytes should be 0
     checkWriteIOStats(0, 0);
-    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
-        ContainerProtos.Stage.WRITE_DATA);
+    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
+        ByteBuffer.wrap(data), ContainerProtos.Stage.WRITE_DATA);
     // Now a chunk file is being written with Stage WRITE_DATA, so it should
     // create a temporary chunk file.
     assertTrue(chunksPath.listFiles().length == 1);
@@ -126,8 +127,8 @@ public class TestChunkManagerImpl {
 
     checkWriteIOStats(data.length, 1);
 
-    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
-        ContainerProtos.Stage.COMMIT_DATA);
+    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
+        ByteBuffer.wrap(data), ContainerProtos.Stage.COMMIT_DATA);
 
     checkWriteIOStats(data.length, 1);
 
@@ -146,8 +147,8 @@ public class TestChunkManagerImpl {
       long randomLength = 200L;
       chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
           .getLocalID(), 0), 0, randomLength);
-      chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
-          ContainerProtos.Stage.WRITE_DATA);
+      chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
+          ByteBuffer.wrap(data), ContainerProtos.Stage.WRITE_DATA);
       fail("testWriteChunkIncorrectLength failed");
     } catch (StorageContainerException ex) {
       // As we got an exception, writeBytes should be 0.
@@ -167,8 +168,8 @@ public class TestChunkManagerImpl {
     // Initially chunks folder should be empty.
     assertTrue(chunksPath.listFiles().length == 0);
     checkWriteIOStats(0, 0);
-    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
-        ContainerProtos.Stage.COMBINED);
+    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
+        ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
     // Now a chunk file is being written with Stage COMBINED_DATA, so it should
     // create a chunk file.
     assertTrue(chunksPath.listFiles().length == 1);
@@ -180,8 +181,8 @@ public class TestChunkManagerImpl {
   @Test
   public void testReadChunk() throws Exception {
     checkWriteIOStats(0, 0);
-    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
-        ContainerProtos.Stage.COMBINED);
+    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
+        ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
     checkWriteIOStats(data.length, 1);
     checkReadIOStats(0, 0);
     byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID,
@@ -194,8 +195,8 @@ public class TestChunkManagerImpl {
   @Test
   public void testDeleteChunk() throws Exception {
     File chunksPath = new File(keyValueContainerData.getChunksPath());
-    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
-        ContainerProtos.Stage.COMBINED);
+    chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
+        ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
     assertTrue(chunksPath.listFiles().length == 1);
     chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
     assertTrue(chunksPath.listFiles().length == 0);
@@ -204,8 +205,8 @@ public class TestChunkManagerImpl {
   @Test
   public void testDeleteChunkUnsupportedRequest() throws Exception {
     try {
-      chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
-          ContainerProtos.Stage.COMBINED);
+      chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
+          ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
       long randomLength = 200L;
       chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
           .getLocalID(), 0), 0, randomLength);
@@ -224,8 +225,8 @@ public class TestChunkManagerImpl {
           .getLocalID(), 0), 0, data.length);
       //Setting checksum to some value.
       chunkInfo.setChecksum("some garbage");
-      chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
-          ContainerProtos.Stage.COMBINED);
+      chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
+          ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
       fail("testWriteChunkChecksumMismatch failed");
     } catch (StorageContainerException ex) {
       GenericTestUtils.assertExceptionContains("Checksum mismatch.", ex);
@@ -252,8 +253,8 @@ public class TestChunkManagerImpl {
     for (int i=0; i<100; i++) {
       chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
           .getLocalID(), i), 0, data.length);
-      chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
-          ContainerProtos.Stage.COMBINED);
+      chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
+          ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED);
     }
     checkWriteIOStats(data.length*100, 100);
     assertTrue(hddsVolume.getVolumeIOStats().getWriteTime() > 0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/942693bd/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index c2941ed..f6c498a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -59,6 +59,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -330,7 +331,8 @@ public class TestContainerPersistence {
         blockID.getLocalID(), 0, 0, datalen);
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
-    chunkManager.writeChunk(container, blockID, info, data, COMBINED);
+    chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
+        COMBINED);
     return info;
 
   }
@@ -371,7 +373,8 @@ public class TestContainerPersistence {
       ChunkInfo info = getChunk(blockID.getLocalID(), x, 0, datalen);
       byte[] data = getData(datalen);
       setDataChecksum(info, data);
-      chunkManager.writeChunk(container, blockID, info, data, COMBINED);
+      chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
+          COMBINED);
       String fileName = String.format("%s.data.%d", blockID.getLocalID(), x);
       fileHashMap.put(fileName, info);
     }
@@ -431,7 +434,8 @@ public class TestContainerPersistence {
         blockID.getLocalID(), 0, 0, datalen);
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
-    chunkManager.writeChunk(container, blockID, info, data, COMBINED);
+    chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
+        COMBINED);
 
     byte[] readData = chunkManager.readChunk(container, blockID, info);
     assertTrue(Arrays.equals(data, readData));
@@ -463,11 +467,14 @@ public class TestContainerPersistence {
         blockID.getLocalID(), 0, 0, datalen);
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
-    chunkManager.writeChunk(container, blockID, info, data, COMBINED);
-    chunkManager.writeChunk(container, blockID, info, data, COMBINED);
+    chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
+        COMBINED);
+    chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
+        COMBINED);
     // With the overwrite flag it should work now.
     info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
-    chunkManager.writeChunk(container, blockID, info, data, COMBINED);
+    chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
+        COMBINED);
     long bytesUsed = container.getContainerData().getBytesUsed();
     Assert.assertEquals(datalen, bytesUsed);
 
@@ -501,7 +508,8 @@ public class TestContainerPersistence {
       byte[] data = getData(datalen);
       oldSha.update(data);
       setDataChecksum(info, data);
-      chunkManager.writeChunk(container, blockID, info, data, COMBINED);
+      chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
+          COMBINED);
     }
 
     // Request to read the whole data in a single go.
@@ -532,7 +540,8 @@ public class TestContainerPersistence {
         blockID.getLocalID(), 0, 0, datalen);
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
-    chunkManager.writeChunk(container, blockID, info, data, COMBINED);
+    chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
+        COMBINED);
     chunkManager.deleteChunk(container, blockID, info);
     exception.expect(StorageContainerException.class);
     exception.expectMessage("Unable to find the chunk file.");
@@ -646,7 +655,8 @@ public class TestContainerPersistence {
       info = getChunk(blockID.getLocalID(), x, x * datalen, datalen);
       byte[] data = getData(datalen);
       setDataChecksum(info, data);
-      chunkManager.writeChunk(container, blockID, info, data, COMBINED);
+      chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
+          COMBINED);
       totalSize += datalen;
       chunkList.add(info);
     }

