adoroszlai commented on a change in pull request #2337:
URL: https://github.com/apache/ozone/pull/2337#discussion_r655872160



##########
File path: 
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
##########
@@ -1315,7 +1315,7 @@ private OzoneInputStream createInputStream(
     if (feInfo == null) {
       LengthInputStream lengthInputStream = KeyInputStream
           .getFromOmKeyInfo(keyInfo, xceiverClientManager,
-              clientConfig.isChecksumVerify(), retryFunction);
+              clientConfig.isChecksumVerify(), retryFunction, clientConfig);

Review comment:
       The `verifyChecksum` parameter could be removed by reading it from 
`clientConfig.isChecksumVerify()` only in the `BlockInputStream` constructor.

##########
File path: 
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
##########
@@ -109,45 +111,83 @@
   private int chunkIndexOfPrevPosition;
 
   private final Function<BlockID, Pipeline> refreshPipelineFunction;
+  private boolean smallBlock = false;
+  private final OzoneClientConfig clientConfig;
 
+  @SuppressWarnings("parameternumber")
   public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
       Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
       XceiverClientFactory xceiverClientFactory,
-      Function<BlockID, Pipeline> refreshPipelineFunction) {
+      Function<BlockID, Pipeline> refreshPipelineFunction,
+      OzoneClientConfig clientConfig) {
     this.blockID = blockId;
     this.length = blockLen;
     this.pipeline = pipeline;
     this.token = token;
     this.verifyChecksum = verifyChecksum;
     this.xceiverClientFactory = xceiverClientFactory;
     this.refreshPipelineFunction = refreshPipelineFunction;
+    if (clientConfig != null) {
+      this.smallBlock = (length <= clientConfig.getSmallBlockThreshold());
+    }
+    this.clientConfig = clientConfig;
+  }
+
+  public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
+      Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+      XceiverClientFactory xceiverClientFactory,
+      Function<BlockID, Pipeline> refreshPipelineFunction) {
+    this(blockId, blockLen, pipeline, token, verifyChecksum,
+        xceiverClientFactory, refreshPipelineFunction, null);
   }
 
   public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
-                          Token<OzoneBlockTokenIdentifier> token,
-                          boolean verifyChecksum,
-                          XceiverClientFactory xceiverClientFactory
-  ) {
+      Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+      XceiverClientFactory xceiverClientFactory) {
     this(blockId, blockLen, pipeline, token, verifyChecksum,
-        xceiverClientFactory, null);
+        xceiverClientFactory, null, null);

Review comment:
       Can we pass `new OzoneClientConfig()` instead of `null`?  This way we 
can avoid null checks and use the default configuration seamlessly.

##########
File path: 
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
##########
@@ -109,45 +111,83 @@
   private int chunkIndexOfPrevPosition;
 
   private final Function<BlockID, Pipeline> refreshPipelineFunction;
+  private boolean smallBlock = false;
+  private final OzoneClientConfig clientConfig;
 
+  @SuppressWarnings("parameternumber")
   public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
       Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
       XceiverClientFactory xceiverClientFactory,
