Cyrill commented on code in PR #4041:
URL: https://github.com/apache/ozone/pull/4041#discussion_r1045290160


##########
hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java:
##########
@@ -136,288 +155,41 @@ public static List<LengthInputStream> getStreamsFromKeyInfo(OmKeyInfo keyInfo,
     // Create a KeyInputStream for each part.
     for (Map.Entry<Integer, List<OmKeyLocationInfo>> entry :
         partsToBlocksMap.entrySet()) {
-      KeyInputStream keyInputStream = new KeyInputStream();
-      keyInputStream.initialize(keyInfo, entry.getValue(),
-          xceiverClientFactory, verifyChecksum, retryFunction,
+      List<BlockExtendedInputStream> streams = createStreams(keyInfo,
+          entry.getValue(), xceiverClientFactory, verifyChecksum, retryFunction,
           blockStreamFactory);
+      KeyInputStream keyInputStream =
+          new KeyInputStream(keyInfo.getKeyName(), streams);
       lengthInputStreams.add(new LengthInputStream(keyInputStream,
           partsLengthMap.get(entry.getKey())));
     }
 
     return lengthInputStreams;
   }
 
-  private synchronized void initialize(OmKeyInfo keyInfo,
-      List<OmKeyLocationInfo> blockInfos,
-      XceiverClientFactory xceiverClientFactory,
-      boolean verifyChecksum,  Function<OmKeyInfo, OmKeyInfo> retryFunction,
-      BlockInputStreamFactory blockStreamFactory) {
-    this.key = keyInfo.getKeyName();
-    this.blockOffsets = new long[blockInfos.size()];
-    long keyLength = 0;
-    for (int i = 0; i < blockInfos.size(); i++) {
-      OmKeyLocationInfo omKeyLocationInfo = blockInfos.get(i);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Adding stream for accessing {}. The stream will be " +
-            "initialized later.", omKeyLocationInfo);
-      }
-
-      // We also pass in functional reference which is used to refresh the
-      // pipeline info for a given OM Key location info.
-      addStream(keyInfo.getReplicationConfig(), omKeyLocationInfo,
-          xceiverClientFactory,
-          verifyChecksum, keyLocationInfo -> {
-            OmKeyInfo newKeyInfo = retryFunction.apply(keyInfo);
-            BlockID blockID = keyLocationInfo.getBlockID();
-            List<OmKeyLocationInfo> collect =
-                newKeyInfo.getLatestVersionLocations()
-                .getLocationList()
-                .stream()
-                .filter(l -> l.getBlockID().equals(blockID))
-                .collect(Collectors.toList());
-            if (CollectionUtils.isNotEmpty(collect)) {
-              return collect.get(0).getPipeline();
-            } else {
-              return null;
-            }
-          }, blockStreamFactory);
-
-      this.blockOffsets[i] = keyLength;
-      keyLength += omKeyLocationInfo.getLength();
-    }
-    this.length = keyLength;
-  }
-
-  /**
-   * Append another BlockInputStream to the end of the list. Note that the
-   * BlockInputStream is only created here and not initialized. The
-   * BlockInputStream is initialized when a read operation is performed on
-   * the block for the first time.
-   */
-  private synchronized void addStream(ReplicationConfig repConfig,
-      OmKeyLocationInfo blockInfo,
-      XceiverClientFactory xceiverClientFactory, boolean verifyChecksum,
-      Function<OmKeyLocationInfo, Pipeline> refreshPipelineFunction,
-      BlockInputStreamFactory blockStreamFactory) {
-    blockStreams.add(blockStreamFactory.create(repConfig, blockInfo,
-        blockInfo.getPipeline(), blockInfo.getToken(),
-        verifyChecksum, xceiverClientFactory,
-        blockID -> refreshPipelineFunction.apply(blockInfo)));
-  }
-
-  @VisibleForTesting
-  public void addStream(BlockInputStream blockInputStream) {
-    blockStreams.add(blockInputStream);
-  }
-
-  @VisibleForTesting
-  public void addStream(BlockExtendedInputStream blockInputStream) {
-    blockStreams.add(blockInputStream);
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public synchronized int read() throws IOException {
-    byte[] buf = new byte[1];
-    if (read(buf, 0, 1) == EOF) {
-      return EOF;
-    }
-    return Byte.toUnsignedInt(buf[0]);
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public synchronized int read(byte[] b, int off, int len) throws IOException {
-    ByteReaderStrategy strategy = new ByteArrayReader(b, off, len);
-    int bufferLen = strategy.getTargetLength();
-    if (bufferLen == 0) {
-      return 0;
-    }
-    return readWithStrategy(strategy);
-  }
-
   @Override
-  public synchronized int read(ByteBuffer byteBuffer) throws IOException {
-    ByteReaderStrategy strategy = new ByteBufferReader(byteBuffer);
-    int bufferLen = strategy.getTargetLength();
-    if (bufferLen == 0) {
-      return 0;
-    }
-    return readWithStrategy(strategy);
-  }
-
-  @Override
-  protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
-      throws IOException {
-    Preconditions.checkArgument(strategy != null);
-    checkOpen();
-
-    int buffLen = strategy.getTargetLength();
-    int totalReadLen = 0;
-    while (buffLen > 0) {
-      // if we are at the last block and have read the entire block, return
-      if (blockStreams.size() == 0 ||
-          (blockStreams.size() - 1 <= blockIndex &&
-              blockStreams.get(blockIndex)
-                  .getRemaining() == 0)) {
-        return totalReadLen == 0 ? EOF : totalReadLen;
-      }
-
-      // Get the current blockStream and read data from it
-      BlockExtendedInputStream current = blockStreams.get(blockIndex);
-      int numBytesToRead = (int)Math.min(buffLen, current.getRemaining());
-      int numBytesRead = strategy.readFromBlock(current, numBytesToRead);
-      if (numBytesRead != numBytesToRead) {
-        // This implies that there is either data loss or corruption in the
-        // chunk entries. Even EOF in the current stream would be covered in
-        // this case.
-        throw new IOException(String.format("Inconsistent read for blockID=%s "
-                + "length=%d numBytesToRead=%d numBytesRead=%d",
-            current.getBlockID(), current.getLength(), numBytesToRead,
-            numBytesRead));
-      }
-      totalReadLen += numBytesRead;
-      buffLen -= numBytesRead;
-      if (current.getRemaining() <= 0 &&
-          ((blockIndex + 1) < blockStreams.size())) {
-        blockIndex += 1;
-      }
-    }
-    return totalReadLen;
+  protected int getNumBytesToRead(ByteReaderStrategy strategy,
+                                  PartInputStream current) throws IOException {
+    return (int) Math.min(strategy.getTargetLength(), current.getRemaining());
   }
 
-  /**
-   * Seeks the KeyInputStream to the specified position. This involves 2 steps:
-   *    1. Updating the blockIndex to the blockStream corresponding to the
-   *    seeked position.
-   *    2. Seeking the corresponding blockStream to the adjusted position.
-   *
-   * For example, let’s say the block size is 200 bytes and block[0] stores
-   * data from indices 0 - 199, block[1] from indices 200 - 399 and so on.
-   * Let’s say we seek to position 240. In the first step, the blockIndex
-   * would be updated to 1 as indices 200 - 399 reside in blockStream[1]. In
-   * the second step, the blockStream[1] would be seeked to position 40 (=
-   * 240 - blockOffset[1] (= 200)).
-   */
   @Override
-  public synchronized void seek(long pos) throws IOException {
-    checkOpen();
-    if (pos == 0 && length == 0) {
-      // It is possible for length and pos to be zero in which case
-      // seek should return instead of throwing exception
-      return;
+  protected void checkPartBytesRead(int numBytesToRead, int numBytesRead,
+                                    PartInputStream stream) throws IOException {
+    if (numBytesRead != numBytesToRead) {
+      // This implies that there is either data loss or corruption in the
+      // chunk entries. Even EOF in the current stream would be covered in
+      // this case.
+      throw new IOException(String.format("Inconsistent read for blockID=%s "

Review Comment:
   Well, the offset you are referring to is actually `stream.getPos()`. I've made the change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to