[ 
https://issues.apache.org/jira/browse/HDDS-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HDDS-6368:
---------------------------------
    Labels: pull-request-available  (was: )

> EC: Fix broken future chain and cleanup unnecessary validation function.
> ------------------------------------------------------------------------
>
>                 Key: HDDS-6368
>                 URL: https://issues.apache.org/jira/browse/HDDS-6368
>             Project: Apache Ozone
>          Issue Type: Sub-task
>            Reporter: Mark Gui
>            Assignee: Mark Gui
>            Priority: Minor
>              Labels: pull-request-available
>
> Observed a broken future chain that has lived in the current code for a long time:
> {code:java}
> // BlockOutputStream.java
> CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
>     ChunkBuffer chunk) throws IOException {
>   int effectiveChunkSize = chunk.remaining();
>   final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
>   final ByteString data = chunk.toByteString(
>       bufferPool.byteStringConversion());
>   ChecksumData checksumData = checksum.computeChecksum(chunk);
>   ChunkInfo chunkInfo = ChunkInfo.newBuilder()
>       .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
>       .setOffset(offset)
>       .setLen(effectiveChunkSize)
>       .setChecksumData(checksumData.getProtoBufMessage())
>       .build();
>   if (LOG.isDebugEnabled()) {
>     LOG.debug("Writing chunk {} length {} at offset {}",
>         chunkInfo.getChunkName(), effectiveChunkSize, offset);
>   }
>   try {
>     XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
>         blockID.get(), data, token, replicationIndex);
>     CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
>         asyncReply.getResponse();
>     // <-- the new stage created below is neither kept nor returned
>     future.thenApplyAsync(e -> {
>       try {
>         validateResponse(e);
>       } catch (IOException sce) {
>         future.completeExceptionally(sce);
>       }
>       return e;
>     }, responseExecutor).exceptionally(e -> {
>       String msg = "Failed to write chunk " + chunkInfo.getChunkName() + " " +
>           "into block " + blockID;
>       LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
>       CompletionException ce = new CompletionException(msg, e);
>       setIoException(ce);
>       throw ce;
>     });
>     containerBlockData.addChunks(chunkInfo);
>     // <-- this actually returns a future from the middle of the chain
>     return future;
>   } catch (IOException | ExecutionException e) {
>     throw new IOException(EXCEPTION_MSG + e.toString(), e);
>   } catch (InterruptedException ex) {
>     Thread.currentThread().interrupt();
>     handleInterruptedException(ex, false);
>   }
>   return null;
> } {code}
> In the Ratis path the returned future is not used, so there is no problem.
> In the EC path the returned future is waited on in the code below, so the
> broken chain is a problem.
> {code:java}
> // ECBlockOutputStreamEntry.java
> private boolean isFailed(
>     ECBlockOutputStream outputStream,
>     CompletableFuture<ContainerProtos.
>         ContainerCommandResponseProto> chunkWriteResponseFuture) {
>   ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto
>       = null;
>   try {
>     containerCommandResponseProto = chunkWriteResponseFuture != null ?
>         chunkWriteResponseFuture.get() :   // <-- wait for the future
>         null;
>   } catch (InterruptedException e) {
>     outputStream.setIoException(e);
>     Thread.currentThread().interrupt();
>   } catch (ExecutionException e) {
>     outputStream.setIoException(e);
>   }
>   if ((outputStream != null && containerCommandResponseProto != null)
>       && (outputStream.getIoException() != null || isStreamFailed(
>       containerCommandResponseProto, outputStream))) {
>     return true;
>   }
>   return false;
> } {code}
> Since `validateResponse` is already executed in the lambda in the first snippet,
> `isStreamFailed` is no longer needed; checking the already-set IoException is
> sufficient.
> Note that `executePutBlock` has a correct future chain and also calls
> `validateResponse`.
>  
>  
>  
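The broken-chain pattern described above can be reproduced outside Ozone with plain `CompletableFuture`. The following is a minimal sketch, not the actual patch: `FutureChainSketch`, `validate`, `brokenChain`, `fixedChain` and `failsOnGet` are hypothetical names, a `String` stands in for `ContainerCommandResponseProto`, and `thenApply` is used instead of `thenApplyAsync` to keep the example deterministic. It relies on the documented behaviour that `completeExceptionally` has no effect on a future that is already completed, which is always the case inside a callback attached to that same future.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class FutureChainSketch {

  // Hypothetical stand-in for validateResponse: rejects any reply except "OK".
  static void validate(String response) throws IOException {
    if (!"OK".equals(response)) {
      throw new IOException("bad response: " + response);
    }
  }

  // Mirrors the quoted pattern: the derived stage is dropped and the
  // source future is returned to the caller.
  static CompletableFuture<String> brokenChain(CompletableFuture<String> source) {
    source.thenApply(r -> {
      try {
        validate(r);
      } catch (IOException e) {
        // No effect: source is already completed when this callback runs.
        source.completeExceptionally(e);
      }
      return r;
    });
    return source;
  }

  // One possible shape of a fix (an assumption, not necessarily what the
  // pull request does): return the derived stage and let it fail.
  static CompletableFuture<String> fixedChain(CompletableFuture<String> source) {
    return source.thenApply(r -> {
      try {
        validate(r);
      } catch (IOException e) {
        throw new CompletionException(e);
      }
      return r;
    });
  }

  // Waits on the future the way a caller would and reports whether it failed.
  static boolean failsOnGet(CompletableFuture<String> f) {
    try {
      f.get();
      return false;
    } catch (ExecutionException e) {
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return true;
    }
  }

  public static void main(String[] args) {
    boolean broken =
        failsOnGet(brokenChain(CompletableFuture.completedFuture("ERROR")));
    boolean fixed =
        failsOnGet(fixedChain(CompletableFuture.completedFuture("ERROR")));
    System.out.println("broken chain reports failure: " + broken); // false
    System.out.println("fixed chain reports failure: " + fixed);   // true
  }
}
```

With the broken variant, a caller that waits on the returned future gets the raw response back even though validation failed; with the derived stage returned, the same wait surfaces the failure as an `ExecutionException`.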



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
