This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new e3206f4e7f HDDS-7930. input stream does not refresh expired block 
token. (#4378)
e3206f4e7f is described below

commit e3206f4e7ff705b5df8eeaf9b4bf6651d8f00d10
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Tue Mar 14 21:53:35 2023 -0700

    HDDS-7930. input stream does not refresh expired block token. (#4378)
---
 .../hadoop/hdds/scm/storage/BlockInputStream.java  | 32 +++++++-------
 .../ozone/client/io/BlockInputStreamFactory.java   |  4 +-
 .../client/io/BlockInputStreamFactoryImpl.java     |  2 +-
 .../hadoop/ozone/client/io/ECBlockInputStream.java | 20 +++++----
 .../ozone/client/io/ECBlockInputStreamFactory.java |  5 +--
 .../client/io/ECBlockInputStreamFactoryImpl.java   |  3 +-
 .../ozone/client/io/ECBlockInputStreamProxy.java   |  5 ++-
 .../io/ECBlockReconstructedStripeInputStream.java  |  6 +--
 .../hdds/scm/storage/DummyBlockInputStream.java    |  2 +-
 .../storage/DummyBlockInputStreamWithRetry.java    | 23 +++++-----
 .../hdds/scm/storage/TestBlockInputStream.java     | 50 ++++++++++++++--------
 .../hadoop/ozone/client/io/ECStreamTestUtil.java   |  2 +-
 .../ozone/client/io/TestECBlockInputStream.java    | 26 +++++++----
 .../client/io/TestECBlockInputStreamProxy.java     |  3 +-
 .../hadoop/ozone/client/io/KeyInputStream.java     |  9 ++--
 15 files changed, 110 insertions(+), 82 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 5c44f31c0d..55dc0557bf 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -64,7 +64,7 @@ public class BlockInputStream extends 
BlockExtendedInputStream {
   private final BlockID blockID;
   private final long length;
   private Pipeline pipeline;
-  private final Token<OzoneBlockTokenIdentifier> token;
+  private Token<OzoneBlockTokenIdentifier> token;
   private final boolean verifyChecksum;
   private XceiverClientFactory xceiverClientFactory;
   private XceiverClientSpi xceiverClient;
@@ -103,19 +103,19 @@ public class BlockInputStream extends 
BlockExtendedInputStream {
   // can be reset if a new position is seeked.
   private int chunkIndexOfPrevPosition;
 
-  private final Function<BlockID, Pipeline> refreshPipelineFunction;
+  private final Function<BlockID, BlockLocationInfo> refreshFunction;
 
   public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
       Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
       XceiverClientFactory xceiverClientFactory,
-      Function<BlockID, Pipeline> refreshPipelineFunction) {
+      Function<BlockID, BlockLocationInfo> refreshFunction) {
     this.blockID = blockId;
     this.length = blockLen;
     this.pipeline = pipeline;
     this.token = token;
     this.verifyChecksum = verifyChecksum;
     this.xceiverClientFactory = xceiverClientFactory;
-    this.refreshPipelineFunction = refreshPipelineFunction;
+    this.refreshFunction = refreshFunction;
   }
 
   public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
@@ -150,12 +150,12 @@ public class BlockInputStream extends 
BlockExtendedInputStream {
       } catch (SCMSecurityException ex) {
         throw ex;
       } catch (StorageContainerException ex) {
-        refreshPipeline(ex);
+        refreshBlockInfo(ex);
         catchEx = ex;
       } catch (IOException ex) {
         LOG.debug("Retry to get chunk info fail", ex);
         if (isConnectivityIssue(ex)) {
-          refreshPipeline(ex);
+          refreshBlockInfo(ex);
         }
         catchEx = ex;
       }
@@ -199,17 +199,19 @@ public class BlockInputStream extends 
BlockExtendedInputStream {
     return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode();
   }
 
-  private void refreshPipeline(IOException cause) throws IOException {
+  private void refreshBlockInfo(IOException cause) throws IOException {
     LOG.info("Unable to read information for block {} from pipeline {}: {}",
         blockID, pipeline.getId(), cause.getMessage());
-    if (refreshPipelineFunction != null) {
-      LOG.debug("Re-fetching pipeline for block {}", blockID);
-      Pipeline newPipeline = refreshPipelineFunction.apply(blockID);
-      if (newPipeline == null) {
-        LOG.debug("No new pipeline for block {}", blockID);
+    if (refreshFunction != null) {
+      LOG.debug("Re-fetching pipeline and block token for block {}", blockID);
+      BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID);
+      if (blockLocationInfo == null) {
+        LOG.debug("No new block location info for block {}", blockID);
       } else {
-        LOG.debug("New pipeline for block {}: {}", blockID, newPipeline);
-        this.pipeline = newPipeline;
+        LOG.debug("New block location info for block {}: {}",
+            blockID, blockLocationInfo);
+        this.pipeline = blockLocationInfo.getPipeline();
+        this.token = blockLocationInfo.getToken();
       }
     } else {
       throw cause;
@@ -526,7 +528,7 @@ public class BlockInputStream extends 
BlockExtendedInputStream {
       }
     }
 
-    refreshPipeline(cause);
+    refreshBlockInfo(cause);
   }
 
   @VisibleForTesting
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
index 6703216016..bd100214ae 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
@@ -43,13 +43,13 @@ public interface BlockInputStreamFactory {
    * @param token The block Access Token
    * @param verifyChecksum Whether to verify checksums or not.
    * @param xceiverFactory Factory to create the xceiver in the client
-   * @param refreshFunction Function to refresh the pipeline if needed
+   * @param refreshFunction Function to refresh the block location if needed
    * @return BlockExtendedInputStream of the correct type.
    */
   BlockExtendedInputStream create(ReplicationConfig repConfig,
       BlockLocationInfo blockInfo, Pipeline pipeline,
       Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
        XceiverClientFactory xceiverFactory,
-       Function<BlockID, Pipeline> refreshFunction);
+       Function<BlockID, BlockLocationInfo> refreshFunction);
 
 }
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index ba05ec2ed8..40063f9ce4 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -78,7 +78,7 @@ public class BlockInputStreamFactoryImpl implements 
BlockInputStreamFactory {
       BlockLocationInfo blockInfo, Pipeline pipeline,
       Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
       XceiverClientFactory xceiverFactory,
-      Function<BlockID, Pipeline> refreshFunction) {
+      Function<BlockID, BlockLocationInfo> refreshFunction) {
     if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
       return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig,
           blockInfo, verifyChecksum, xceiverFactory, refreshFunction,
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 012c14cece..dc354198ca 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -62,7 +62,7 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
   private final BlockInputStreamFactory streamFactory;
   private final boolean verifyChecksum;
   private final XceiverClientFactory xceiverClientFactory;
-  private final Function<BlockID, Pipeline> refreshFunction;
+  private final Function<BlockID, BlockLocationInfo> refreshFunction;
   private final BlockLocationInfo blockInfo;
   private final DatanodeDetails[] dataLocations;
   private final BlockExtendedInputStream[] blockStreams;
@@ -120,8 +120,9 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
 
   public ECBlockInputStream(ECReplicationConfig repConfig,
       BlockLocationInfo blockInfo, boolean verifyChecksum,
-      XceiverClientFactory xceiverClientFactory, Function<BlockID,
-      Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
+      XceiverClientFactory xceiverClientFactory,
+      Function<BlockID, BlockLocationInfo> refreshFunction,
+      BlockInputStreamFactory streamFactory) {
     this.repConfig = repConfig;
     this.ecChunkSize = repConfig.getEcChunkSize();
     this.verifyChecksum = verifyChecksum;
@@ -215,13 +216,14 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
    * @param refreshFunc
    * @return
    */
-  protected Function<BlockID, Pipeline> ecPipelineRefreshFunction(
-      int replicaIndex, Function<BlockID, Pipeline> refreshFunc) {
+  protected Function<BlockID, BlockLocationInfo> ecPipelineRefreshFunction(
+      int replicaIndex, Function<BlockID, BlockLocationInfo> refreshFunc) {
     return (blockID) -> {
-      Pipeline ecPipeline = refreshFunc.apply(blockID);
-      if (ecPipeline == null) {
+      BlockLocationInfo blockLocationInfo = refreshFunc.apply(blockID);
+      if (blockLocationInfo == null) {
         return null;
       }
+      Pipeline ecPipeline = blockLocationInfo.getPipeline();
       DatanodeDetails curIndexNode = ecPipeline.getNodes()
           .stream().filter(dn ->
               ecPipeline.getReplicaIndex(dn) == replicaIndex)
@@ -229,13 +231,15 @@ public class ECBlockInputStream extends 
BlockExtendedInputStream {
       if (curIndexNode == null) {
         return null;
       }
-      return Pipeline.newBuilder().setReplicationConfig(
+      Pipeline pipeline = Pipeline.newBuilder().setReplicationConfig(
               StandaloneReplicationConfig.getInstance(
                   HddsProtos.ReplicationFactor.ONE))
           .setNodes(Collections.singletonList(curIndexNode))
           .setId(PipelineID.randomId())
           .setState(Pipeline.PipelineState.CLOSED)
           .build();
+      blockLocationInfo.setPipeline(pipeline);
+      return blockLocationInfo;
     };
   }
 
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
index c9d2b76a78..0e2ef22c1e 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
@@ -21,7 +21,6 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 
@@ -47,12 +46,12 @@ public interface ECBlockInputStreamFactory {
    * @param blockInfo The blockInfo representing the block.
    * @param verifyChecksum Whether to verify checksums or not.
    * @param xceiverFactory Factory to create the xceiver in the client
-   * @param refreshFunction Function to refresh the pipeline if needed
+   * @param refreshFunction Function to refresh the block location if needed
    * @return BlockExtendedInputStream of the correct type.
    */
   BlockExtendedInputStream create(boolean missingLocations,
       List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
       BlockLocationInfo blockInfo, boolean verifyChecksum,
       XceiverClientFactory xceiverFactory,
-      Function<BlockID, Pipeline> refreshFunction);
+      Function<BlockID, BlockLocationInfo> refreshFunction);
 }
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
index efc3b31c84..36b6539ea8 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.io.ByteBufferPool;
@@ -77,7 +76,7 @@ public final class ECBlockInputStreamFactoryImpl implements
       List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
       BlockLocationInfo blockInfo, boolean verifyChecksum,
       XceiverClientFactory xceiverFactory,
-      Function<BlockID, Pipeline> refreshFunction) {
+      Function<BlockID, BlockLocationInfo> refreshFunction) {
     if (missingLocations) {
       // We create the reconstruction reader
       ECBlockReconstructedStripeInputStream sis =
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
index 47758eae02..973561616f 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
@@ -51,7 +51,7 @@ public class ECBlockInputStreamProxy extends 
BlockExtendedInputStream {
   private final ECReplicationConfig repConfig;
   private final boolean verifyChecksum;
   private final XceiverClientFactory xceiverClientFactory;
-  private final Function<BlockID, Pipeline> refreshFunction;
+  private final Function<BlockID, BlockLocationInfo> refreshFunction;
   private final BlockLocationInfo blockInfo;
   private final ECBlockInputStreamFactory ecBlockInputStreamFactory;
 
@@ -99,7 +99,8 @@ public class ECBlockInputStreamProxy extends 
BlockExtendedInputStream {
   public ECBlockInputStreamProxy(ECReplicationConfig repConfig,
       BlockLocationInfo blockInfo, boolean verifyChecksum,
       XceiverClientFactory xceiverClientFactory, Function<BlockID,
-      Pipeline> refreshFunction, ECBlockInputStreamFactory streamFactory) {
+      BlockLocationInfo> refreshFunction,
+      ECBlockInputStreamFactory streamFactory) {
     this.repConfig = repConfig;
     this.verifyChecksum = verifyChecksum;
     this.blockInfo = blockInfo;
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 0d11b88b36..9658fb784d 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
@@ -152,8 +151,9 @@ public class ECBlockReconstructedStripeInputStream extends 
ECBlockInputStream {
   @SuppressWarnings("checkstyle:ParameterNumber")
   public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
       BlockLocationInfo blockInfo, boolean verifyChecksum,
-      XceiverClientFactory xceiverClientFactory, Function<BlockID,
-      Pipeline> refreshFunction, BlockInputStreamFactory streamFactory,
+      XceiverClientFactory xceiverClientFactory,
+      Function<BlockID, BlockLocationInfo> refreshFunction,
+      BlockInputStreamFactory streamFactory,
       ByteBufferPool byteBufferPool,
       ExecutorService ecReconstructExecutor) {
     super(repConfig, blockInfo, verifyChecksum, xceiverClientFactory,
diff --git 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
index 1c7968b134..be72dd0701 100644
--- 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
+++ 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
@@ -46,7 +46,7 @@ class DummyBlockInputStream extends BlockInputStream {
       Token<OzoneBlockTokenIdentifier> token,
       boolean verifyChecksum,
       XceiverClientFactory xceiverClientManager,
-      Function<BlockID, Pipeline> refreshFunction,
+      Function<BlockID, BlockLocationInfo> refreshFunction,
       List<ChunkInfo> chunkList,
       Map<String, byte[]> chunks) {
     super(blockId, blockLen, pipeline, token, verifyChecksum,
diff --git 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
index 5a029763d6..b39ed61d70 100644
--- 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
+++ 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
@@ -18,23 +18,22 @@
 package org.apache.hadoop.hdds.scm.storage;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
 import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * A dummy BlockInputStream with pipeline refresh function to mock read
@@ -60,13 +59,15 @@ final class DummyBlockInputStreamWithRetry
     super(blockId, blockLen, pipeline, token, verifyChecksum,
         xceiverClientManager, blockID -> {
           isRerfreshed.set(true);
-          return Pipeline.newBuilder()
-              .setState(Pipeline.PipelineState.OPEN)
-              .setId(PipelineID.randomId())
-              .setReplicationConfig(StandaloneReplicationConfig.getInstance(
-                  ReplicationFactor.ONE))
-              .setNodes(Collections.emptyList())
-              .build();
+          try {
+            BlockLocationInfo blockLocationInfo = 
mock(BlockLocationInfo.class);
+            Pipeline mockPipeline = MockPipeline.createPipeline(1);
+            when(blockLocationInfo.getPipeline()).thenReturn(mockPipeline);
+            return blockLocationInfo;
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+
         }, chunkList, chunkMap);
     this.ioException  = ioException;
   }
diff --git 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index 84813f3c6d..04805576fd 100644
--- 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++ 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -84,18 +84,18 @@ public class TestBlockInputStream {
   private List<ChunkInfo> chunks;
   private Map<String, byte[]> chunkDataMap;
 
-  private Function<BlockID, Pipeline> refreshPipeline;
+  private Function<BlockID, BlockLocationInfo> refreshFunction;
 
   @BeforeEach
   @SuppressWarnings("unchecked")
   public void setup() throws Exception {
-    refreshPipeline = Mockito.mock(Function.class);
+    refreshFunction = Mockito.mock(Function.class);
     BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
     checksum = new Checksum(ChecksumType.NONE, CHUNK_SIZE);
     createChunkList(5);
 
     blockStream = new DummyBlockInputStream(blockID, blockSize, null, null,
-        false, null, refreshPipeline, chunks, chunkDataMap);
+        false, null, refreshFunction, chunks, chunkDataMap);
   }
 
   /**
@@ -290,15 +290,26 @@ public class TestBlockInputStream {
   void refreshesPipelineOnReadFailure(IOException ex) throws Exception {
     // GIVEN
     Pipeline pipeline = MockPipeline.createSingleNodePipeline();
+    BlockLocationInfo blockLocationInfo = mock(BlockLocationInfo.class);
+    when(blockLocationInfo.getPipeline()).thenReturn(pipeline);
     Pipeline newPipeline = MockPipeline.createSingleNodePipeline();
+    BlockLocationInfo newBlockLocationInfo = mock(BlockLocationInfo.class);
 
-    testRefreshesPipelineOnReadFailure(ex, pipeline, id -> newPipeline);
-    testRefreshesPipelineOnReadFailure(ex, pipeline, id -> pipeline);
-    testRefreshesPipelineOnReadFailure(ex, pipeline, id -> null);
+    testRefreshesPipelineOnReadFailure(ex, blockLocationInfo,
+        id -> newBlockLocationInfo);
+
+    when(newBlockLocationInfo.getPipeline()).thenReturn(newPipeline);
+    testRefreshesPipelineOnReadFailure(ex, blockLocationInfo,
+        id -> blockLocationInfo);
+
+    when(newBlockLocationInfo.getPipeline()).thenReturn(null);
+    testRefreshesPipelineOnReadFailure(ex, blockLocationInfo,
+        id -> newBlockLocationInfo);
   }
 
   private void testRefreshesPipelineOnReadFailure(IOException ex,
-      Pipeline pipeline, Function<BlockID, Pipeline> refreshFunction)
+      BlockLocationInfo blockLocationInfo,
+      Function<BlockID, BlockLocationInfo> refreshPipelineFunction)
       throws Exception {
 
     BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
@@ -306,10 +317,11 @@ public class TestBlockInputStream {
     final int len = 200;
     final ChunkInputStream stream = throwingChunkInputStream(ex, len, true);
 
-    when(refreshPipeline.apply(any()))
-        .thenAnswer(inv -> refreshFunction.apply(blockID));
+    when(this.refreshFunction.apply(any()))
+        .thenAnswer(inv -> refreshPipelineFunction.apply(blockID));
 
-    try (BlockInputStream subject = createSubject(blockID, pipeline, stream)) {
+    try (BlockInputStream subject = createSubject(blockID,
+        blockLocationInfo.getPipeline(), stream)) {
       subject.initialize();
 
       // WHEN
@@ -318,9 +330,9 @@ public class TestBlockInputStream {
 
       // THEN
       Assert.assertEquals(len, bytesRead);
-      verify(refreshPipeline).apply(blockID);
+      verify(this.refreshFunction).apply(blockID);
     } finally {
-      reset(refreshPipeline);
+      reset(this.refreshFunction);
     }
   }
 
@@ -349,7 +361,7 @@ public class TestBlockInputStream {
   private BlockInputStream createSubject(BlockID blockID, Pipeline pipeline,
       ChunkInputStream stream) {
     return new DummyBlockInputStream(blockID, blockSize, pipeline, null, false,
-        null, refreshPipeline, chunks, null) {
+        null, refreshFunction, chunks, null) {
       @Override
       protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
         return stream;
@@ -376,7 +388,7 @@ public class TestBlockInputStream {
           () -> subject.read(new byte[len], 0, len));
 
       // THEN
-      verify(refreshPipeline, never()).apply(blockID);
+      verify(refreshFunction, never()).apply(blockID);
     }
   }
 
@@ -396,17 +408,19 @@ public class TestBlockInputStream {
     Pipeline newPipeline = MockPipeline.createSingleNodePipeline();
     XceiverClientFactory clientFactory = mock(XceiverClientFactory.class);
     XceiverClientSpi client = mock(XceiverClientSpi.class);
+    BlockLocationInfo blockLocationInfo = mock(BlockLocationInfo.class);
     when(clientFactory.acquireClientForReadData(pipeline))
         .thenReturn(client);
 
     final int len = 200;
     final ChunkInputStream stream = throwingChunkInputStream(ex, len, true);
 
-    when(refreshPipeline.apply(blockID))
-        .thenReturn(newPipeline);
+    when(refreshFunction.apply(blockID))
+        .thenReturn(blockLocationInfo);
+    when(blockLocationInfo.getPipeline()).thenReturn(newPipeline);
 
     BlockInputStream subject = new BlockInputStream(blockID, blockSize,
-        pipeline, null, false, clientFactory, refreshPipeline) {
+        pipeline, null, false, clientFactory, refreshFunction) {
       @Override
       protected List<ChunkInfo> getChunkInfos() throws IOException {
         acquireClient();
@@ -429,7 +443,7 @@ public class TestBlockInputStream {
 
       // THEN
       Assert.assertEquals(len, bytesRead);
-      verify(refreshPipeline).apply(blockID);
+      verify(refreshFunction).apply(blockID);
       verify(clientFactory).acquireClientForReadData(pipeline);
       verify(clientFactory).releaseClientForReadData(client, false);
     } finally {
diff --git 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
index f4d40c811c..0fe5886f1b 100644
--- 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
+++ 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
@@ -258,7 +258,7 @@ public final class ECStreamTestUtil {
         BlockLocationInfo blockInfo, Pipeline pipeline,
         Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
         XceiverClientFactory xceiverFactory,
-        Function<BlockID, Pipeline> refreshFunction) {
+        Function<BlockID, BlockLocationInfo> refreshFunction) {
 
       int repInd = currentPipeline.getReplicaIndex(pipeline.getNodes().get(0));
       TestBlockInputStream stream = new TestBlockInputStream(
diff --git 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
index 07c6b39234..caa071b1b9 100644
--- 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
+++ 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
@@ -462,18 +462,26 @@ public class TestECBlockInputStream {
     }
 
     // Create a refreshFunction that returns a hard-coded EC pipeline.
-    Function<BlockID, Pipeline> refreshFunction = blkID -> 
Pipeline.newBuilder()
-        .setReplicationConfig(repConfig)
-        .setNodes(new ArrayList<>(dnMap.keySet()))
-        .setReplicaIndexes(dnMap)
-        .setState(Pipeline.PipelineState.CLOSED)
-        .setId(PipelineID.randomId())
-        .build();
+    Function<BlockID, BlockLocationInfo> refreshFunction = blkID -> {
+      Pipeline pipeline = Pipeline.newBuilder()
+          .setReplicationConfig(repConfig)
+          .setNodes(new ArrayList<>(dnMap.keySet()))
+          .setReplicaIndexes(dnMap)
+          .setState(Pipeline.PipelineState.CLOSED)
+          .setId(PipelineID.randomId())
+          .build();
+      BlockLocationInfo blockLocation = new BlockLocationInfo.Builder()
+          .setPipeline(pipeline)
+          .build();
+      return blockLocation;
+    };
 
     try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
         keyInfo, true, null, null, streamFactory)) {
       Pipeline pipeline =
-          ecb.ecPipelineRefreshFunction(3, refreshFunction).apply(blockID);
+          ecb.ecPipelineRefreshFunction(3, refreshFunction)
+              .apply(blockID)
+              .getPipeline();
       // Check the pipeline is built with the correct Datanode
       // with right replicaIndex.
       Assertions.assertEquals(HddsProtos.ReplicationType.STAND_ALONE,
@@ -503,7 +511,7 @@ public class TestECBlockInputStream {
         ReplicationConfig repConfig, BlockLocationInfo blockInfo,
         Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
         boolean verifyChecksum, XceiverClientFactory xceiverFactory,
-        Function<BlockID, Pipeline> refreshFunction) {
+        Function<BlockID, BlockLocationInfo> refreshFunction) {
       TestBlockInputStream stream = new TestBlockInputStream(
           blockInfo.getBlockID(), blockInfo.getLength(),
           (byte)blockStreams.size());
diff --git 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java
 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java
index 89ac7a831e..929fa13042 100644
--- 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java
+++ 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.junit.jupiter.api.Assertions;
@@ -378,7 +377,7 @@ public class TestECBlockInputStreamProxy {
         List<DatanodeDetails> failedDatanodes,
         ReplicationConfig repConfig, BlockLocationInfo blockInfo,
         boolean verifyChecksum, XceiverClientFactory xceiverFactory,
-        Function<BlockID, Pipeline> refreshFunction) {
+        Function<BlockID, BlockLocationInfo> refreshFunction) {
       this.failedLocations = failedDatanodes;
       ByteBuffer wrappedBuffer =
           ByteBuffer.wrap(data.array(), 0, data.capacity());
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index ce068f1b36..91d4b94404 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -26,7 +26,6 @@ import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
@@ -85,14 +84,16 @@ public class KeyInputStream extends MultipartInputStream {
               xceiverClientFactory,
               keyBlockID -> {
                 OmKeyInfo newKeyInfo = retryFunction.apply(keyInfo);
-                return getPipeline(newKeyInfo, omKeyLocationInfo.getBlockID());
+                return getBlockLocationInfo(newKeyInfo,
+                    omKeyLocationInfo.getBlockID());
               });
       partStreams.add(stream);
     }
     return partStreams;
   }
 
-  private static Pipeline getPipeline(OmKeyInfo newKeyInfo, BlockID blockID) {
+  private static BlockLocationInfo getBlockLocationInfo(OmKeyInfo newKeyInfo,
+      BlockID blockID) {
     List<OmKeyLocationInfo> collect =
         newKeyInfo.getLatestVersionLocations()
             .getLocationList()
@@ -100,7 +101,7 @@ public class KeyInputStream extends MultipartInputStream {
             .filter(l -> l.getBlockID().equals(blockID))
             .collect(Collectors.toList());
     if (CollectionUtils.isNotEmpty(collect)) {
-      return collect.get(0).getPipeline();
+      return collect.get(0);
     } else {
       return null;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to