[
https://issues.apache.org/jira/browse/HDDS-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated HDDS-6368:
---------------------------------
Labels: pull-request-available (was: )
> EC: Fix broken future chain and cleanup unnecessary validation function.
> ------------------------------------------------------------------------
>
> Key: HDDS-6368
> URL: https://issues.apache.org/jira/browse/HDDS-6368
> Project: Apache Ozone
> Issue Type: Sub-task
> Reporter: Mark Gui
> Assignee: Mark Gui
> Priority: Minor
> Labels: pull-request-available
>
> A broken future chain has long existed in the current code:
> {code:java}
> // BlockOutputStream.java
> CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
>     ChunkBuffer chunk) throws IOException {
>   int effectiveChunkSize = chunk.remaining();
>   final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
>   final ByteString data = chunk.toByteString(
>       bufferPool.byteStringConversion());
>   ChecksumData checksumData = checksum.computeChecksum(chunk);
>   ChunkInfo chunkInfo = ChunkInfo.newBuilder()
>       .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
>       .setOffset(offset)
>       .setLen(effectiveChunkSize)
>       .setChecksumData(checksumData.getProtoBufMessage())
>       .build();
>   if (LOG.isDebugEnabled()) {
>     LOG.debug("Writing chunk {} length {} at offset {}",
>         chunkInfo.getChunkName(), effectiveChunkSize, offset);
>   }
>   try {
>     XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
>         blockID.get(), data, token, replicationIndex);
>     CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
>         asyncReply.getResponse();
>     future.thenApplyAsync(e -> { // <-- the new stage is not held or returned
>       try {
>         validateResponse(e);
>       } catch (IOException sce) {
>         future.completeExceptionally(sce);
>       }
>       return e;
>     }, responseExecutor).exceptionally(e -> {
>       String msg = "Failed to write chunk " + chunkInfo.getChunkName() + " " +
>           "into block " + blockID;
>       LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
>       CompletionException ce = new CompletionException(msg, e);
>       setIoException(ce);
>       throw ce;
>     });
>     containerBlockData.addChunks(chunkInfo);
>     return future; // <-- actually returns a future from the middle of the chain
>   } catch (IOException | ExecutionException e) {
>     throw new IOException(EXCEPTION_MSG + e.toString(), e);
>   } catch (InterruptedException ex) {
>     Thread.currentThread().interrupt();
>     handleInterruptedException(ex, false);
>   }
>   return null;
> } {code}
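The effect described above can be reproduced with plain `CompletableFuture` and no Ozone classes. The following is only a minimal sketch: `BrokenChainDemo`, `validate`, `returnsHead` and `returnsTail` are hypothetical names, a `String` stands in for the response proto, and the synchronous `thenApply` replaces `thenApplyAsync` to keep the example deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class BrokenChainDemo {
  /** Hypothetical stand-in for validateResponse: rejects "BAD" replies. */
  static String validate(String reply) {
    if ("BAD".equals(reply)) {
      throw new IllegalStateException("validation failed");
    }
    return reply;
  }

  /** Mirrors the quoted pattern: builds a dependent stage but returns the head. */
  static CompletableFuture<String> returnsHead(CompletableFuture<String> head) {
    head.thenApply(BrokenChainDemo::validate); // the new stage is dropped here
    return head;
  }

  /** Returns the dependent stage, so callers observe validation failures. */
  static CompletableFuture<String> returnsTail(CompletableFuture<String> head) {
    return head.thenApply(BrokenChainDemo::validate);
  }

  public static void main(String[] args) throws Exception {
    CompletableFuture<String> head =
        returnsHead(CompletableFuture.completedFuture("BAD"));
    // The head completed normally, so get() returns the unvalidated reply.
    System.out.println("head result: " + head.get());
    // Completing an already completed future has no effect and returns false.
    System.out.println("late completeExceptionally took effect: "
        + head.completeExceptionally(new RuntimeException("late")));

    CompletableFuture<String> tail =
        returnsTail(CompletableFuture.completedFuture("BAD"));
    try {
      tail.get();
    } catch (ExecutionException e) {
      // The validation failure is visible only on the dependent stage.
      System.out.println("tail failed: " + e.getCause().getMessage());
    }
  }
}
```

A caller that waits on the head sees a normal completion even when validation throws, and a `completeExceptionally` call on a future that has already completed changes nothing.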
> In the Ratis path the returned future is not used, so there is no problem.
> In the EC path the returned future is waited on, as shown below, so the broken chain is a problem.
> {code:java}
> // ECBlockOutputStreamEntry.java
> private boolean isFailed(
>     ECBlockOutputStream outputStream,
>     CompletableFuture<ContainerProtos.
>         ContainerCommandResponseProto> chunkWriteResponseFuture) {
>   ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto
>       = null;
>   try {
>     containerCommandResponseProto = chunkWriteResponseFuture != null ?
>         chunkWriteResponseFuture.get() : // <-- waits for the future
>         null;
>   } catch (InterruptedException e) {
>     outputStream.setIoException(e);
>     Thread.currentThread().interrupt();
>   } catch (ExecutionException e) {
>     outputStream.setIoException(e);
>   }
>   if ((outputStream != null && containerCommandResponseProto != null)
>       && (outputStream.getIoException() != null || isStreamFailed(
>       containerCommandResponseProto, outputStream))) {
>     return true;
>   }
>   return false;
> } {code}
> Since `validateResponse` is already executed in the lambda in the first snippet,
> `isStreamFailed` is no longer needed; checking the IoException that has already
> been set is sufficient.
> Note that `executePutBlock` has a correct future chain and also calls
> `validateResponse`.
>
>
>
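One possible shape of the cleanup described in the issue, sketched with plain `CompletableFuture` rather than the Ozone classes. Everything here is an assumption for illustration: `FixedChainSketch`, its `ioException` field, the `"SUCCESS"` check and the `String` reply type are hypothetical stand-ins, and the actual change in the pull request may differ.

```java
import java.io.IOException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

public class FixedChainSketch {
  // Hypothetical stand-in for the stream's setIoException/getIoException state.
  private final AtomicReference<Throwable> ioException = new AtomicReference<>();

  public Throwable getIoException() {
    return ioException.get();
  }

  /** Validates the reply and returns the LAST stage of the chain. */
  public CompletableFuture<String> writeChunk(CompletableFuture<String> reply) {
    return reply
        .thenApply(r -> {
          // Hypothetical validation: anything other than "SUCCESS" is an error.
          if (!"SUCCESS".equals(r)) {
            throw new CompletionException(new IOException("bad reply: " + r));
          }
          return r;
        })
        .exceptionally(e -> {
          CompletionException ce =
              new CompletionException("Failed to write chunk", e);
          ioException.set(ce); // record the failure on the stream
          throw ce;            // keep the returned future failed as well
        });
  }

  /** With validation inside the chain, only the recorded exception is checked. */
  public boolean isFailed(CompletableFuture<String> chunkWrite) {
    if (chunkWrite != null) {
      try {
        chunkWrite.join(); // wait for the whole chain, including validation
      } catch (CompletionException | CancellationException e) {
        ioException.compareAndSet(null, e);
      }
    }
    return ioException.get() != null;
  }

  public static void main(String[] args) {
    FixedChainSketch ok = new FixedChainSketch();
    System.out.println("ok failed: "
        + ok.isFailed(ok.writeChunk(CompletableFuture.completedFuture("SUCCESS"))));
    FixedChainSketch bad = new FixedChainSketch();
    System.out.println("bad failed: "
        + bad.isFailed(bad.writeChunk(CompletableFuture.completedFuture("ERROR"))));
  }
}
```

Because the caller waits on the final stage, a validation failure both fails the returned future and is recorded before `isFailed` inspects the exception, so no second validation pass is needed in this sketch.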
--
This message was sent by Atlassian Jira
(v8.20.1#820001)