HDDS-890. Handle OverlappingFileLockException during writeStateMachineData in ContainerStateMachine. Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7274115d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7274115d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7274115d

Branch: refs/heads/HDFS-12943
Commit: 7274115d57fdfef48fca1afa7be7ed2634dd31fa
Parents: ff31313
Author: Mukul Kumar Singh <msi...@apache.org>
Authored: Tue Dec 4 20:12:35 2018 +0530
Committer: Mukul Kumar Singh <msi...@apache.org>
Committed: Tue Dec 4 20:12:35 2018 +0530

----------------------------------------------------------------------
 .../container/keyvalue/KeyValueHandler.java     | 36 ++++++++++--------
 .../keyvalue/impl/ChunkManagerImpl.java         | 33 +++++++++-------
 .../keyvalue/interfaces/ChunkManager.java       |  8 ++--
 .../keyvalue/TestChunkManagerImpl.java          | 40 ++++++++++++--------
 .../common/impl/TestContainerPersistence.java   | 38 +++++++++++--------
 5 files changed, 93 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
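
For context, the failure mode behind HDDS-890: when Ratis replays or retries
writeStateMachineData for a chunk, every attempt used to target the same
temporary chunk file, and a second file lock on that file from within one JVM
fails fast. A minimal standalone sketch of that behaviour (the class and file
names here are hypothetical, not Ozone code):

    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileLock;
    import java.nio.channels.OverlappingFileLockException;

    public class TmpFileLockDemo {
      public static void main(String[] args) throws Exception {
        try (RandomAccessFile first = new RandomAccessFile("chunk.tmp", "rw");
             FileChannel firstChannel = first.getChannel();
             FileLock held = firstChannel.lock();
             RandomAccessFile second = new RandomAccessFile("chunk.tmp", "rw");
             FileChannel secondChannel = second.getChannel()) {
          try {
            // An overlapping lock request from the same JVM, even on a
            // different channel, fails immediately instead of blocking.
            secondChannel.lock();
          } catch (OverlappingFileLockException e) {
            System.out.println("second lock attempt failed: " + e);
          }
        }
      }
    }

The patch below sidesteps the collision by giving each Ratis transaction its
own tmp file, keyed by term and log index (see getTmpChunkFile).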


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7274115d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 5130253..01964ba 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -545,10 +545,12 @@ public class KeyValueHandler extends Handler {
           .getChunkData());
       Preconditions.checkNotNull(chunkInfo);
 
-      boolean isReadFromTmpFile = dispatcherContext == null ? false :
-          dispatcherContext.isReadFromTmpFile();
+      if (dispatcherContext == null) {
+        dispatcherContext = new DispatcherContext.Builder().build();
+      }
+
       data = chunkManager
-          .readChunk(kvContainer, blockID, chunkInfo, isReadFromTmpFile);
+          .readChunk(kvContainer, blockID, chunkInfo, dispatcherContext);
       metrics.incContainerBytesStats(Type.ReadChunk, data.length);
     } catch (StorageContainerException ex) {
       return ContainerUtils.logAndReturnError(LOG, ex, request);
@@ -619,15 +621,17 @@ public class KeyValueHandler extends Handler {
       Preconditions.checkNotNull(chunkInfo);
 
       ByteBuffer data = null;
-      WriteChunkStage stage =
-          dispatcherContext == null ? WriteChunkStage.COMBINED :
-              dispatcherContext.getStage();
+      if (dispatcherContext == null) {
+        dispatcherContext = new DispatcherContext.Builder().build();
+      }
+      WriteChunkStage stage = dispatcherContext.getStage();
       if (stage == WriteChunkStage.WRITE_DATA ||
           stage == WriteChunkStage.COMBINED) {
         data = request.getWriteChunk().getData().asReadOnlyByteBuffer();
       }
 
-      chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data, stage);
+      chunkManager
+          .writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext);
 
       // We should increment stats after writeChunk
       if (stage == WriteChunkStage.WRITE_DATA||
@@ -677,19 +681,19 @@ public class KeyValueHandler extends Handler {
           putSmallFileReq.getChunkInfo());
       Preconditions.checkNotNull(chunkInfo);
       ByteBuffer data = putSmallFileReq.getData().asReadOnlyByteBuffer();
-      WriteChunkStage stage =
-          dispatcherContext == null ? WriteChunkStage.COMBINED :
-              dispatcherContext.getStage();
+      if (dispatcherContext == null) {
+        dispatcherContext = new DispatcherContext.Builder().build();
+      }
+
       // chunks will be committed as a part of handling putSmallFile
       // here. There is no need to maintain this info in openContainerBlockMap.
-      chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data, stage);
+      chunkManager
+          .writeChunk(kvContainer, blockID, chunkInfo, data, dispatcherContext);
 
       List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
       chunks.add(chunkInfo.getProtoBufMessage());
       blockData.setChunks(chunks);
-      long bcsId =
-          dispatcherContext == null ? 0 : dispatcherContext.getLogIndex();
-      blockData.setBlockCommitSequenceId(bcsId);
+      blockData.setBlockCommitSequenceId(dispatcherContext.getLogIndex());
 
       blockManager.putBlock(kvContainer, blockData);
       metrics.incContainerBytesStats(Type.PutSmallFile, data.capacity());
@@ -728,11 +732,13 @@ public class KeyValueHandler extends Handler {
 
       ContainerProtos.ChunkInfo chunkInfo = null;
       ByteString dataBuf = ByteString.EMPTY;
+      DispatcherContext dispatcherContext =
+          new DispatcherContext.Builder().build();
       for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) {
         // if the block is committed, all chunks must have been committed.
         // Tmp chunk files won't exist here.
         byte[] data = chunkManager.readChunk(kvContainer, blockID,
-            ChunkInfo.getFromProtoBuf(chunk), false);
+            ChunkInfo.getFromProtoBuf(chunk), dispatcherContext);
         ByteString current = ByteString.copyFrom(data);
         dataBuf = dataBuf.concat(current);
         chunkInfo = chunk;

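The recurring edit in this file replaces per-field null checks (stage, log
index, read-from-tmp flag) with a single normalization to a default-built
DispatcherContext. A sketch of the pattern; orDefault is a hypothetical
helper, and the Builder defaults (COMBINED stage, log index 0, readFromTmpFile
false) are assumptions inferred from the ternary fallbacks deleted above:

    import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;

    // Normalize once, instead of repeating
    // "ctx == null ? fallback : ctx.getX()" at every use site.
    private static DispatcherContext orDefault(DispatcherContext ctx) {
      return ctx != null ? ctx : new DispatcherContext.Builder().build();
    }
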
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7274115d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
index a2e8e5c..e4814cb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java
@@ -67,13 +67,14 @@ public class ChunkManagerImpl implements ChunkManager {
    * @param blockID - ID of the block
    * @param info - ChunkInfo
    * @param data - data of the chunk
-   * @param stage - Stage of the Chunk operation
+   * @param dispatcherContext - dispatcherContextInfo
    * @throws StorageContainerException
    */
   public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
-      ByteBuffer data, DispatcherContext.WriteChunkStage stage)
+      ByteBuffer data, DispatcherContext dispatcherContext)
       throws StorageContainerException {
-
+    Preconditions.checkNotNull(dispatcherContext);
+    DispatcherContext.WriteChunkStage stage = dispatcherContext.getStage();
     try {
 
       KeyValueContainerData containerData = (KeyValueContainerData) container
@@ -85,7 +86,7 @@ public class ChunkManagerImpl implements ChunkManager {
 
       boolean isOverwrite = ChunkUtils.validateChunkForOverwrite(
           chunkFile, info);
-      File tmpChunkFile = getTmpChunkFile(chunkFile, info);
+      File tmpChunkFile = getTmpChunkFile(chunkFile, dispatcherContext);
 
       LOG.debug(
           "writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file:{}",
@@ -137,6 +138,8 @@ public class ChunkManagerImpl implements ChunkManager {
           LOG.warn("ChunkFile already exists" + chunkFile);
           return;
         }
+        // While committing a chunk, just rename the tmp chunk file, which has
+        // the same term and log index appended as the current transaction.
         commitChunk(tmpChunkFile, chunkFile);
         // Increment container stats here, as we commit the data.
         containerData.incrBytesUsed(info.getLen());
@@ -179,14 +182,14 @@ public class ChunkManagerImpl implements ChunkManager {
    * @param container - Container for the chunk
    * @param blockID - ID of the block.
    * @param info - ChunkInfo.
-   * @param readFromTmpFile whether to read from tmp chunk file or not.
+   * @param dispatcherContext dispatcher context info.
    * @return byte array
    * @throws StorageContainerException
    * TODO: Right now we do not support partial reads and writes of chunks.
    * TODO: Explore if we need to do that for ozone.
    */
   public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info,
-      boolean readFromTmpFile) throws StorageContainerException {
+      DispatcherContext dispatcherContext) throws StorageContainerException {
     try {
       KeyValueContainerData containerData = (KeyValueContainerData) container
           .getContainerData();
@@ -204,8 +207,8 @@ public class ChunkManagerImpl implements ChunkManager {
 
         // In case the chunk file does not exist but tmp chunk file exist,
         // read from tmp chunk file if readFromTmpFile is set to true
-        if (!chunkFile.exists() && readFromTmpFile) {
-          chunkFile = getTmpChunkFile(chunkFile, info);
+        if (!chunkFile.exists() && dispatcherContext.isReadFromTmpFile()) {
+          chunkFile = getTmpChunkFile(chunkFile, dispatcherContext);
         }
         data = ChunkUtils.readData(chunkFile, info, volumeIOStats);
         containerData.incrReadCount();
@@ -279,17 +282,21 @@ public class ChunkManagerImpl implements ChunkManager {
 
   /**
    * Returns the temporary chunkFile path.
-   * @param chunkFile
-   * @param info
+   * @param chunkFile chunkFileName
+   * @param dispatcherContext dispatcher context info
    * @return temporary chunkFile path
    * @throws StorageContainerException
    */
-  private File getTmpChunkFile(File chunkFile, ChunkInfo info)
-      throws StorageContainerException {
+  private File getTmpChunkFile(File chunkFile,
+      DispatcherContext dispatcherContext)  {
     return new File(chunkFile.getParent(),
         chunkFile.getName() +
             OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
-            OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX);
+            OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX +
+            OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
+            dispatcherContext.getTerm() +
+            OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
+            dispatcherContext.getLogIndex());
   }
 
   /**

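The heart of the fix is the new name returned by getTmpChunkFile: the Ratis
term and log index are appended, so each transaction writes and locks its own
tmp file. A sketch of the resulting layout, assuming "." for
CONTAINER_CHUNK_NAME_DELIMITER and "tmp" for CONTAINER_TEMPORARY_CHUNK_PREFIX
(both values are assumptions here):

    // Hypothetical standalone equivalent of the new getTmpChunkFile naming.
    static String tmpChunkName(String chunkFileName, long term, long logIndex) {
      final String delim = ".";    // assumed CONTAINER_CHUNK_NAME_DELIMITER
      final String prefix = "tmp"; // assumed CONTAINER_TEMPORARY_CHUNK_PREFIX
      // before: chunkFileName + delim + prefix         -> one shared tmp file
      // after:  one tmp file per (term, logIndex) pair -> no lock contention
      return chunkFileName + delim + prefix + delim + term + delim + logIndex;
    }

    // e.g. tmpChunkName("104857600.data.0", 2, 17) -> "104857600.data.0.tmp.2.17"

At COMMIT_DATA the transaction then renames exactly the tmp file it created in
WRITE_DATA, since a retried transaction carries the same term and log index.
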
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7274115d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
index 4282e46..5a6898f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -39,11 +39,11 @@ public interface ChunkManager {
    * @param container - Container for the chunk
    * @param blockID - ID of the block.
    * @param info - ChunkInfo.
-   * @param stage - Chunk Stage write.
+   * @param dispatcherContext - dispatcher context info.
    * @throws StorageContainerException
    */
   void writeChunk(Container container, BlockID blockID, ChunkInfo info,
-      ByteBuffer data, DispatcherContext.WriteChunkStage stage)
+      ByteBuffer data, DispatcherContext dispatcherContext)
       throws StorageContainerException;
 
   /**
@@ -52,7 +52,7 @@ public interface ChunkManager {
    * @param container - Container for the chunk
    * @param blockID - ID of the block.
    * @param info - ChunkInfo.
-   * @param readFromTmpFile whether to read from tmp chunk file or not
+   * @param dispatcherContext - dispatcher context info.
    * @return  byte array
    * @throws StorageContainerException
    *
@@ -60,7 +60,7 @@ public interface ChunkManager {
    * TODO: Explore if we need to do that for ozone.
    */
   byte[] readChunk(Container container, BlockID blockID, ChunkInfo info,
-      boolean readFromTmpFile) throws StorageContainerException;
+      DispatcherContext dispatcherContext) throws StorageContainerException;
 
   /**
    * Deletes a given chunk.

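With the interface change, callers hand over the whole context instead of
unbundled stage/flag arguments. A hedged usage sketch; setTerm and setLogIndex
are assumed Builder setters (only setStage appears in this diff):

    DispatcherContext ctx = new DispatcherContext.Builder()
        .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
        .setTerm(term)          // assumed setter
        .setLogIndex(logIndex)  // assumed setter
        .build();
    chunkManager.writeChunk(container, blockID, info, data, ctx);
    byte[] chunkBytes = chunkManager.readChunk(container, blockID, info, ctx);
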
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7274115d/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
index fd48bf5..cf9ea89 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
@@ -103,6 +103,10 @@ public class TestChunkManagerImpl {
 
   }
 
+  private DispatcherContext getDispatcherContext() {
+    return new DispatcherContext.Builder().build();
+  }
+
   @Test
   public void testWriteChunkStageWriteAndCommit() throws Exception {
     //As in Setup, we try to create container, these paths should exist.
@@ -115,16 +119,20 @@ public class TestChunkManagerImpl {
     // As no chunks are written to the volume writeBytes should be 0
     checkWriteIOStats(0, 0);
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), WriteChunkStage.WRITE_DATA);
+        ByteBuffer.wrap(data), new DispatcherContext.Builder()
+            .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA).build());
     // Now a chunk file is being written with Stage WRITE_DATA, so it should
     // create a temporary chunk file.
     assertTrue(chunksPath.listFiles().length == 1);
 
+    long term = 0;
+    long index = 0;
     File chunkFile = ChunkUtils.getChunkFile(keyValueContainerData, chunkInfo);
     File tempChunkFile = new File(chunkFile.getParent(),
-        chunkFile.getName() +
-            OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
-            OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX);
+        chunkFile.getName() + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER
+            + OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX
+            + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + term
+            + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER + index);
 
     // As chunk write stage is WRITE_DATA, temp chunk file will be created.
     assertTrue(tempChunkFile.exists());
@@ -132,7 +140,8 @@ public class TestChunkManagerImpl {
     checkWriteIOStats(data.length, 1);
 
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), WriteChunkStage.COMMIT_DATA);
+        ByteBuffer.wrap(data), new DispatcherContext.Builder()
+            .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA).build());
 
     checkWriteIOStats(data.length, 1);
 
@@ -152,7 +161,7 @@ public class TestChunkManagerImpl {
       chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
           .getLocalID(), 0), 0, randomLength);
       chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-          ByteBuffer.wrap(data), WriteChunkStage.WRITE_DATA);
+          ByteBuffer.wrap(data), getDispatcherContext());
       fail("testWriteChunkIncorrectLength failed");
     } catch (StorageContainerException ex) {
       // As we got an exception, writeBytes should be 0.
@@ -173,7 +182,7 @@ public class TestChunkManagerImpl {
     assertTrue(chunksPath.listFiles().length == 0);
     checkWriteIOStats(0, 0);
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
+        ByteBuffer.wrap(data), getDispatcherContext());
     // Now a chunk file is being written with Stage COMBINED_DATA, so it should
     // create a chunk file.
     assertTrue(chunksPath.listFiles().length == 1);
@@ -186,11 +195,11 @@ public class TestChunkManagerImpl {
   public void testReadChunk() throws Exception {
     checkWriteIOStats(0, 0);
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
+        ByteBuffer.wrap(data), getDispatcherContext());
     checkWriteIOStats(data.length, 1);
     checkReadIOStats(0, 0);
     byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID,
-        chunkInfo, false);
+        chunkInfo, getDispatcherContext());
     assertEquals(expectedData.length, data.length);
     assertTrue(Arrays.equals(expectedData, data));
     checkReadIOStats(data.length, 1);
@@ -200,7 +209,7 @@ public class TestChunkManagerImpl {
   public void testDeleteChunk() throws Exception {
     File chunksPath = new File(keyValueContainerData.getChunksPath());
     chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-        ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
+        ByteBuffer.wrap(data), getDispatcherContext());
     assertTrue(chunksPath.listFiles().length == 1);
     chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
     assertTrue(chunksPath.listFiles().length == 0);
@@ -210,7 +219,7 @@ public class TestChunkManagerImpl {
   public void testDeleteChunkUnsupportedRequest() throws Exception {
     try {
       chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-          ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
+          ByteBuffer.wrap(data), getDispatcherContext());
       long randomLength = 200L;
       chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
           .getLocalID(), 0), 0, randomLength);
@@ -227,7 +236,7 @@ public class TestChunkManagerImpl {
     try {
       // trying to read a chunk, where chunk file does not exist
       byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID,
-          chunkInfo, false);
+          chunkInfo, getDispatcherContext());
       fail("testReadChunkFileNotExists failed");
     } catch (StorageContainerException ex) {
       GenericTestUtils.assertExceptionContains("Unable to find the chunk " +
@@ -242,7 +251,7 @@ public class TestChunkManagerImpl {
       chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
           .getLocalID(), i), 0, data.length);
       chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
-          ByteBuffer.wrap(data), WriteChunkStage.COMBINED);
+          ByteBuffer.wrap(data), getDispatcherContext());
     }
     checkWriteIOStats(data.length*100, 100);
     assertTrue(hddsVolume.getVolumeIOStats().getWriteTime() > 0);
@@ -250,7 +259,8 @@ public class TestChunkManagerImpl {
     for (int i=0; i<100; i++) {
       chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
           .getLocalID(), i), 0, data.length);
-      chunkManager.readChunk(keyValueContainer, blockID, chunkInfo, false);
+      chunkManager.readChunk(keyValueContainer, blockID, chunkInfo,
+          getDispatcherContext());
     }
     checkReadIOStats(data.length*100, 100);
     assertTrue(hddsVolume.getVolumeIOStats().getReadTime() > 0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7274115d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 32b01ae..d89ffb6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
@@ -78,7 +79,6 @@ import java.util.UUID;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum;
@@ -156,6 +156,10 @@ public class TestContainerPersistence {
     return ContainerTestHelper.getTestContainerID();
   }
 
+  private DispatcherContext getDispatcherContext() {
+    return new DispatcherContext.Builder().build();
+  }
+
   private Container addContainer(ContainerSet cSet, long cID)
       throws IOException {
     KeyValueContainerData data = new KeyValueContainerData(cID,
@@ -334,7 +338,7 @@ public class TestContainerPersistence {
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        WriteChunkStage.COMBINED);
+        getDispatcherContext());
     return info;
 
   }
@@ -375,7 +379,7 @@ public class TestContainerPersistence {
       byte[] data = getData(datalen);
       setDataChecksum(info, data);
       chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-          WriteChunkStage.COMBINED);
+          getDispatcherContext());
       String fileName = String.format("%s.data.%d", blockID.getLocalID(), x);
       fileHashMap.put(fileName, info);
     }
@@ -406,7 +410,8 @@ public class TestContainerPersistence {
       for (int x = 0; x < chunkCount; x++) {
         String fileName = String.format("%s.data.%d", blockID.getLocalID(), x);
         ChunkInfo info = fileHashMap.get(fileName);
-        byte[] data = chunkManager.readChunk(container, blockID, info, false);
+        byte[] data = chunkManager
+            .readChunk(container, blockID, info, getDispatcherContext());
         ChecksumData checksumData = checksum.computeChecksum(data);
         Assert.assertEquals(info.getChecksumData(), checksumData);
       }
@@ -433,13 +438,15 @@ public class TestContainerPersistence {
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        WriteChunkStage.COMBINED);
+        getDispatcherContext());
 
-    byte[] readData = chunkManager.readChunk(container, blockID, info, false);
+    byte[] readData = chunkManager
+        .readChunk(container, blockID, info, getDispatcherContext());
     assertTrue(Arrays.equals(data, readData));
 
     ChunkInfo info2 = getChunk(blockID.getLocalID(), 0, start, length);
-    byte[] readData2 = chunkManager.readChunk(container, blockID, info2, false);
+    byte[] readData2 = chunkManager
+        .readChunk(container, blockID, info2, getDispatcherContext());
     assertEquals(length, readData2.length);
     assertTrue(Arrays.equals(
         Arrays.copyOfRange(data, start, start + length), readData2));
@@ -466,13 +473,13 @@ public class TestContainerPersistence {
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        WriteChunkStage.COMBINED);
+        getDispatcherContext());
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        WriteChunkStage.COMBINED);
+        getDispatcherContext());
     // With the overwrite flag it should work now.
     info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        WriteChunkStage.COMBINED);
+        getDispatcherContext());
     long bytesUsed = container.getContainerData().getBytesUsed();
     Assert.assertEquals(datalen, bytesUsed);
 
@@ -507,14 +514,15 @@ public class TestContainerPersistence {
       oldSha.update(data);
       setDataChecksum(info, data);
       chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-          WriteChunkStage.COMBINED);
+          getDispatcherContext());
     }
 
     // Request to read the whole data in a single go.
     ChunkInfo largeChunk = getChunk(blockID.getLocalID(), 0, 0,
         datalen * chunkCount);
     byte[] newdata =
-        chunkManager.readChunk(container, blockID, largeChunk, false);
+        chunkManager.readChunk(container, blockID, largeChunk,
+            getDispatcherContext());
     MessageDigest newSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
     newSha.update(newdata);
     Assert.assertEquals(Hex.encodeHexString(oldSha.digest()),
@@ -540,11 +548,11 @@ public class TestContainerPersistence {
     byte[] data = getData(datalen);
     setDataChecksum(info, data);
     chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-        WriteChunkStage.COMBINED);
+        getDispatcherContext());
     chunkManager.deleteChunk(container, blockID, info);
     exception.expect(StorageContainerException.class);
     exception.expectMessage("Unable to find the chunk file.");
-    chunkManager.readChunk(container, blockID, info, false);
+    chunkManager.readChunk(container, blockID, info, getDispatcherContext());
   }
 
   /**
@@ -655,7 +663,7 @@ public class TestContainerPersistence {
       byte[] data = getData(datalen);
       setDataChecksum(info, data);
       chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
-          WriteChunkStage.COMBINED);
+          getDispatcherContext());
       totalSize += datalen;
       chunkList.add(info);
     }

