[ 
https://issues.apache.org/jira/browse/HDDS-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mark Gui updated HDDS-6368:
---------------------------
    Description: 
Observed a long-standing broken future chain in the current code:
{code:java}
// BlockOutputStream.java
CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
    ChunkBuffer chunk) throws IOException {
  int effectiveChunkSize = chunk.remaining();
  final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
  final ByteString data = chunk.toByteString(
      bufferPool.byteStringConversion());
  ChecksumData checksumData = checksum.computeChecksum(chunk);
  ChunkInfo chunkInfo = ChunkInfo.newBuilder()
      .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
      .setOffset(offset)
      .setLen(effectiveChunkSize)
      .setChecksumData(checksumData.getProtoBufMessage())
      .build();

  if (LOG.isDebugEnabled()) {
    LOG.debug("Writing chunk {} length {} at offset {}",
        chunkInfo.getChunkName(), effectiveChunkSize, offset);
  }

  try {
    XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
        blockID.get(), data, token, replicationIndex);
    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
        asyncReply.getResponse();
    future.thenApplyAsync(e -> {                    // <-- the new stage is not held and returned
      try {
        validateResponse(e);
      } catch (IOException sce) {
        future.completeExceptionally(sce);
      }
      return e;
    }, responseExecutor).exceptionally(e -> {
      String msg = "Failed to write chunk " + chunkInfo.getChunkName() + " " +
          "into block " + blockID;
      LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
      CompletionException ce = new CompletionException(msg, e);
      setIoException(ce);
      throw ce;
    });
    containerBlockData.addChunks(chunkInfo);
    return future;                                  // <-- actually returning a future in the middle of the chain
  } catch (IOException | ExecutionException e) {
    throw new IOException(EXCEPTION_MSG + e.toString(), e);
  } catch (InterruptedException ex) {
    Thread.currentThread().interrupt();
    handleInterruptedException(ex, false);
  }
  return null;
} {code}
In the Ratis path, the returned future is not used, so there is no problem.

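In the EC path, the returned future is waited on below, so there is a problem:
the wait is on the original response future rather than on the validation
stage, so it can return before `validateResponse` has run.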
{code:java}
// ECBlockOutputStreamEntry.java
private boolean isFailed(
    ECBlockOutputStream outputStream,
    CompletableFuture<ContainerProtos.
        ContainerCommandResponseProto> chunkWriteResponseFuture) {
  ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto
      = null;
  try {
    containerCommandResponseProto = chunkWriteResponseFuture != null ?
        chunkWriteResponseFuture.get() :            // <-- wait for the future
        null;
  } catch (InterruptedException e) {
    outputStream.setIoException(e);
    Thread.currentThread().interrupt();
  } catch (ExecutionException e) {
    outputStream.setIoException(e);
  }

  if ((outputStream != null && containerCommandResponseProto != null)
      && (outputStream.getIoException() != null || isStreamFailed(
      containerCommandResponseProto, outputStream))) {
    return true;
  }
  return false;
} {code}
Since `validateResponse` is already executed in the lambda in the first 
snippet, `isStreamFailed` is no longer needed; checking the already-set 
IoException is sufficient.

Note that `executePutBlock` has the correct future chain and also calls 
`validateResponse`.

 

 

 

> EC: Fix broken future chain and cleanup unnecessary validation function.
> ------------------------------------------------------------------------
>
>                 Key: HDDS-6368
>                 URL: https://issues.apache.org/jira/browse/HDDS-6368
>             Project: Apache Ozone
>          Issue Type: Sub-task
>            Reporter: Mark Gui
>            Assignee: Mark Gui
>            Priority: Minor
>
> Observed a long-standing broken future chain in the current code:
> {code:java}
> // BlockOutputStream.java
> CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
>     ChunkBuffer chunk) throws IOException {
>   int effectiveChunkSize = chunk.remaining();
>   final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
>   final ByteString data = chunk.toByteString(
>       bufferPool.byteStringConversion());
>   ChecksumData checksumData = checksum.computeChecksum(chunk);
>   ChunkInfo chunkInfo = ChunkInfo.newBuilder()
>       .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
>       .setOffset(offset)
>       .setLen(effectiveChunkSize)
>       .setChecksumData(checksumData.getProtoBufMessage())
>       .build();
>   if (LOG.isDebugEnabled()) {
>     LOG.debug("Writing chunk {} length {} at offset {}",
>         chunkInfo.getChunkName(), effectiveChunkSize, offset);
>   }
>   try {
>     XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
>         blockID.get(), data, token, replicationIndex);
>     CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
>         asyncReply.getResponse();
>     future.thenApplyAsync(e -> {                    // <-- the new stage is not held and returned
>       try {
>         validateResponse(e);
>       } catch (IOException sce) {
>         future.completeExceptionally(sce);
>       }
>       return e;
>     }, responseExecutor).exceptionally(e -> {
>       String msg = "Failed to write chunk " + chunkInfo.getChunkName() + " " +
>           "into block " + blockID;
>       LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
>       CompletionException ce = new CompletionException(msg, e);
>       setIoException(ce);
>       throw ce;
>     });
>     containerBlockData.addChunks(chunkInfo);
>     return future;                                  // <-- actually returning a future in the middle of the chain
>   } catch (IOException | ExecutionException e) {
>     throw new IOException(EXCEPTION_MSG + e.toString(), e);
>   } catch (InterruptedException ex) {
>     Thread.currentThread().interrupt();
>     handleInterruptedException(ex, false);
>   }
>   return null;
> } {code}
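> In the Ratis path, the returned future is not used, so there is no problem.
> In the EC path, the returned future is waited on below, so there is a problem:
> the wait is on the original response future rather than on the validation
> stage, so it can return before `validateResponse` has run.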
> {code:java}
> // ECBlockOutputStreamEntry.java
> private boolean isFailed(
>     ECBlockOutputStream outputStream,
>     CompletableFuture<ContainerProtos.
>         ContainerCommandResponseProto> chunkWriteResponseFuture) {
>   ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto
>       = null;
>   try {
>     containerCommandResponseProto = chunkWriteResponseFuture != null ?
>         chunkWriteResponseFuture.get() :            // <-- wait for the future
>         null;
>   } catch (InterruptedException e) {
>     outputStream.setIoException(e);
>     Thread.currentThread().interrupt();
>   } catch (ExecutionException e) {
>     outputStream.setIoException(e);
>   }
>   if ((outputStream != null && containerCommandResponseProto != null)
>       && (outputStream.getIoException() != null || isStreamFailed(
>       containerCommandResponseProto, outputStream))) {
>     return true;
>   }
>   return false;
> } {code}
> Since `validateResponse` is already executed in the lambda in the first 
> snippet, `isStreamFailed` is no longer needed; checking the already-set 
> IoException is sufficient.
> Note that `executePutBlock` has the correct future chain and also calls 
> `validateResponse`.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to