This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new e3206f4e7f HDDS-7930. input stream does not refresh expired block token. (#4378)
e3206f4e7f is described below
commit e3206f4e7ff705b5df8eeaf9b4bf6651d8f00d10
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Tue Mar 14 21:53:35 2023 -0700
HDDS-7930. input stream does not refresh expired block token. (#4378)
---
.../hadoop/hdds/scm/storage/BlockInputStream.java | 32 +++++++-------
.../ozone/client/io/BlockInputStreamFactory.java | 4 +-
.../client/io/BlockInputStreamFactoryImpl.java | 2 +-
.../hadoop/ozone/client/io/ECBlockInputStream.java | 20 +++++----
.../ozone/client/io/ECBlockInputStreamFactory.java | 5 +--
.../client/io/ECBlockInputStreamFactoryImpl.java | 3 +-
.../ozone/client/io/ECBlockInputStreamProxy.java | 5 ++-
.../io/ECBlockReconstructedStripeInputStream.java | 6 +--
.../hdds/scm/storage/DummyBlockInputStream.java | 2 +-
.../storage/DummyBlockInputStreamWithRetry.java | 23 +++++-----
.../hdds/scm/storage/TestBlockInputStream.java | 50 ++++++++++++++--------
.../hadoop/ozone/client/io/ECStreamTestUtil.java | 2 +-
.../ozone/client/io/TestECBlockInputStream.java | 26 +++++++----
.../client/io/TestECBlockInputStreamProxy.java | 3 +-
.../hadoop/ozone/client/io/KeyInputStream.java | 9 ++--
15 files changed, 110 insertions(+), 82 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 5c44f31c0d..55dc0557bf 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -64,7 +64,7 @@ public class BlockInputStream extends
BlockExtendedInputStream {
private final BlockID blockID;
private final long length;
private Pipeline pipeline;
- private final Token<OzoneBlockTokenIdentifier> token;
+ private Token<OzoneBlockTokenIdentifier> token;
private final boolean verifyChecksum;
private XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
@@ -103,19 +103,19 @@ public class BlockInputStream extends
BlockExtendedInputStream {
// can be reset if a new position is seeked.
private int chunkIndexOfPrevPosition;
- private final Function<BlockID, Pipeline> refreshPipelineFunction;
+ private final Function<BlockID, BlockLocationInfo> refreshFunction;
public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory,
- Function<BlockID, Pipeline> refreshPipelineFunction) {
+ Function<BlockID, BlockLocationInfo> refreshFunction) {
this.blockID = blockId;
this.length = blockLen;
this.pipeline = pipeline;
this.token = token;
this.verifyChecksum = verifyChecksum;
this.xceiverClientFactory = xceiverClientFactory;
- this.refreshPipelineFunction = refreshPipelineFunction;
+ this.refreshFunction = refreshFunction;
}
public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
@@ -150,12 +150,12 @@ public class BlockInputStream extends
BlockExtendedInputStream {
} catch (SCMSecurityException ex) {
throw ex;
} catch (StorageContainerException ex) {
- refreshPipeline(ex);
+ refreshBlockInfo(ex);
catchEx = ex;
} catch (IOException ex) {
LOG.debug("Retry to get chunk info fail", ex);
if (isConnectivityIssue(ex)) {
- refreshPipeline(ex);
+ refreshBlockInfo(ex);
}
catchEx = ex;
}
@@ -199,17 +199,19 @@ public class BlockInputStream extends
BlockExtendedInputStream {
return Status.fromThrowable(ex).getCode() == Status.UNAVAILABLE.getCode();
}
- private void refreshPipeline(IOException cause) throws IOException {
+ private void refreshBlockInfo(IOException cause) throws IOException {
LOG.info("Unable to read information for block {} from pipeline {}: {}",
blockID, pipeline.getId(), cause.getMessage());
- if (refreshPipelineFunction != null) {
- LOG.debug("Re-fetching pipeline for block {}", blockID);
- Pipeline newPipeline = refreshPipelineFunction.apply(blockID);
- if (newPipeline == null) {
- LOG.debug("No new pipeline for block {}", blockID);
+ if (refreshFunction != null) {
+ LOG.debug("Re-fetching pipeline and block token for block {}", blockID);
+ BlockLocationInfo blockLocationInfo = refreshFunction.apply(blockID);
+ if (blockLocationInfo == null) {
+ LOG.debug("No new block location info for block {}", blockID);
} else {
- LOG.debug("New pipeline for block {}: {}", blockID, newPipeline);
- this.pipeline = newPipeline;
+ LOG.debug("New block location info for block {}: {}",
+ blockID, blockLocationInfo);
+ this.pipeline = blockLocationInfo.getPipeline();
+ this.token = blockLocationInfo.getToken();
}
} else {
throw cause;
@@ -526,7 +528,7 @@ public class BlockInputStream extends
BlockExtendedInputStream {
}
}
- refreshPipeline(cause);
+ refreshBlockInfo(cause);
}
@VisibleForTesting
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
index 6703216016..bd100214ae 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactory.java
@@ -43,13 +43,13 @@ public interface BlockInputStreamFactory {
* @param token The block Access Token
* @param verifyChecksum Whether to verify checksums or not.
* @param xceiverFactory Factory to create the xceiver in the client
- * @param refreshFunction Function to refresh the pipeline if needed
+ * @param refreshFunction Function to refresh the block location if needed
* @return BlockExtendedInputStream of the correct type.
*/
BlockExtendedInputStream create(ReplicationConfig repConfig,
BlockLocationInfo blockInfo, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
XceiverClientFactory xceiverFactory,
- Function<BlockID, Pipeline> refreshFunction);
+ Function<BlockID, BlockLocationInfo> refreshFunction);
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
index ba05ec2ed8..40063f9ce4 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockInputStreamFactoryImpl.java
@@ -78,7 +78,7 @@ public class BlockInputStreamFactoryImpl implements
BlockInputStreamFactory {
BlockLocationInfo blockInfo, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
XceiverClientFactory xceiverFactory,
- Function<BlockID, Pipeline> refreshFunction) {
+ Function<BlockID, BlockLocationInfo> refreshFunction) {
if (repConfig.getReplicationType().equals(HddsProtos.ReplicationType.EC)) {
return new ECBlockInputStreamProxy((ECReplicationConfig)repConfig,
blockInfo, verifyChecksum, xceiverFactory, refreshFunction,
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 012c14cece..dc354198ca 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@ -62,7 +62,7 @@ public class ECBlockInputStream extends
BlockExtendedInputStream {
private final BlockInputStreamFactory streamFactory;
private final boolean verifyChecksum;
private final XceiverClientFactory xceiverClientFactory;
- private final Function<BlockID, Pipeline> refreshFunction;
+ private final Function<BlockID, BlockLocationInfo> refreshFunction;
private final BlockLocationInfo blockInfo;
private final DatanodeDetails[] dataLocations;
private final BlockExtendedInputStream[] blockStreams;
@@ -120,8 +120,9 @@ public class ECBlockInputStream extends
BlockExtendedInputStream {
public ECBlockInputStream(ECReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
- XceiverClientFactory xceiverClientFactory, Function<BlockID,
- Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
+ XceiverClientFactory xceiverClientFactory,
+ Function<BlockID, BlockLocationInfo> refreshFunction,
+ BlockInputStreamFactory streamFactory) {
this.repConfig = repConfig;
this.ecChunkSize = repConfig.getEcChunkSize();
this.verifyChecksum = verifyChecksum;
@@ -215,13 +216,14 @@ public class ECBlockInputStream extends
BlockExtendedInputStream {
* @param refreshFunc
* @return
*/
- protected Function<BlockID, Pipeline> ecPipelineRefreshFunction(
- int replicaIndex, Function<BlockID, Pipeline> refreshFunc) {
+ protected Function<BlockID, BlockLocationInfo> ecPipelineRefreshFunction(
+ int replicaIndex, Function<BlockID, BlockLocationInfo> refreshFunc) {
return (blockID) -> {
- Pipeline ecPipeline = refreshFunc.apply(blockID);
- if (ecPipeline == null) {
+ BlockLocationInfo blockLocationInfo = refreshFunc.apply(blockID);
+ if (blockLocationInfo == null) {
return null;
}
+ Pipeline ecPipeline = blockLocationInfo.getPipeline();
DatanodeDetails curIndexNode = ecPipeline.getNodes()
.stream().filter(dn ->
ecPipeline.getReplicaIndex(dn) == replicaIndex)
@@ -229,13 +231,15 @@ public class ECBlockInputStream extends
BlockExtendedInputStream {
if (curIndexNode == null) {
return null;
}
- return Pipeline.newBuilder().setReplicationConfig(
+ Pipeline pipeline = Pipeline.newBuilder().setReplicationConfig(
StandaloneReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.ONE))
.setNodes(Collections.singletonList(curIndexNode))
.setId(PipelineID.randomId())
.setState(Pipeline.PipelineState.CLOSED)
.build();
+ blockLocationInfo.setPipeline(pipeline);
+ return blockLocationInfo;
};
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
index c9d2b76a78..0e2ef22c1e 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactory.java
@@ -21,7 +21,6 @@ import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
@@ -47,12 +46,12 @@ public interface ECBlockInputStreamFactory {
* @param blockInfo The blockInfo representing the block.
* @param verifyChecksum Whether to verify checksums or not.
* @param xceiverFactory Factory to create the xceiver in the client
- * @param refreshFunction Function to refresh the pipeline if needed
+ * @param refreshFunction Function to refresh the block location if needed
* @return BlockExtendedInputStream of the correct type.
*/
BlockExtendedInputStream create(boolean missingLocations,
List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverFactory,
- Function<BlockID, Pipeline> refreshFunction);
+ Function<BlockID, BlockLocationInfo> refreshFunction);
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
index efc3b31c84..36b6539ea8 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamFactoryImpl.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.io.ByteBufferPool;
@@ -77,7 +76,7 @@ public final class ECBlockInputStreamFactoryImpl implements
List<DatanodeDetails> failedLocations, ReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverFactory,
- Function<BlockID, Pipeline> refreshFunction) {
+ Function<BlockID, BlockLocationInfo> refreshFunction) {
if (missingLocations) {
// We create the reconstruction reader
ECBlockReconstructedStripeInputStream sis =
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
index 47758eae02..973561616f 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStreamProxy.java
@@ -51,7 +51,7 @@ public class ECBlockInputStreamProxy extends
BlockExtendedInputStream {
private final ECReplicationConfig repConfig;
private final boolean verifyChecksum;
private final XceiverClientFactory xceiverClientFactory;
- private final Function<BlockID, Pipeline> refreshFunction;
+ private final Function<BlockID, BlockLocationInfo> refreshFunction;
private final BlockLocationInfo blockInfo;
private final ECBlockInputStreamFactory ecBlockInputStreamFactory;
@@ -99,7 +99,8 @@ public class ECBlockInputStreamProxy extends
BlockExtendedInputStream {
public ECBlockInputStreamProxy(ECReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
XceiverClientFactory xceiverClientFactory, Function<BlockID,
- Pipeline> refreshFunction, ECBlockInputStreamFactory streamFactory) {
+ BlockLocationInfo> refreshFunction,
+ ECBlockInputStreamFactory streamFactory) {
this.repConfig = repConfig;
this.verifyChecksum = verifyChecksum;
this.blockInfo = blockInfo;
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
index 0d11b88b36..9658fb784d 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockReconstructedStripeInputStream.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
@@ -152,8 +151,9 @@ public class ECBlockReconstructedStripeInputStream extends
ECBlockInputStream {
@SuppressWarnings("checkstyle:ParameterNumber")
public ECBlockReconstructedStripeInputStream(ECReplicationConfig repConfig,
BlockLocationInfo blockInfo, boolean verifyChecksum,
- XceiverClientFactory xceiverClientFactory, Function<BlockID,
- Pipeline> refreshFunction, BlockInputStreamFactory streamFactory,
+ XceiverClientFactory xceiverClientFactory,
+ Function<BlockID, BlockLocationInfo> refreshFunction,
+ BlockInputStreamFactory streamFactory,
ByteBufferPool byteBufferPool,
ExecutorService ecReconstructExecutor) {
super(repConfig, blockInfo, verifyChecksum, xceiverClientFactory,
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
index 1c7968b134..be72dd0701 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStream.java
@@ -46,7 +46,7 @@ class DummyBlockInputStream extends BlockInputStream {
Token<OzoneBlockTokenIdentifier> token,
boolean verifyChecksum,
XceiverClientFactory xceiverClientManager,
- Function<BlockID, Pipeline> refreshFunction,
+ Function<BlockID, BlockLocationInfo> refreshFunction,
List<ChunkInfo> chunkList,
Map<String, byte[]> chunks) {
super(blockId, blockLen, pipeline, token, verifyChecksum,
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
index 5a029763d6..b39ed61d70 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
@@ -18,23 +18,22 @@
package org.apache.hadoop.hdds.scm.storage;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* A dummy BlockInputStream with pipeline refresh function to mock read
@@ -60,13 +59,15 @@ final class DummyBlockInputStreamWithRetry
super(blockId, blockLen, pipeline, token, verifyChecksum,
xceiverClientManager, blockID -> {
isRerfreshed.set(true);
- return Pipeline.newBuilder()
- .setState(Pipeline.PipelineState.OPEN)
- .setId(PipelineID.randomId())
- .setReplicationConfig(StandaloneReplicationConfig.getInstance(
- ReplicationFactor.ONE))
- .setNodes(Collections.emptyList())
- .build();
+ try {
+ BlockLocationInfo blockLocationInfo = mock(BlockLocationInfo.class);
+ Pipeline mockPipeline = MockPipeline.createPipeline(1);
+ when(blockLocationInfo.getPipeline()).thenReturn(mockPipeline);
+ return blockLocationInfo;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
}, chunkList, chunkMap);
this.ioException = ioException;
}
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index 84813f3c6d..04805576fd 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -84,18 +84,18 @@ public class TestBlockInputStream {
private List<ChunkInfo> chunks;
private Map<String, byte[]> chunkDataMap;
- private Function<BlockID, Pipeline> refreshPipeline;
+ private Function<BlockID, BlockLocationInfo> refreshFunction;
@BeforeEach
@SuppressWarnings("unchecked")
public void setup() throws Exception {
- refreshPipeline = Mockito.mock(Function.class);
+ refreshFunction = Mockito.mock(Function.class);
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
checksum = new Checksum(ChecksumType.NONE, CHUNK_SIZE);
createChunkList(5);
blockStream = new DummyBlockInputStream(blockID, blockSize, null, null,
- false, null, refreshPipeline, chunks, chunkDataMap);
+ false, null, refreshFunction, chunks, chunkDataMap);
}
/**
@@ -290,15 +290,26 @@ public class TestBlockInputStream {
void refreshesPipelineOnReadFailure(IOException ex) throws Exception {
// GIVEN
Pipeline pipeline = MockPipeline.createSingleNodePipeline();
+ BlockLocationInfo blockLocationInfo = mock(BlockLocationInfo.class);
+ when(blockLocationInfo.getPipeline()).thenReturn(pipeline);
Pipeline newPipeline = MockPipeline.createSingleNodePipeline();
+ BlockLocationInfo newBlockLocationInfo = mock(BlockLocationInfo.class);
- testRefreshesPipelineOnReadFailure(ex, pipeline, id -> newPipeline);
- testRefreshesPipelineOnReadFailure(ex, pipeline, id -> pipeline);
- testRefreshesPipelineOnReadFailure(ex, pipeline, id -> null);
+ testRefreshesPipelineOnReadFailure(ex, blockLocationInfo,
+ id -> newBlockLocationInfo);
+
+ when(newBlockLocationInfo.getPipeline()).thenReturn(newPipeline);
+ testRefreshesPipelineOnReadFailure(ex, blockLocationInfo,
+ id -> blockLocationInfo);
+
+ when(newBlockLocationInfo.getPipeline()).thenReturn(null);
+ testRefreshesPipelineOnReadFailure(ex, blockLocationInfo,
+ id -> newBlockLocationInfo);
}
private void testRefreshesPipelineOnReadFailure(IOException ex,
- Pipeline pipeline, Function<BlockID, Pipeline> refreshFunction)
+ BlockLocationInfo blockLocationInfo,
+ Function<BlockID, BlockLocationInfo> refreshPipelineFunction)
throws Exception {
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
@@ -306,10 +317,11 @@ public class TestBlockInputStream {
final int len = 200;
final ChunkInputStream stream = throwingChunkInputStream(ex, len, true);
- when(refreshPipeline.apply(any()))
- .thenAnswer(inv -> refreshFunction.apply(blockID));
+ when(this.refreshFunction.apply(any()))
+ .thenAnswer(inv -> refreshPipelineFunction.apply(blockID));
- try (BlockInputStream subject = createSubject(blockID, pipeline, stream)) {
+ try (BlockInputStream subject = createSubject(blockID,
+ blockLocationInfo.getPipeline(), stream)) {
subject.initialize();
// WHEN
@@ -318,9 +330,9 @@ public class TestBlockInputStream {
// THEN
Assert.assertEquals(len, bytesRead);
- verify(refreshPipeline).apply(blockID);
+ verify(this.refreshFunction).apply(blockID);
} finally {
- reset(refreshPipeline);
+ reset(this.refreshFunction);
}
}
@@ -349,7 +361,7 @@ public class TestBlockInputStream {
private BlockInputStream createSubject(BlockID blockID, Pipeline pipeline,
ChunkInputStream stream) {
return new DummyBlockInputStream(blockID, blockSize, pipeline, null, false,
- null, refreshPipeline, chunks, null) {
+ null, refreshFunction, chunks, null) {
@Override
protected ChunkInputStream createChunkInputStream(ChunkInfo chunkInfo) {
return stream;
@@ -376,7 +388,7 @@ public class TestBlockInputStream {
() -> subject.read(new byte[len], 0, len));
// THEN
- verify(refreshPipeline, never()).apply(blockID);
+ verify(refreshFunction, never()).apply(blockID);
}
}
@@ -396,17 +408,19 @@ public class TestBlockInputStream {
Pipeline newPipeline = MockPipeline.createSingleNodePipeline();
XceiverClientFactory clientFactory = mock(XceiverClientFactory.class);
XceiverClientSpi client = mock(XceiverClientSpi.class);
+ BlockLocationInfo blockLocationInfo = mock(BlockLocationInfo.class);
when(clientFactory.acquireClientForReadData(pipeline))
.thenReturn(client);
final int len = 200;
final ChunkInputStream stream = throwingChunkInputStream(ex, len, true);
- when(refreshPipeline.apply(blockID))
- .thenReturn(newPipeline);
+ when(refreshFunction.apply(blockID))
+ .thenReturn(blockLocationInfo);
+ when(blockLocationInfo.getPipeline()).thenReturn(newPipeline);
BlockInputStream subject = new BlockInputStream(blockID, blockSize,
- pipeline, null, false, clientFactory, refreshPipeline) {
+ pipeline, null, false, clientFactory, refreshFunction) {
@Override
protected List<ChunkInfo> getChunkInfos() throws IOException {
acquireClient();
@@ -429,7 +443,7 @@ public class TestBlockInputStream {
// THEN
Assert.assertEquals(len, bytesRead);
- verify(refreshPipeline).apply(blockID);
+ verify(refreshFunction).apply(blockID);
verify(clientFactory).acquireClientForReadData(pipeline);
verify(clientFactory).releaseClientForReadData(client, false);
} finally {
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
index f4d40c811c..0fe5886f1b 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/ECStreamTestUtil.java
@@ -258,7 +258,7 @@ public final class ECStreamTestUtil {
BlockLocationInfo blockInfo, Pipeline pipeline,
Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
XceiverClientFactory xceiverFactory,
- Function<BlockID, Pipeline> refreshFunction) {
+ Function<BlockID, BlockLocationInfo> refreshFunction) {
int repInd = currentPipeline.getReplicaIndex(pipeline.getNodes().get(0));
TestBlockInputStream stream = new TestBlockInputStream(
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
index 07c6b39234..caa071b1b9 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStream.java
@@ -462,18 +462,26 @@ public class TestECBlockInputStream {
}
// Create a refreshFunction that returns a hard-coded EC pipeline.
- Function<BlockID, Pipeline> refreshFunction = blkID -> Pipeline.newBuilder()
- .setReplicationConfig(repConfig)
- .setNodes(new ArrayList<>(dnMap.keySet()))
- .setReplicaIndexes(dnMap)
- .setState(Pipeline.PipelineState.CLOSED)
- .setId(PipelineID.randomId())
- .build();
+ Function<BlockID, BlockLocationInfo> refreshFunction = blkID -> {
+ Pipeline pipeline = Pipeline.newBuilder()
+ .setReplicationConfig(repConfig)
+ .setNodes(new ArrayList<>(dnMap.keySet()))
+ .setReplicaIndexes(dnMap)
+ .setState(Pipeline.PipelineState.CLOSED)
+ .setId(PipelineID.randomId())
+ .build();
+ BlockLocationInfo blockLocation = new BlockLocationInfo.Builder()
+ .setPipeline(pipeline)
+ .build();
+ return blockLocation;
+ };
try (ECBlockInputStream ecb = new ECBlockInputStream(repConfig,
keyInfo, true, null, null, streamFactory)) {
Pipeline pipeline =
- ecb.ecPipelineRefreshFunction(3, refreshFunction).apply(blockID);
+ ecb.ecPipelineRefreshFunction(3, refreshFunction)
+ .apply(blockID)
+ .getPipeline();
// Check the pipeline is built with the correct Datanode
// with right replicaIndex.
Assertions.assertEquals(HddsProtos.ReplicationType.STAND_ALONE,
@@ -503,7 +511,7 @@ public class TestECBlockInputStream {
ReplicationConfig repConfig, BlockLocationInfo blockInfo,
Pipeline pipeline, Token<OzoneBlockTokenIdentifier> token,
boolean verifyChecksum, XceiverClientFactory xceiverFactory,
- Function<BlockID, Pipeline> refreshFunction) {
+ Function<BlockID, BlockLocationInfo> refreshFunction) {
TestBlockInputStream stream = new TestBlockInputStream(
blockInfo.getBlockID(), blockInfo.getLength(),
(byte)blockStreams.size());
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java
index 89ac7a831e..929fa13042 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/ozone/client/io/TestECBlockInputStreamProxy.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.junit.jupiter.api.Assertions;
@@ -378,7 +377,7 @@ public class TestECBlockInputStreamProxy {
List<DatanodeDetails> failedDatanodes,
ReplicationConfig repConfig, BlockLocationInfo blockInfo,
boolean verifyChecksum, XceiverClientFactory xceiverFactory,
- Function<BlockID, Pipeline> refreshFunction) {
+ Function<BlockID, BlockLocationInfo> refreshFunction) {
this.failedLocations = failedDatanodes;
ByteBuffer wrappedBuffer =
ByteBuffer.wrap(data.array(), 0, data.capacity());
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index ce068f1b36..91d4b94404 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -26,7 +26,6 @@ import java.util.stream.Collectors;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
@@ -85,14 +84,16 @@ public class KeyInputStream extends MultipartInputStream {
xceiverClientFactory,
keyBlockID -> {
OmKeyInfo newKeyInfo = retryFunction.apply(keyInfo);
- return getPipeline(newKeyInfo, omKeyLocationInfo.getBlockID());
+ return getBlockLocationInfo(newKeyInfo,
+ omKeyLocationInfo.getBlockID());
});
partStreams.add(stream);
}
return partStreams;
}
- private static Pipeline getPipeline(OmKeyInfo newKeyInfo, BlockID blockID) {
+ private static BlockLocationInfo getBlockLocationInfo(OmKeyInfo newKeyInfo,
+ BlockID blockID) {
List<OmKeyLocationInfo> collect =
newKeyInfo.getLatestVersionLocations()
.getLocationList()
@@ -100,7 +101,7 @@ public class KeyInputStream extends MultipartInputStream {
.filter(l -> l.getBlockID().equals(blockID))
.collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(collect)) {
- return collect.get(0).getPipeline();
+ return collect.get(0);
} else {
return null;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]