[ https://issues.apache.org/jira/browse/HDDS-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mark Gui updated HDDS-6368:
---------------------------
Description:
Observed a long-lived broken future chain in the current code:
{code:java}
// BlockOutputStream.java
CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
    ChunkBuffer chunk) throws IOException {
  int effectiveChunkSize = chunk.remaining();
  final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
  final ByteString data = chunk.toByteString(
      bufferPool.byteStringConversion());
  ChecksumData checksumData = checksum.computeChecksum(chunk);
  ChunkInfo chunkInfo = ChunkInfo.newBuilder()
      .setChunkName(blockID.get().getLocalID() + "_chunk_" + ++chunkIndex)
      .setOffset(offset)
      .setLen(effectiveChunkSize)
      .setChecksumData(checksumData.getProtoBufMessage())
      .build();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Writing chunk {} length {} at offset {}",
        chunkInfo.getChunkName(), effectiveChunkSize, offset);
  }
  try {
    XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
        blockID.get(), data, token, replicationIndex);
    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
        asyncReply.getResponse();
    future.thenApplyAsync(e -> {  // <-- the new stage is neither held nor returned
      try {
        validateResponse(e);
      } catch (IOException sce) {
        // <-- no effect: this stage only runs after future has completed normally
        future.completeExceptionally(sce);
      }
      return e;
    }, responseExecutor).exceptionally(e -> {
      String msg = "Failed to write chunk " + chunkInfo.getChunkName() + " " +
          "into block " + blockID;
      LOG.debug("{}, exception: {}", msg, e.getLocalizedMessage());
      CompletionException ce = new CompletionException(msg, e);
      setIoException(ce);
      throw ce;
    });
    containerBlockData.addChunks(chunkInfo);
    return future;  // <-- returns the original future, not the end of the chain
  } catch (IOException | ExecutionException e) {
    throw new IOException(EXCEPTION_MSG + e.toString(), e);
  } catch (InterruptedException ex) {
    Thread.currentThread().interrupt();
    handleInterruptedException(ex, false);
  }
  return null;
} {code}
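The discarded stage can be reproduced outside Ozone. The sketch below is a minimal, self-contained illustration, assuming `String` responses in place of `ContainerCommandResponseProto` and a hypothetical `validate()` in place of `validateResponse()`; `BrokenChainDemo` and `writeBroken` are illustrative names, not Ozone code. It shows that a caller waiting on the returned future still receives the raw response when validation fails, because `completeExceptionally()` on an already-completed future returns `false` and changes nothing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BrokenChainDemo {

  // Hypothetical stand-in for validateResponse(): rejects anything other than "OK".
  static void validate(String response) throws Exception {
    if (!"OK".equals(response)) {
      throw new Exception("bad response: " + response);
    }
  }

  // Mirrors the pattern in writeChunkToContainer(): the validation stage is
  // created but discarded, and the original future is returned.
  static CompletableFuture<String> writeBroken(
      CompletableFuture<String> future, ExecutorService executor) {
    future.thenApplyAsync(r -> {
      try {
        validate(r);
      } catch (Exception ex) {
        // No effect: 'future' completed normally before this stage could run.
        future.completeExceptionally(ex);
      }
      return r;
    }, executor);
    return future;
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CompletableFuture<String> reply =
        CompletableFuture.completedFuture("CONTAINER_NOT_FOUND");
    // Waiting on the returned future succeeds even though validation fails.
    System.out.println(writeBroken(reply, executor).join());
    executor.shutdown();
  }
}
```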
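In the Ratis path, the returned future is not used, so this is harmless.
In the EC path, the future is waited on below, so it is a real problem: the caller receives the raw response as soon as the reply arrives, while validation runs in a separate stage that nobody observes.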
{code:java}
// ECBlockOutputStreamEntry.java
private boolean isFailed(
    ECBlockOutputStream outputStream,
    CompletableFuture<ContainerProtos.
        ContainerCommandResponseProto> chunkWriteResponseFuture) {
  ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto
      = null;
  try {
    containerCommandResponseProto = chunkWriteResponseFuture != null ?
        chunkWriteResponseFuture.get() :  // <-- waits for the future
        null;
  } catch (InterruptedException e) {
    outputStream.setIoException(e);
    Thread.currentThread().interrupt();
  } catch (ExecutionException e) {
    outputStream.setIoException(e);
  }
  if ((outputStream != null && containerCommandResponseProto != null)
      && (outputStream.getIoException() != null || isStreamFailed(
      containerCommandResponseProto, outputStream))) {
    return true;
  }
  return false;
} {code}
Since `validateResponse` already runs in the lambda in the first snippet,
`isStreamFailed` is no longer needed; checking the IOException that has
already been set is sufficient.
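A simplified `isFailed()` along those lines might look like the sketch below. It is one possible shape, not the actual patch: `StreamState` is a hypothetical stand-in for `ECBlockOutputStream`, responses are modeled as `String`, and it assumes the future passed in is the tail of a correctly built chain that records failures via `setIoException()`. Unlike the original, it also reports a failure when `get()` itself throws; whether the real fix does the same is an assumption.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class IsFailedSketch {

  // Hypothetical stand-in for ECBlockOutputStream: only tracks the recorded exception.
  static final class StreamState {
    private volatile Exception ioException;

    void setIoException(Exception e) {
      ioException = e;
    }

    Exception getIoException() {
      return ioException;
    }
  }

  // Waits for the (fixed) chain, then relies solely on the recorded exception;
  // no separate isStreamFailed()-style re-validation of the response.
  static boolean isFailed(StreamState stream,
      CompletableFuture<String> chunkWriteResponseFuture) {
    if (chunkWriteResponseFuture != null) {
      try {
        chunkWriteResponseFuture.get();
      } catch (InterruptedException e) {
        stream.setIoException(e);
        Thread.currentThread().interrupt();
      } catch (ExecutionException e) {
        stream.setIoException(e);
      }
    }
    return stream.getIoException() != null;
  }

  public static void main(String[] args) {
    StreamState ok = new StreamState();
    System.out.println(isFailed(ok, CompletableFuture.completedFuture("OK")));

    StreamState bad = new StreamState();
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IOException("validation failed"));
    System.out.println(isFailed(bad, failed));
  }
}
```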
Note that `executePutBlock` already builds its future chain correctly and
also calls `validateResponse`.
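For comparison, a correctly built chain keeps the stage produced by `thenApplyAsync(...).exceptionally(...)` and returns it, so validation failures reach whoever waits on the future. This is a minimal sketch assuming `String` responses and a hypothetical `validate()` standing in for `validateResponse()`; `FixedChainDemo` and `writeFixed` are illustrative names, and this is neither the code of `executePutBlock` nor the eventual fix.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class FixedChainDemo {

  // Hypothetical stand-in for validateResponse(): rejects anything other than "OK".
  static void validate(String response) throws Exception {
    if (!"OK".equals(response)) {
      throw new Exception("bad response: " + response);
    }
  }

  // Returns the tail of the chain, so callers observe validation failures.
  static CompletableFuture<String> writeFixed(
      CompletableFuture<String> reply, ExecutorService executor) {
    return reply.thenApplyAsync(r -> {
      try {
        validate(r);
      } catch (Exception ex) {
        // Fails this stage and every dependent stage, not the completed source.
        throw new CompletionException(ex);
      }
      return r;
    }, executor).exceptionally(e -> {
      // Hypothetical stand-in for the logging and setIoException() in the original handler.
      throw new CompletionException("Failed to write chunk", e);
    });
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      writeFixed(CompletableFuture.completedFuture("CONTAINER_NOT_FOUND"), executor).join();
      System.out.println("unexpected success");
    } catch (CompletionException e) {
      System.out.println("failure propagated: " + e.getMessage());
    } finally {
      executor.shutdown();
    }
  }
}
```

Throwing a `CompletionException` from inside the stage fails that stage and all of its dependents, which is the effect `future.completeExceptionally(sce)` cannot have once the source future has already completed.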
> EC: Fix broken future chain and cleanup unnecessary validation function.
> ------------------------------------------------------------------------
>
> Key: HDDS-6368
> URL: https://issues.apache.org/jira/browse/HDDS-6368
> Project: Apache Ozone
> Issue Type: Sub-task
> Reporter: Mark Gui
> Assignee: Mark Gui
> Priority: Minor
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)