This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 60bb060421 HDDS-9734. ChunkInputStream should use new token after pipeline refresh (#5664)
60bb060421 is described below

commit 60bb0604217fbd44dbc6b115b59fc26a297fcaba
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Thu Nov 23 21:38:01 2023 +0100

    HDDS-9734. ChunkInputStream should use new token after pipeline refresh (#5664)
---
 .../hadoop/hdds/scm/storage/BlockInputStream.java  | 112 ++++++++++--------
 .../hadoop/hdds/scm/storage/ChunkInputStream.java  | 131 ++++++++++-----------
 .../hdds/scm/storage/DummyBlockInputStream.java    |   2 +-
 .../storage/DummyBlockInputStreamWithRetry.java    |   4 +-
 .../hdds/scm/storage/DummyChunkInputStream.java    |   2 +-
 .../hdds/scm/storage/TestBlockInputStream.java     |   6 +-
 .../hdds/scm/storage/TestChunkInputStream.java     |  62 +++++++---
 7 files changed, 183 insertions(+), 136 deletions(-)
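
In short: BlockInputStream used to capture the pipeline and block token
by value, so ChunkInputStreams created before a refresh kept presenting
the expired token to datanodes. The patch stores both behind
AtomicReference fields and hands method references down. A minimal
sketch of the pattern, condensed from the hunks below (not the verbatim
implementation):

    // The parent keeps mutable state behind AtomicReferences...
    AtomicReference<Pipeline> pipelineRef = new AtomicReference<>(pipeline);
    AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef =
        new AtomicReference<>(token);

    // ...a refresh swaps in the new pipeline and token...
    pipelineRef.set(blockLocationInfo.getPipeline());
    tokenRef.set(blockLocationInfo.getToken());

    // ...and each child dereferences a Supplier per read, so it always
    // observes the values installed by the latest refresh.
    new ChunkInputStream(chunkInfo, blockID, xceiverClientFactory,
        pipelineRef::get, verifyChecksum, tokenRef::get);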

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index c10e271f2e..385ea6d0c3 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -25,11 +25,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
@@ -52,6 +52,8 @@ import org.apache.ratis.thirdparty.io.grpc.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.hdds.client.ReplicationConfig.getLegacyFactor;
+
 /**
  * An {@link InputStream} called from KeyInputStream to read a block from the
  * container.
@@ -65,8 +67,10 @@ public class BlockInputStream extends BlockExtendedInputStream {
 
   private final BlockID blockID;
   private final long length;
-  private Pipeline pipeline;
-  private Token<OzoneBlockTokenIdentifier> token;
+  private final AtomicReference<Pipeline> pipelineRef =
+      new AtomicReference<>();
+  private final AtomicReference<Token<OzoneBlockTokenIdentifier>> tokenRef =
+      new AtomicReference<>();
   private final boolean verifyChecksum;
   private XceiverClientFactory xceiverClientFactory;
   private XceiverClientSpi xceiverClient;
@@ -113,8 +117,8 @@ public class BlockInputStream extends BlockExtendedInputStream {
       Function<BlockID, BlockLocationInfo> refreshFunction) {
     this.blockID = blockId;
     this.length = blockLen;
-    this.pipeline = pipeline;
-    this.token = token;
+    setPipeline(pipeline);
+    tokenRef.set(token);
     this.verifyChecksum = verifyChecksum;
     this.xceiverClientFactory = xceiverClientFactory;
     this.refreshFunction = refreshFunction;
@@ -143,7 +147,7 @@ public class BlockInputStream extends BlockExtendedInputStream {
     IOException catchEx = null;
     do {
       try {
-        chunks = getChunkInfos();
+        chunks = getChunkInfoList();
         break;
         // If we get a StorageContainerException or an IOException due to
         // datanodes are not reachable, refresh to get the latest pipeline
@@ -203,7 +207,7 @@ public class BlockInputStream extends BlockExtendedInputStream {
 
   private void refreshBlockInfo(IOException cause) throws IOException {
     LOG.info("Unable to read information for block {} from pipeline {}: {}",
-        blockID, pipeline.getId(), cause.getMessage());
+        blockID, pipelineRef.get().getId(), cause.getMessage());
     if (refreshFunction != null) {
       LOG.debug("Re-fetching pipeline and block token for block {}", blockID);
       BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID);
@@ -212,8 +216,8 @@ public class BlockInputStream extends BlockExtendedInputStream {
       } else {
         LOG.debug("New pipeline for block {}: {}", blockID,
             blockLocationInfo.getPipeline());
-        this.pipeline = blockLocationInfo.getPipeline();
-        this.token = blockLocationInfo.getToken();
+        setPipeline(blockLocationInfo.getPipeline());
+        tokenRef.set(blockLocationInfo.getToken());
       }
     } else {
       throw cause;
@@ -224,46 +228,55 @@ public class BlockInputStream extends BlockExtendedInputStream {
    * Send RPC call to get the block info from the container.
    * @return List of chunks in this block.
    */
-  protected List<ChunkInfo> getChunkInfos() throws IOException {
-    // irrespective of the container state, we will always read via Standalone
-    // protocol.
-    if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE && pipeline
-        .getType() != HddsProtos.ReplicationType.EC) {
-      pipeline = Pipeline.newBuilder(pipeline)
-          .setReplicationConfig(StandaloneReplicationConfig.getInstance(
-              ReplicationConfig
-                  .getLegacyFactor(pipeline.getReplicationConfig())))
-          .build();
-    }
+  protected List<ChunkInfo> getChunkInfoList() throws IOException {
+    acquireClient();
     try {
-      acquireClient();
-    } catch (IOException ioe) {
-      LOG.warn("Failed to acquire client for pipeline {}, block {}",
-          pipeline, blockID);
-      throw ioe;
+      return getChunkInfoListUsingClient();
+    } finally {
+      releaseClient();
     }
-    try {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Initializing BlockInputStream for get key to access {}",
-            blockID.getContainerID());
-      }
+  }
 
-      DatanodeBlockID.Builder blkIDBuilder =
-          DatanodeBlockID.newBuilder().setContainerID(blockID.getContainerID())
-              .setLocalID(blockID.getLocalID())
-              .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId());
+  @VisibleForTesting
+  protected List<ChunkInfo> getChunkInfoListUsingClient() throws IOException {
+    final Pipeline pipeline = xceiverClient.getPipeline();
 
-      int replicaIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
-      if (replicaIndex > 0) {
-        blkIDBuilder.setReplicaIndex(replicaIndex);
-      }
-      GetBlockResponseProto response = ContainerProtocolCalls
-          .getBlock(xceiverClient, VALIDATORS, blkIDBuilder.build(), token);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Initializing BlockInputStream for get key to access {}",
+          blockID.getContainerID());
+    }
 
-      return response.getBlockData().getChunksList();
-    } finally {
-      releaseClient();
+    DatanodeBlockID.Builder blkIDBuilder =
+        DatanodeBlockID.newBuilder().setContainerID(blockID.getContainerID())
+            .setLocalID(blockID.getLocalID())
+            .setBlockCommitSequenceId(blockID.getBlockCommitSequenceId());
+
+    int replicaIndex = pipeline.getReplicaIndex(pipeline.getClosestNode());
+    if (replicaIndex > 0) {
+      blkIDBuilder.setReplicaIndex(replicaIndex);
     }
+
+    GetBlockResponseProto response = ContainerProtocolCalls.getBlock(
+        xceiverClient, VALIDATORS, blkIDBuilder.build(), tokenRef.get());
+
+    return response.getBlockData().getChunksList();
+  }
+
+  private void setPipeline(Pipeline pipeline) {
+    if (pipeline == null) {
+      return;
+    }
+
+    // irrespective of the container state, we will always read via Standalone
+    // protocol.
+    boolean okForRead =
+        pipeline.getType() == HddsProtos.ReplicationType.STAND_ALONE
+            || pipeline.getType() == HddsProtos.ReplicationType.EC;
+    Pipeline readPipeline = okForRead ? pipeline : Pipeline.newBuilder(pipeline)
+        .setReplicationConfig(StandaloneReplicationConfig.getInstance(
+            getLegacyFactor(pipeline.getReplicationConfig())))
+        .build();
+    pipelineRef.set(readPipeline);
   }
 
   private static final List<Validator> VALIDATORS
@@ -286,9 +299,16 @@ public class BlockInputStream extends BlockExtendedInputStream {
     }
   }
 
-  protected void acquireClient() throws IOException {
+  private void acquireClient() throws IOException {
     if (xceiverClientFactory != null && xceiverClient == null) {
-      xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
+      final Pipeline pipeline = pipelineRef.get();
+      try {
+        xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
+      } catch (IOException ioe) {
+        LOG.warn("Failed to acquire client for pipeline {}, block {}",
+            pipeline, blockID);
+        throw ioe;
+      }
     }
   }
 
@@ -303,7 +323,7 @@ public class BlockInputStream extends BlockExtendedInputStream {
 
   protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
     return new ChunkInputStream(chunkInfo, blockID,
-        xceiverClientFactory, () -> pipeline, verifyChecksum, token);
+        xceiverClientFactory, pipelineRef::get, verifyChecksum, tokenRef::get);
   }
 
   @Override
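
Note the restructuring above: getChunkInfos() becomes getChunkInfoList(),
which separates client acquisition from the RPC and always pairs it with
release. Condensed from the hunks (a sketch of the shape, not the full
method bodies):

    protected List<ChunkInfo> getChunkInfoList() throws IOException {
      acquireClient();                        // logs and rethrows on failure
      try {
        return getChunkInfoListUsingClient(); // GetBlock RPC, reads tokenRef
      } finally {
        releaseClient();                      // paired with every acquire
      }
    }

The pipeline used to build the request is now taken from
xceiverClient.getPipeline() rather than a field, and the rewrite to the
Standalone replication config happens once in setPipeline() instead of
on every call.
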
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index fb9e345586..b30f555795 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadChunkResponseProto;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -37,7 +39,6 @@ import org.apache.hadoop.ozone.common.ChecksumData;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 
 import java.io.EOFException;
@@ -48,9 +49,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.function.Supplier;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * An {@link InputStream} called from BlockInputStream to read a chunk from the
  * container. Each chunk may contain multiple underlying {@link ByteBuffer}
@@ -59,16 +57,13 @@ import org.slf4j.LoggerFactory;
 public class ChunkInputStream extends InputStream
     implements Seekable, CanUnbuffer, ByteBufferReadable {
 
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ChunkInputStream.class);
-
-  private ChunkInfo chunkInfo;
+  private final ChunkInfo chunkInfo;
   private final long length;
   private final BlockID blockID;
   private final XceiverClientFactory xceiverClientFactory;
   private XceiverClientSpi xceiverClient;
   private final Supplier<Pipeline> pipelineSupplier;
-  private boolean verifyChecksum;
+  private final boolean verifyChecksum;
   private boolean allocated = false;
   // Buffers to store the chunk data read from the DN container
   private ByteBuffer[] buffers;
@@ -100,21 +95,24 @@ public class ChunkInputStream extends InputStream
   // retry.  Once the chunk is read, this variable is reset.
   private long chunkPosition = -1;
 
-  private final Token<? extends TokenIdentifier> token;
+  private final Supplier<Token<?>> tokenSupplier;
 
   private static final int EOF = -1;
+  private final List<Validator> validators;
 
   ChunkInputStream(ChunkInfo chunkInfo, BlockID blockId,
       XceiverClientFactory xceiverClientFactory,
       Supplier<Pipeline> pipelineSupplier,
-      boolean verifyChecksum, Token<? extends TokenIdentifier> token) {
+      boolean verifyChecksum,
+      Supplier<Token<?>> tokenSupplier) {
     this.chunkInfo = chunkInfo;
     this.length = chunkInfo.getLen();
     this.blockID = blockId;
     this.xceiverClientFactory = xceiverClientFactory;
     this.pipelineSupplier = pipelineSupplier;
     this.verifyChecksum = verifyChecksum;
-    this.token = token;
+    this.tokenSupplier = tokenSupplier;
+    validators = ContainerProtocolCalls.toValidatorList(this::validateChunk);
   }
 
   public synchronized long getRemaining() {
@@ -422,13 +420,10 @@ public class ChunkInputStream extends InputStream
   @VisibleForTesting
   protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo)
       throws IOException {
-    ReadChunkResponseProto readChunkResponse;
 
-    List<Validator> validators =
-        ContainerProtocolCalls.toValidatorList(validator);
-
-    readChunkResponse = ContainerProtocolCalls.readChunk(xceiverClient,
-        readChunkInfo, blockID, validators, token);
+    ReadChunkResponseProto readChunkResponse =
+        ContainerProtocolCalls.readChunk(xceiverClient,
+            readChunkInfo, blockID, validators, tokenSupplier.get());
 
     if (readChunkResponse.hasData()) {
       return readChunkResponse.getData().asReadOnlyByteBufferList()
@@ -443,55 +438,57 @@ public class ChunkInputStream extends InputStream
     }
   }
 
-  private final Validator validator =
-          (request, response) -> {
-            final ChunkInfo reqChunkInfo =
-                request.getReadChunk().getChunkData();
-
-            ReadChunkResponseProto readChunkResponse = response.getReadChunk();
-            List<ByteString> byteStrings;
-            boolean isV0 = false;
-
-            if (readChunkResponse.hasData()) {
-              ByteString byteString = readChunkResponse.getData();
-              if (byteString.size() != reqChunkInfo.getLen()) {
-                // Bytes read from chunk should be equal to chunk size.
-                throw new OzoneChecksumException(String.format(
-                    "Inconsistent read for chunk=%s len=%d bytesRead=%d",
-                    reqChunkInfo.getChunkName(), reqChunkInfo.getLen(),
-                    byteString.size()));
-              }
-              byteStrings = new ArrayList<>();
-              byteStrings.add(byteString);
-              isV0 = true;
-            } else {
-              byteStrings = readChunkResponse.getDataBuffers().getBuffersList();
-              long buffersLen = BufferUtils.getBuffersLen(byteStrings);
-              if (buffersLen != reqChunkInfo.getLen()) {
-                // Bytes read from chunk should be equal to chunk size.
-                throw new OzoneChecksumException(String.format(
-                    "Inconsistent read for chunk=%s len=%d bytesRead=%d",
-                    reqChunkInfo.getChunkName(), reqChunkInfo.getLen(),
-                    buffersLen));
-              }
-            }
-
-            if (verifyChecksum) {
-              ChecksumData checksumData = ChecksumData.getFromProtoBuf(
-                  chunkInfo.getChecksumData());
-
-              // ChecksumData stores checksum for each 'numBytesPerChecksum'
-              // number of bytes in a list. Compute the index of the first
-              // checksum to match with the read data
-
-              long relativeOffset = reqChunkInfo.getOffset() -
-                  chunkInfo.getOffset();
-              int bytesPerChecksum = checksumData.getBytesPerChecksum();
-              int startIndex = (int) (relativeOffset / bytesPerChecksum);
-              Checksum.verifyChecksum(byteStrings, checksumData, startIndex,
-                  isV0);
-            }
-          };
+  private void validateChunk(
+      ContainerCommandRequestProto request,
+      ContainerCommandResponseProto response
+  ) throws OzoneChecksumException {
+    final ChunkInfo reqChunkInfo =
+        request.getReadChunk().getChunkData();
+
+    ReadChunkResponseProto readChunkResponse = response.getReadChunk();
+    List<ByteString> byteStrings;
+    boolean isV0 = false;
+
+    if (readChunkResponse.hasData()) {
+      ByteString byteString = readChunkResponse.getData();
+      if (byteString.size() != reqChunkInfo.getLen()) {
+        // Bytes read from chunk should be equal to chunk size.
+        throw new OzoneChecksumException(String.format(
+            "Inconsistent read for chunk=%s len=%d bytesRead=%d",
+            reqChunkInfo.getChunkName(), reqChunkInfo.getLen(),
+            byteString.size()));
+      }
+      byteStrings = new ArrayList<>();
+      byteStrings.add(byteString);
+      isV0 = true;
+    } else {
+      byteStrings = readChunkResponse.getDataBuffers().getBuffersList();
+      long buffersLen = BufferUtils.getBuffersLen(byteStrings);
+      if (buffersLen != reqChunkInfo.getLen()) {
+        // Bytes read from chunk should be equal to chunk size.
+        throw new OzoneChecksumException(String.format(
+            "Inconsistent read for chunk=%s len=%d bytesRead=%d",
+            reqChunkInfo.getChunkName(), reqChunkInfo.getLen(),
+            buffersLen));
+      }
+    }
+
+    if (verifyChecksum) {
+      ChecksumData checksumData = ChecksumData.getFromProtoBuf(
+          chunkInfo.getChecksumData());
+
+      // ChecksumData stores checksum for each 'numBytesPerChecksum'
+      // number of bytes in a list. Compute the index of the first
+      // checksum to match with the read data
+
+      long relativeOffset = reqChunkInfo.getOffset() -
+          chunkInfo.getOffset();
+      int bytesPerChecksum = checksumData.getBytesPerChecksum();
+      int startIndex = (int) (relativeOffset / bytesPerChecksum);
+      Checksum.verifyChecksum(byteStrings, checksumData, startIndex,
+          isV0);
+    }
+  }
 
   /**
    * Return the offset and length of bytes that need to be read from the
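
On the ChunkInputStream side, the stored Token field becomes a
Supplier<Token<?>> that is resolved at read time, so a token refreshed by
the parent stream is picked up on the next read. The effect, condensed
from readChunk() above:

    ReadChunkResponseProto readChunkResponse =
        ContainerProtocolCalls.readChunk(xceiverClient,
            readChunkInfo, blockID, validators,
            tokenSupplier.get());   // current token, even after a refresh

The anonymous validator lambda also becomes the named method
validateChunk(), and the validator list is built once in the constructor
rather than on every readChunk() call.
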
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
index be72dd0701..3e7779f0d1 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
@@ -57,7 +57,7 @@ class DummyBlockInputStream extends BlockInputStream {
   }
 
   @Override
-  protected List<ChunkInfo> getChunkInfos() throws IOException {
+  protected List<ChunkInfo> getChunkInfoList() throws IOException {
     return chunks;
   }
 
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
index b39ed61d70..24a3574514 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
@@ -73,7 +73,7 @@ final class DummyBlockInputStreamWithRetry
   }
 
   @Override
-  protected List<ChunkInfo> getChunkInfos() throws IOException {
+  protected List<ChunkInfo> getChunkInfoList() throws IOException {
     if (getChunkInfoCount == 0) {
       getChunkInfoCount++;
       if (ioException != null) {
@@ -82,7 +82,7 @@ final class DummyBlockInputStreamWithRetry
       throw new StorageContainerException("Exception encountered",
           CONTAINER_NOT_FOUND);
     } else {
-      return super.getChunkInfos();
+      return super.getChunkInfoList();
     }
   }
 }
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
index 78d0c05bfe..2567560787 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
@@ -45,7 +45,7 @@ public class DummyChunkInputStream extends ChunkInputStream {
       boolean verifyChecksum,
       byte[] data, Pipeline pipeline) {
     super(chunkInfo, blockId, xceiverClientFactory, () -> pipeline,
-        verifyChecksum, null);
+        verifyChecksum, () -> null);
     this.chunkData = data.clone();
   }
 
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index 6c518738cb..2e95de1eca 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -97,7 +97,8 @@ public class TestBlockInputStream {
     checksum = new Checksum(ChecksumType.NONE, CHUNK_SIZE);
     createChunkList(5);
 
-    blockStream = new DummyBlockInputStream(blockID, blockSize, null, null,
+    Pipeline pipeline = MockPipeline.createSingleNodePipeline();
+    blockStream = new DummyBlockInputStream(blockID, blockSize, pipeline, null,
         false, null, refreshFunction, chunks, chunkDataMap);
   }
 
@@ -413,8 +414,7 @@ public class TestBlockInputStream {
     BlockInputStream subject = new BlockInputStream(blockID, blockSize,
         pipeline, null, false, clientFactory, refreshFunction) {
       @Override
-      protected List<ChunkInfo> getChunkInfos() throws IOException {
-        acquireClient();
+      protected List<ChunkInfo> getChunkInfoListUsingClient() {
         return chunks;
       }
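
The anonymous subclass above now stubs getChunkInfoListUsingClient(),
the RPC layer, instead of getChunkInfos(); the real getChunkInfoList()
still runs, so the acquire/release pairing is exercised against the
mocked factory. Roughly (condensed from the hunk):

    BlockInputStream subject = new BlockInputStream(blockID, blockSize,
        pipeline, null, false, clientFactory, refreshFunction) {
      @Override
      protected List<ChunkInfo> getChunkInfoListUsingClient() {
        return chunks;   // stub only the RPC; acquire/release still runs
      }
    };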
 
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
index 3fe861402d..f45529412f 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
@@ -22,25 +22,34 @@ import java.io.EOFException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.common.Checksum;
 
+import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.hadoop.security.token.Token;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadChunkResponse;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -54,8 +63,10 @@ public class TestChunkInputStream {
   private static final int BYTES_PER_CHECKSUM = 20;
   private static final String CHUNK_NAME = "dummyChunk";
   private static final Random RANDOM = new Random();
+  private static final AtomicLong CONTAINER_ID = new AtomicLong();
 
   private DummyChunkInputStream chunkStream;
+  private BlockID blockID;
   private ChunkInfo chunkInfo;
   private byte[] chunkData;
 
@@ -65,6 +76,8 @@ public class TestChunkInputStream {
 
     chunkData = generateRandomData(CHUNK_SIZE);
 
+    blockID = new BlockID(CONTAINER_ID.incrementAndGet(), 0);
+
     chunkInfo = ChunkInfo.newBuilder()
         .setChunkName(CHUNK_NAME)
         .setOffset(0)
@@ -73,7 +86,7 @@ public class TestChunkInputStream {
             chunkData, 0, CHUNK_SIZE).getProtoBufMessage())
         .build();
 
-    chunkStream = new DummyChunkInputStream(chunkInfo, null, null, true,
+    chunkStream = new DummyChunkInputStream(chunkInfo, blockID, null, true,
         chunkData, null);
   }
 
@@ -229,29 +242,46 @@ public class TestChunkInputStream {
     // GIVEN
     Pipeline pipeline = MockPipeline.createSingleNodePipeline();
     Pipeline newPipeline = MockPipeline.createSingleNodePipeline();
-    XceiverClientFactory clientFactory = mock(XceiverClientFactory.class);
-    XceiverClientSpi client = mock(XceiverClientSpi.class);
-    when(clientFactory.acquireClientForReadData(pipeline))
-        .thenReturn(client);
+
+    Token<?> token = mock(Token.class);
+    when(token.encodeToUrlString())
+        .thenReturn("oldToken");
+    Token<?> newToken = mock(Token.class);
+    when(newToken.encodeToUrlString())
+        .thenReturn("newToken");
 
     AtomicReference<Pipeline> pipelineRef = new AtomicReference<>(pipeline);
+    AtomicReference<Token<?>> tokenRef = new AtomicReference<>(token);
 
-    try (ChunkInputStream subject = new ChunkInputStream(chunkInfo, null,
-        clientFactory, pipelineRef::get, false, null) {
-      @Override
-      protected ByteBuffer[] readChunk(ChunkInfo readChunkInfo) {
-        return ByteString.copyFrom(chunkData).asReadOnlyByteBufferList()
-            .toArray(new ByteBuffer[0]);
-      }
-    }) {
+    XceiverClientFactory clientFactory = mock(XceiverClientFactory.class);
+    XceiverClientSpi client = mock(XceiverClientSpi.class);
+    when(clientFactory.acquireClientForReadData(any()))
+        .thenReturn(client);
+    ArgumentCaptor<ContainerCommandRequestProto> requestCaptor =
+        ArgumentCaptor.forClass(ContainerCommandRequestProto.class);
+    when(client.getPipeline())
+        .thenAnswer(invocation -> pipelineRef.get());
+    when(client.sendCommand(requestCaptor.capture(), any()))
+        .thenAnswer(invocation ->
+            getReadChunkResponse(
+                requestCaptor.getValue(),
+                ChunkBuffer.wrap(ByteBuffer.wrap(chunkData)),
+                ByteStringConversion::safeWrap));
+
+    try (ChunkInputStream subject = new ChunkInputStream(chunkInfo, blockID,
+        clientFactory, pipelineRef::get, false, tokenRef::get)) {
       // WHEN
       subject.unbuffer();
       pipelineRef.set(newPipeline);
-      int b = subject.read();
+      tokenRef.set(newToken);
+      byte[] buffer = new byte[CHUNK_SIZE];
+      int read = subject.read(buffer);
 
       // THEN
-      assertNotEquals(-1, b);
+      assertEquals(CHUNK_SIZE, read);
+      assertArrayEquals(chunkData, buffer);
       verify(clientFactory).acquireClientForReadData(newPipeline);
+      verify(newToken).encodeToUrlString();
     }
   }
 }
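
The reworked test drives a real read through mocks rather than overriding
readChunk(), so the token actually flows to the RPC layer. The key Mockito
pattern, from the test above: capture each request so the stubbed
sendCommand can build a matching read-chunk response.

    ArgumentCaptor<ContainerCommandRequestProto> requestCaptor =
        ArgumentCaptor.forClass(ContainerCommandRequestProto.class);
    when(client.sendCommand(requestCaptor.capture(), any()))
        .thenAnswer(invocation ->
            getReadChunkResponse(
                requestCaptor.getValue(),
                ChunkBuffer.wrap(ByteBuffer.wrap(chunkData)),
                ByteStringConversion::safeWrap));

After unbuffer(), the test installs newPipeline and newToken, reads the
full chunk, and verifies both that a client was re-acquired for
newPipeline and that newToken (not the stale one) was encoded into the
request.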


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
