This is an automated email from the ASF dual-hosted git repository.

captainzmc pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 2aacc7a889aa49237296fcecbcf0c664f7d42000
Author: micah zhao <[email protected]>
AuthorDate: Fri Dec 24 01:18:48 2021 +0800

    HDDS-6130. [Ozone-Streaming] When releaseBuffers will get “Couldn't find the required future” (#2939)
---
 .../hdds/scm/storage/BlockDataStreamOutput.java    | 92 ++++++++++------------
 .../hdds/scm/storage/StreamCommitWatcher.java      | 28 -------
 2 files changed, 42 insertions(+), 78 deletions(-)
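The fix replaces StreamCommitWatcher's per-offset future map with a single
future chained through an AtomicReference: each putBlock response is combined
onto the tail of the chain, and flush waits on the tail alone. A minimal
sketch of that pattern, mirroring the diff below but with illustrative names
(ChainedAcks, append, awaitAll are not Ozone APIs):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicReference;

    class ChainedAcks {
      // Seed with an already-completed future so the first append
      // has a predecessor to combine with.
      private final AtomicReference<CompletableFuture<Long>> tail =
          new AtomicReference<>(CompletableFuture.completedFuture(null));

      // Atomically append a stage: the new tail completes only after
      // both the previous tail and the new stage have completed.
      void append(CompletableFuture<Long> stage) {
        tail.updateAndGet(prev ->
            prev.thenCombine(stage, (previous, current) -> current));
      }

      // Wait for every stage appended so far, as handleFlush() now
      // does with putBlockFuture.get().get().
      Long awaitAll() throws Exception {
        return tail.get().get();
      }
    }

Because thenCombine completes only once both sides have, waiting on the
stored tail waits on every outstanding putBlock; no per-offset key has to
line up with an acknowledged length.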
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 9fb1340527..84968b68d5 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
 import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
@@ -115,6 +116,9 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
   // Also, corresponding to the logIndex, the corresponding list of buffers will
   // be released from the buffer pool.
   private final StreamCommitWatcher commitWatcher;
+  private final AtomicReference<CompletableFuture<
+      ContainerCommandResponseProto>> putBlockFuture
+      = new AtomicReference<>(CompletableFuture.completedFuture(null));
 
   private final List<DatanodeDetails> failedServers;
   private final Checksum checksum;
@@ -381,8 +385,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
    * @param force true if no data was written since most recent putBlock and
    *              stream is being closed
    */
-  private CompletableFuture<ContainerProtos.
-      ContainerCommandResponseProto> executePutBlock(boolean close,
+  private void executePutBlock(boolean close,
       boolean force) throws IOException {
     checkOpen();
     long flushPos = totalDataFlushedLength;
@@ -399,56 +402,54 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
       dataStreamCloseReply = out.closeAsync();
     }
 
-    CompletableFuture<ContainerProtos.
-        ContainerCommandResponseProto> flushFuture = null;
     try {
       BlockData blockData = containerBlockData.build();
       XceiverClientReply asyncReply =
           putBlockAsync(xceiverClient, blockData, close, token);
-      CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
-          asyncReply.getResponse();
-      flushFuture = future.thenApplyAsync(e -> {
-        try {
-          validateResponse(e);
-        } catch (IOException sce) {
-          throw new CompletionException(sce);
-        }
-        // if the ioException is not set, putBlock is successful
-        if (getIoException() == null && !force) {
-          BlockID responseBlockID = BlockID.getFromProtobuf(
-              e.getPutBlock().getCommittedBlockLength().getBlockID());
-          Preconditions.checkState(blockID.get().getContainerBlockID()
-              .equals(responseBlockID.getContainerBlockID()));
-          // updates the bcsId of the block
-          blockID.set(responseBlockID);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                "Adding index " + asyncReply.getLogIndex() + " commitMap size "
+      final CompletableFuture<ContainerCommandResponseProto> flushFuture
+          = asyncReply.getResponse().thenApplyAsync(e -> {
+            try {
+              validateResponse(e);
+            } catch (IOException sce) {
+              throw new CompletionException(sce);
+            }
+            // if the ioException is not set, putBlock is successful
+            if (getIoException() == null && !force) {
+              BlockID responseBlockID = BlockID.getFromProtobuf(
+                  e.getPutBlock().getCommittedBlockLength().getBlockID());
+              Preconditions.checkState(blockID.get().getContainerBlockID()
+                  .equals(responseBlockID.getContainerBlockID()));
+              // updates the bcsId of the block
+              blockID.set(responseBlockID);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Adding index " + asyncReply.getLogIndex()
+                    + " commitMap size "
                     + commitWatcher.getCommitInfoMapSize() + " flushLength "
                     + flushPos + " blockID " + blockID);
-          }
-          // for standalone protocol, logIndex will always be 0.
-          commitWatcher
-              .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
-        }
-        return e;
-      }, responseExecutor).exceptionally(e -> {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("putBlock failed for blockID {} with exception {}",
-              blockID, e.getLocalizedMessage());
-        }
-        CompletionException ce = new CompletionException(e);
-        setIoException(ce);
-        throw ce;
-      });
+              }
+              // for standalone protocol, logIndex will always be 0.
+              commitWatcher
+                  .updateCommitInfoMap(asyncReply.getLogIndex(),
+                      byteBufferList);
+            }
+            return e;
+          }, responseExecutor).exceptionally(e -> {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("putBlock failed for blockID {} with exception {}",
+                  blockID, e.getLocalizedMessage());
+            }
+            CompletionException ce = new CompletionException(e);
+            setIoException(ce);
+            throw ce;
+          });
+      putBlockFuture.updateAndGet(f -> f.thenCombine(flushFuture,
+          (previous, current) -> current));
     } catch (IOException | ExecutionException e) {
       throw new IOException(EXCEPTION_MSG + e.toString(), e);
     } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
       handleInterruptedException(ex, false);
    }
-    commitWatcher.getFutureMap().put(flushPos, flushFuture);
-    return flushFuture;
   }
 
   @Override
@@ -484,7 +485,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
       // data since latest flush - we need to send the "EOF" flag
       executePutBlock(true, true);
     }
-    waitOnFlushFutures();
+    putBlockFuture.get().get();
     watchForCommit(false);
     // just check again if the exception is hit while waiting for the
     // futures to ensure flush has indeed succeeded
@@ -512,15 +513,6 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
     }
   }
 
-  private void waitOnFlushFutures()
-      throws InterruptedException, ExecutionException {
-    CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
-        commitWatcher.getFutureMap().values().toArray(
-            new CompletableFuture[commitWatcher.getFutureMap().size()]));
-    // wait for all the transactions to complete
-    combinedFuture.get();
-  }
-
   private void validateResponse(
       ContainerProtos.ContainerCommandResponseProto responseProto)
       throws IOException {
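The StreamCommitWatcher diff below removes the bookkeeping behind the error
in the subject line: futures were stored under the flush position recorded at
putBlock time but removed under the running totalAckDataLength, and when the
two counters diverge (for example, when one acknowledgement covers several
putBlocks) the remove() misses. A hypothetical, self-contained illustration
with made-up offsets:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;

    public class FutureMapMiss {
      public static void main(String[] args) {
        ConcurrentHashMap<Long, CompletableFuture<Void>> futureMap =
            new ConcurrentHashMap<>();
        // Two putBlocks recorded at their flush positions.
        futureMap.put(4L, new CompletableFuture<>());
        futureMap.put(8L, new CompletableFuture<>());

        // One acknowledgement covers both blocks, so the accumulated
        // length jumps straight to 8 and never equals 4.
        long totalAckDataLength = 8;
        futureMap.remove(totalAckDataLength); // finds 8; 4 is stranded
        System.out.println("Stranded futures: " + futureMap.keySet());
        // Any later release that looks up a new accumulated length
        // then logs "Couldn't find required future for ...".
      }
    }

With the single chained putBlockFuture there is no key to miss, which is why
the map, its getFutureMap() accessor, and waitOnFlushFutures() are all
deleted.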
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
index 9ae604e951..1820416d32 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
@@ -25,7 +25,6 @@
 package org.apache.hadoop.hdds.scm.storage;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.slf4j.Logger;
@@ -35,9 +34,6 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -59,18 +55,12 @@ public class StreamCommitWatcher {
   // by all servers
   private long totalAckDataLength;
 
-  // future Map to hold up all putBlock futures
-  private ConcurrentHashMap<Long,
-      CompletableFuture<ContainerCommandResponseProto>>
-      futureMap;
-
   private XceiverClientSpi xceiverClient;
 
   public StreamCommitWatcher(XceiverClientSpi xceiverClient,
       List<StreamBuffer> bufferList) {
     this.xceiverClient = xceiverClient;
     commitIndexMap = new ConcurrentSkipListMap<>();
-    futureMap = new ConcurrentHashMap<>();
     this.bufferList = bufferList;
     totalAckDataLength = 0;
   }
@@ -180,15 +170,6 @@ public class StreamCommitWatcher {
       final long length =
           buffers.stream().mapToLong(StreamBuffer::position).sum();
       totalAckDataLength += length;
-      // clear the future object from the future Map
-      final CompletableFuture<ContainerCommandResponseProto> remove =
-          futureMap.remove(totalAckDataLength);
-      if (remove == null) {
-        LOG.error("Couldn't find required future for " + totalAckDataLength);
-        for (Long key : futureMap.keySet()) {
-          LOG.error("Existing acknowledged data: " + key);
-        }
-      }
       for (StreamBuffer byteBuffer : buffers) {
         bufferList.remove(byteBuffer);
       }
@@ -209,19 +190,10 @@ public class StreamCommitWatcher {
     return ioException;
   }
 
-  public ConcurrentMap<Long,
-      CompletableFuture<
-          ContainerCommandResponseProto>> getFutureMap() {
-    return futureMap;
-  }
-
   public void cleanup() {
     if (commitIndexMap != null) {
       commitIndexMap.clear();
     }
-    if (futureMap != null) {
-      futureMap.clear();
-    }
     commitIndexMap = null;
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
