yandrey321 commented on code in PR #6613:
URL: https://github.com/apache/ozone/pull/6613#discussion_r2505710612


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -2046,6 +2055,86 @@ public void deleteUnreferenced(Container container, long localID)
     }
   }
 
+  @Override
+  public ContainerCommandResponseProto readBlock(
+      ContainerCommandRequestProto request, Container kvContainer,
+      DispatcherContext dispatcherContext,
+      StreamObserver<ContainerCommandResponseProto> streamObserver) {
+
+    if (kvContainer.getContainerData().getLayoutVersion() != FILE_PER_BLOCK) {
+      return ContainerUtils.logAndReturnError(LOG,
+          new StorageContainerException("Only File Per Block is supported", IO_EXCEPTION), request);
+    }
+
+    ContainerCommandResponseProto responseProto = null;
+    if (!request.hasReadBlock()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Malformed Read Block request. trace ID: {}", request.getTraceID());
+      }
+      return malformedRequest(request);
+    }
+    try {
+      ReadBlockRequestProto readBlock = request.getReadBlock();
+
+      BlockID blockID = BlockID.getFromProtobuf(readBlock.getBlockID());
+      // This is a new api the block should always be checked.
+      BlockUtils.verifyReplicaIdx(kvContainer, blockID);
+      BlockUtils.verifyBCSId(kvContainer, blockID);
+
+      File blockFile = FILE_PER_BLOCK.getChunkFile(kvContainer.getContainerData(), blockID, "unused");
+
+      BlockData blockData = getBlockManager().getBlock(kvContainer, blockID);
+      List<ContainerProtos.ChunkInfo> chunkInfos = blockData.getChunks();
+      // To get the chunksize, check the first chunk. Either there is only 1 chunk and its the largest, or there are
+      // multiple chunks and they are all the same size except the last one.
+      long bytesPerChunk = chunkInfos.get(0).getLen();
+      // The bytes per checksum is stored in the checksum data of each chunk, so check the first chunk as they all
+      // must be the same.
+      ContainerProtos.ChecksumType checksumType = chunkInfos.get(0).getChecksumData().getType();
+      ChecksumData checksumData = null;
+      int bytesPerChecksum = STREAMING_BYTES_PER_CHUNK;
+      if (checksumType == ContainerProtos.ChecksumType.NONE) {
+        checksumData = new ChecksumData(checksumType, 0);
+      } else {
+        bytesPerChecksum = chunkInfos.get(0).getChecksumData().getBytesPerChecksum();
+      }
+      // We have to align the read to checksum boundaries, so whatever offset is requested, we have to move back to the
+      // previous checksum boundary.
+      // eg if bytesPerChecksum is 512, and the requested offset is 600, we have to move back to 512.
+      // If the checksum type is NONE, we don't have to do this, but using no checksums should be rare in practice and
+      // it simplifies the code to always do this.
+      long adjustedOffset = readBlock.getOffset() - readBlock.getOffset() % bytesPerChecksum;
+      try (RandomAccessFile file = new RandomAccessFile(blockFile, "r");
+           FileChannel channel = file.getChannel()) {
+        ByteBuffer buffer = ByteBuffer.allocate(bytesPerChecksum);
+        channel.position(adjustedOffset);
+        while (channel.read(buffer) != -1) {
+          buffer.flip();
+          if (checksumType != ContainerProtos.ChecksumType.NONE) {
+            // As the checksums are stored "chunk by chunk", we need to figure out which chunk we start reading from,
+            // and its offset to pull out the correct checksum bytes for each read.
+            int chunkIndex = (int) (adjustedOffset / bytesPerChunk);
+            int chunkOffset = (int) (adjustedOffset % bytesPerChunk);
+            int checksumIndex = chunkOffset / bytesPerChecksum;
+            ByteString checksum = blockData.getChunks().get(chunkIndex).getChecksumData().getChecksums(checksumIndex);
+            checksumData = new ChecksumData(checksumType, bytesPerChecksum, Collections.singletonList(checksum));
+          }
+          streamObserver.onNext(getReadBlockResponse(request, checksumData, buffer, adjustedOffset));

Review Comment:
   Here we need to check `streamObserver.isReady()` before each `onNext` call. If the observer is not ready, we should stop writing and use [setOnReadyHandler](https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/ServerCallStreamObserver.html#setOnReadyHandler(java.lang.Runnable))([Runnable](https://docs.oracle.com/javase/8/docs/api/java/lang/Runnable.html?is-external=true) onReadyHandler) to transfer the remaining data once it becomes ready again.
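
   A minimal sketch of the pattern, not a drop-in change for this PR. In grpc-java, `isReady()` and `setOnReadyHandler` are declared on `CallStreamObserver` / `ServerCallStreamObserver`, not on plain `StreamObserver`, so the handler would need the narrower type. To keep the sketch compilable without gRPC on the classpath, `FlowControlledObserver` below is a hypothetical stand-in for that part of the API, and `BackpressureAwareSender` and `WindowedObserver` are illustrative names that do not exist in Ozone or gRPC.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for the flow-control subset of ServerCallStreamObserver. */
interface FlowControlledObserver<T> {
  boolean isReady();
  void setOnReadyHandler(Runnable onReadyHandler);
  void onNext(T value);
  void onCompleted();
}

/** Writes items only while the observer is ready and resumes from the on-ready callback. */
final class BackpressureAwareSender<T> {
  private final FlowControlledObserver<T> observer;
  private final Iterator<T> source;
  private boolean completed;

  BackpressureAwareSender(FlowControlledObserver<T> observer, Iterator<T> source) {
    this.observer = observer;
    this.source = source;
    // Register the callback up front so a later ready transition resumes the transfer.
    observer.setOnReadyHandler(this::drain);
  }

  /** Sends until the observer pushes back or the source is exhausted. */
  synchronized void drain() {
    if (completed) {
      return;
    }
    while (observer.isReady() && source.hasNext()) {
      observer.onNext(source.next());
    }
    if (!source.hasNext()) {
      completed = true;
      observer.onCompleted();
    }
  }
}

/** Test double (assumption): accepts `window` messages, then reports not ready until released. */
final class WindowedObserver<T> implements FlowControlledObserver<T> {
  final List<T> received = new ArrayList<>();
  boolean completed;
  private final int window;
  private int credits;
  private Runnable onReady = () -> { };

  WindowedObserver(int window) {
    this.window = window;
    this.credits = window;
  }

  @Override public boolean isReady() { return credits > 0; }
  @Override public void setOnReadyHandler(Runnable handler) { onReady = handler; }
  @Override public void onNext(T value) { received.add(value); credits--; }
  @Override public void onCompleted() { completed = true; }

  /** Simulates the transport draining its buffer and signalling readiness again. */
  void release() {
    credits = window;
    onReady.run();
  }
}
```

   In `readBlock`, the source iterator would correspond to the per-checksum reads from the block file, so the loop stops reading from disk whenever the client falls behind instead of buffering responses without bound.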



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