-      Function<BlockID, Pipeline> refreshPipelineFunction) {
+      Function<BlockID, Pipeline> refreshPipelineFunction,
+      OzoneClientConfig clientConfig) {
     this.blockID = blockId;
     this.length = blockLen;
     this.pipeline = pipeline;
     this.token = token;
     this.verifyChecksum = verifyChecksum;
     this.xceiverClientFactory = xceiverClientFactory;
     this.refreshPipelineFunction = refreshPipelineFunction;
+    if (clientConfig != null) {
+      this.smallBlock = (length <= clientConfig.getSmallBlockThreshold());
+    }
+    this.clientConfig = clientConfig;
+  }
+
+  public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
+      Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+      XceiverClientFactory xceiverClientFactory,
+      Function<BlockID, Pipeline> refreshPipelineFunction) {
+    this(blockId, blockLen, pipeline, token, verifyChecksum,
+        xceiverClientFactory, refreshPipelineFunction, null);
   }
 
   public BlockInputStream(BlockID blockId, long blockLen, Pipeline pipeline,
-                          Token<OzoneBlockTokenIdentifier> token,
-                          boolean verifyChecksum,
-                          XceiverClientFactory xceiverClientFactory
-  ) {
+      Token<OzoneBlockTokenIdentifier> token, boolean verifyChecksum,
+      XceiverClientFactory xceiverClientFactory) {
     this(blockId, blockLen, pipeline, token, verifyChecksum,
-        xceiverClientFactory, null);
+        xceiverClientFactory, null, null);
   }
   /**
    * Initialize the BlockInputStream. Get the BlockData (list of chunks) from
    * the Container and create the ChunkInputStreams for each Chunk in the 
Block.
    */
   public synchronized void initialize() throws IOException {
-
-    // Pre-check that the stream has not been intialized already
+    // Pre-check that the stream has not been initialized already
     if (initialized) {
       return;
     }
 
+    // Initialize pipeline and client.
+    // irrespective of the container state, we will always read via Standalone
+    // protocol.
+    if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
+      pipeline = Pipeline.newBuilder(pipeline)
+          .setReplicationConfig(new StandaloneReplicationConfig(
+              ReplicationConfig
+                  .getLegacyFactor(pipeline.getReplicationConfig())))
+          .build();
+    }
+    acquireClient();
+
     List<ChunkInfo> chunks;
-    try {
-      chunks = getChunkInfos();
-    } catch (ContainerNotFoundException ioEx) {
-      refreshPipeline(ioEx);
-      chunks = getChunkInfos();
+    if (smallBlock) {
+      ChunkInfo chunkInfo = ChunkInfo.newBuilder()
+          .setChunkName(blockID.getLocalID() + "_chunk_" + (chunkIndex + 1))
+          .setOffset(0L).setLen(length)
+          .setChecksumData(ContainerProtos.ChecksumData.newBuilder()
+              .setBytesPerChecksum(clientConfig.getBytesPerChecksum())
+              .setType(clientConfig.getChecksumType())
+              .build())
+          .build();
+      chunks = new ArrayList<>();
+      chunks.add(chunkInfo);
+    } else {
+      try {
+        chunks = getChunkInfos();
+      } catch (ContainerNotFoundException ioEx) {
+        refreshPipeline(ioEx);

Review comment:
       `refreshPipeline` may update the pipeline, but the client still 
references the old one.  A new client may need to be acquired for the 
refreshed pipeline.  The new pipeline also needs to be converted to standalone.

##########
File path: 
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
##########
@@ -85,12 +87,22 @@
   public Timeout timeout = Timeout.seconds(300);
 
   @Parameterized.Parameters
-  public static Iterable<Object[]> parameters() {
-    return ChunkLayoutTestInfo.chunkLayoutParameters();
+  public static Collection<Object[]> layouts() {
+    return Arrays.asList(new Object[][] {
+        {ChunkLayOutVersion.FILE_PER_CHUNK, 0},
+        {ChunkLayOutVersion.FILE_PER_CHUNK, BYTES_PER_CHECKSUM},
+        {ChunkLayOutVersion.FILE_PER_CHUNK, CHUNK_SIZE},
+        {ChunkLayOutVersion.FILE_PER_CHUNK, BLOCK_SIZE},
+        {ChunkLayOutVersion.FILE_PER_BLOCK, 0},
+        {ChunkLayOutVersion.FILE_PER_BLOCK, BYTES_PER_CHECKSUM},
+        {ChunkLayOutVersion.FILE_PER_BLOCK, CHUNK_SIZE},
+        {ChunkLayOutVersion.FILE_PER_BLOCK, BLOCK_SIZE}

Review comment:
       This increases integration test run time significantly (almost *an hour* 
for only `TestChunkInputStream` and `TestKeyInputStream` on my machine).  Are 
all these combinations really necessary?  Do all test cases depend on the value 
of `blockThreshold`?
   
   Nit: manually enumerating the parameter matrix seems too verbose.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to