szetszwo commented on code in PR #6859:
URL: https://github.com/apache/ozone/pull/6859#discussion_r1676759309
########## hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java: ##########
@@ -320,46 +330,58 @@ public void write(byte[] b, int off, int len) throws IOException {

```diff
     if (len == 0) {
       return;
     }
-
-    while (len > 0) {
-      allocateNewBufferIfNeeded();
-      final int writeLen = Math.min(currentBufferRemaining, len);
-      currentBuffer.put(b, off, writeLen);
-      currentBufferRemaining -= writeLen;
-      writeChunkIfNeeded();
-      off += writeLen;
-      len -= writeLen;
-      updateWrittenDataLength(writeLen);
-      doFlushOrWatchIfNeeded();
+    synchronized (this) {
+      while (len > 0) {
+        allocateNewBufferIfNeeded();
+        final int writeLen = Math.min(currentBufferRemaining, len);
+        currentBuffer.put(b, off, writeLen);
+        currentBufferRemaining -= writeLen;
+        updateWrittenDataLength(writeLen);
+        writeChunkIfNeeded();
+        off += writeLen;
+        len -= writeLen;
+        doFlushOrWatchIfNeeded();
+      }
     }
   }
 
-  public void updateWrittenDataLength(int writeLen) {
+  protected synchronized void updateWrittenDataLength(int writeLen) {
     writtenDataLength += writeLen;
   }
 
   private void doFlushOrWatchIfNeeded() throws IOException {
     if (currentBufferRemaining == 0) {
       if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
-        updateFlushLength();
-        executePutBlock(false, false);
+        updatePutBlockLength();
+        CompletableFuture<PutBlockResult> putBlockFuture = executePutBlock(false, false);
+        this.lastFlushFuture = watchForCommitAsync(putBlockFuture);
       }
-      // Data in the bufferPool can not exceed streamBufferMaxSize
-      if (bufferPool.getNumberOfUsedBuffers() == bufferPool.getCapacity()) {
+
+      if (bufferPool.isAtCapacity()) {
         handleFullBuffer();
       }
     }
   }
 
-  private void allocateNewBufferIfNeeded() {
+  private void allocateNewBufferIfNeeded() throws IOException {
     if (currentBufferRemaining == 0) {
-      currentBuffer = bufferPool.allocateBuffer(config.getBufferIncrement());
-      currentBufferRemaining = currentBuffer.remaining();
+      try {
+        currentBuffer = bufferPool.allocateBuffer(config.getBufferIncrement());
+        currentBufferRemaining = currentBuffer.remaining();
+        LOG.debug("Allocated new buffer {}, used = {}, capacity = {}", currentBuffer,
+            bufferPool.getNumberOfUsedBuffers(), bufferPool.getCapacity());
+      } catch (InterruptedException e) {
+        handleInterruptedException(e, false);
+      }
     }
   }
 
-  private void updateFlushLength() {
-    totalDataFlushedLength = writtenDataLength;
+  private void updateWriteChunkLength() {
+    totalWriteChunkLength = writtenDataLength;
```

Review Comment:
Let's add
```java
Preconditions.checkState(Thread.holdsLock(this));
```

########## hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java: ##########
@@ -495,8 +511,8 @@ ContainerCommandResponseProto> executePutBlock(boolean close,

```diff
       byteBufferList = null;
     }
 
-    CompletableFuture<ContainerProtos.
-        ContainerCommandResponseProto> flushFuture = null;
+    CompletableFuture<ContainerCommandResponseProto> flushFuture = null;
+    XceiverClientReply asyncReply;
```

Review Comment:
Add final
```java
final CompletableFuture<ContainerCommandResponseProto> flushFuture;
final XceiverClientReply asyncReply;
```

########## hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java: ##########
@@ -1053,11 +1116,41 @@ private void handleExecutionException(Exception ex) throws IOException {

```diff
     throw getIoException();
   }
 
+  protected CompletableFuture<Void> getLastFlushFuture() {
```

Review Comment:
Need `synchronized`.
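For illustration, a minimal sketch of the suggested fix (assuming `lastFlushFuture` remains a plain field assigned under `synchronized (this)` as elsewhere in this change):

```java
protected synchronized CompletableFuture<Void> getLastFlushFuture() {
  // Read under the same monitor that the write/flush paths hold when they
  // assign lastFlushFuture, so callers never observe a stale future.
  return lastFlushFuture;
}
```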
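Similarly, applying the `Thread.holdsLock` check suggested on `updateWriteChunkLength()` above would look like this (sketch):

```java
private void updateWriteChunkLength() {
  // Document and enforce the locking contract: this private method must
  // only be reached while holding the stream's monitor.
  Preconditions.checkState(Thread.holdsLock(this));
  totalWriteChunkLength = writtenDataLength;
}
```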
########## hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java: ##########
@@ -607,40 +621,77 @@ protected void handleFlush(boolean close) throws IOException {

```diff
   private void handleFlushInternal(boolean close)
       throws IOException, InterruptedException, ExecutionException {
     checkOpen();
-    // flush the last chunk data residing on the currentBuffer
-    if (totalDataFlushedLength < writtenDataLength) {
-      refreshCurrentBuffer();
-      Preconditions.checkArgument(currentBuffer.position() > 0);
-
-      // This can be a partially filled chunk. Since we are flushing the buffer
-      // here, we just limit this buffer to the current position. So that next
-      // write will happen in new buffer
-      if (currentBuffer.hasRemaining()) {
-        if (allowPutBlockPiggybacking) {
-          updateFlushLength();
-          writeChunkAndPutBlock(currentBuffer, close);
+    LOG.debug("Start handleFlushInternal close={}", close);
+    CompletableFuture<Void> toWaitFor;
+    synchronized (this) {
```

Review Comment:
Let's make it a separate method.
```java
  private void handleFlushInternal(boolean close)
      throws IOException, InterruptedException, ExecutionException {
    checkOpen();
    LOG.debug("Start handleFlushInternal close={}", close);
    CompletableFuture<Void> toWaitFor = handleFlushInternalSynchronized(close);

    if (toWaitFor != null) {
      LOG.debug("Waiting for flush");
      try {
        toWaitFor.get();
      } catch (ExecutionException ex) {
        if (ex.getCause() instanceof FlushRuntimeException) {
          throw ((FlushRuntimeException) ex.getCause()).cause;
        } else {
          throw ex;
        }
      }
    }
    LOG.debug("Flush done.");
  }

  private synchronized CompletableFuture<Void> handleFlushInternalSynchronized(boolean close) throws IOException {
    final CompletableFuture<PutBlockResult> putBlockResultFuture;
    // flush the last chunk data residing on the currentBuffer
    if (totalWriteChunkLength < writtenDataLength) {
      Preconditions.checkArgument(currentBuffer.position() > 0);

      // This can be a partially filled chunk. Since we are flushing the buffer
      // here, we just limit this buffer to the current position. So that next
      // write will happen in new buffer
      updateWriteChunkLength();
      updatePutBlockLength();
      if (currentBuffer.hasRemaining()) {
        if (allowPutBlockPiggybacking) {
          putBlockResultFuture = writeChunkAndPutBlock(currentBuffer, close);
        } else {
          writeChunk(currentBuffer);
          putBlockResultFuture = executePutBlock(close, false);
        }
        if (!close) {
          // reset current buffer.
          currentBuffer = null;
          currentBufferRemaining = 0;
        }
      } else {
        putBlockResultFuture = executePutBlock(close, false);
        // set lastFuture.
      }
    } else if (totalPutBlockLength < totalWriteChunkLength) {
      // There're no pending written data, but there're uncommitted data.
      updatePutBlockLength();
      putBlockResultFuture = executePutBlock(close, false);
    } else if (close) {
      // forcing an "empty" putBlock if stream is being closed without new
      // data since latest flush - we need to send the "EOF" flag
      updatePutBlockLength();
      putBlockResultFuture = executePutBlock(true, true);
    } else {
      LOG.debug("Flushing without data");
      return null;
    }
    lastFlushFuture = watchForCommitAsync(putBlockResultFuture);
    return lastFlushFuture;
  }
```

########## hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java: ##########
@@ -320,46 +330,58 @@ public void write(byte[] b, int off, int len) throws IOException {

```diff
     if (len == 0) {
       return;
     }
-
-    while (len > 0) {
-      allocateNewBufferIfNeeded();
-      final int writeLen = Math.min(currentBufferRemaining, len);
-      currentBuffer.put(b, off, writeLen);
-      currentBufferRemaining -= writeLen;
-      writeChunkIfNeeded();
-      off += writeLen;
-      len -= writeLen;
-      updateWrittenDataLength(writeLen);
-      doFlushOrWatchIfNeeded();
+    synchronized (this) {
+      while (len > 0) {
+        allocateNewBufferIfNeeded();
+        final int writeLen = Math.min(currentBufferRemaining, len);
+        currentBuffer.put(b, off, writeLen);
+        currentBufferRemaining -= writeLen;
+        updateWrittenDataLength(writeLen);
+        writeChunkIfNeeded();
+        off += writeLen;
+        len -= writeLen;
+        doFlushOrWatchIfNeeded();
+      }
     }
   }
 
-  public void updateWrittenDataLength(int writeLen) {
+  protected synchronized void updateWrittenDataLength(int writeLen) {
     writtenDataLength += writeLen;
   }
 
   private void doFlushOrWatchIfNeeded() throws IOException {
     if (currentBufferRemaining == 0) {
       if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
-        updateFlushLength();
-        executePutBlock(false, false);
+        updatePutBlockLength();
+        CompletableFuture<PutBlockResult> putBlockFuture = executePutBlock(false, false);
+        this.lastFlushFuture = watchForCommitAsync(putBlockFuture);
       }
-      // Data in the bufferPool can not exceed streamBufferMaxSize
-      if (bufferPool.getNumberOfUsedBuffers() == bufferPool.getCapacity()) {
+
+      if (bufferPool.isAtCapacity()) {
        handleFullBuffer();
       }
     }
   }
 
-  private void allocateNewBufferIfNeeded() {
+  private void allocateNewBufferIfNeeded() throws IOException {
     if (currentBufferRemaining == 0) {
-      currentBuffer = bufferPool.allocateBuffer(config.getBufferIncrement());
-      currentBufferRemaining = currentBuffer.remaining();
+      try {
+        currentBuffer = bufferPool.allocateBuffer(config.getBufferIncrement());
+        currentBufferRemaining = currentBuffer.remaining();
+        LOG.debug("Allocated new buffer {}, used = {}, capacity = {}", currentBuffer,
+            bufferPool.getNumberOfUsedBuffers(), bufferPool.getCapacity());
+      } catch (InterruptedException e) {
+        handleInterruptedException(e, false);
+      }
     }
   }
 
-  private void updateFlushLength() {
-    totalDataFlushedLength = writtenDataLength;
+  private void updateWriteChunkLength() {
+    totalWriteChunkLength = writtenDataLength;
+  }
+
+  private void updatePutBlockLength() {
+    totalPutBlockLength = totalWriteChunkLength;
```

Review Comment:
Let's add
```java
Preconditions.checkState(Thread.holdsLock(this));
```

########## hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java: ##########
@@ -607,40 +621,77 @@ protected void handleFlush(boolean close) throws IOException {

```diff
   private void handleFlushInternal(boolean close)
       throws IOException, InterruptedException, ExecutionException {
     checkOpen();
-    // flush the last chunk data residing on the currentBuffer
-    if (totalDataFlushedLength < writtenDataLength) {
-      refreshCurrentBuffer();
-      Preconditions.checkArgument(currentBuffer.position() > 0);
-
-      // This can be a partially filled chunk. Since we are flushing the buffer
-      // here, we just limit this buffer to the current position. So that next
-      // write will happen in new buffer
-      if (currentBuffer.hasRemaining()) {
-        if (allowPutBlockPiggybacking) {
-          updateFlushLength();
-          writeChunkAndPutBlock(currentBuffer, close);
+    LOG.debug("Start handleFlushInternal close={}", close);
+    CompletableFuture<Void> toWaitFor;
+    synchronized (this) {
+      CompletableFuture<PutBlockResult> putBlockResultFuture = null;
+      // flush the last chunk data residing on the currentBuffer
+      if (totalWriteChunkLength < writtenDataLength) {
+        Preconditions.checkArgument(currentBuffer.position() > 0);
+
+        // This can be a partially filled chunk. Since we are flushing the buffer
+        // here, we just limit this buffer to the current position. So that next
+        // write will happen in new buffer
+        updateWriteChunkLength();
+        updatePutBlockLength();
+        if (currentBuffer.hasRemaining()) {
+          if (allowPutBlockPiggybacking) {
+            putBlockResultFuture = writeChunkAndPutBlock(currentBuffer, close);
+          } else {
+            writeChunk(currentBuffer);
+            putBlockResultFuture = executePutBlock(close, false);
+          }
+          if (!close) {
+            // reset current buffer.
+            currentBuffer = null;
+            currentBufferRemaining = 0;
+          }
         } else {
-          writeChunk(currentBuffer);
-          updateFlushLength();
-          executePutBlock(close, false);
+          putBlockResultFuture = executePutBlock(close, false);
+          // set lastFuture.
         }
+      } else if (totalPutBlockLength < totalWriteChunkLength) {
+        // There're no pending written data, but there're uncommitted data.
+        updatePutBlockLength();
+        putBlockResultFuture = executePutBlock(close, false);
+      } else if (close) {
+        // forcing an "empty" putBlock if stream is being closed without new
+        // data since latest flush - we need to send the "EOF" flag
+        updatePutBlockLength();
+        putBlockResultFuture = executePutBlock(true, true);
       } else {
-        updateFlushLength();
-        executePutBlock(close, false);
+        LOG.debug("Flushing without data");
+      }
+
+      if (putBlockResultFuture != null) {
+        this.lastFlushFuture = watchForCommitAsync(putBlockResultFuture);
+      }
+      toWaitFor = this.lastFlushFuture;
+    } // End of synchronized block.
+
+    if (toWaitFor != null) {
+      LOG.debug("Waiting for flush");
+      try {
+        toWaitFor.get();
+      } catch (ExecutionException ex) {
+        if (ex.getCause() instanceof FlushRuntimeException) {
+          throw ((FlushRuntimeException) ex.getCause()).cause;
+        } else {
+          throw ex;
+        }
       }
-    } else if (close) {
-      // forcing an "empty" putBlock if stream is being closed without new
-      // data since latest flush - we need to send the "EOF" flag
-      executePutBlock(true, true);
     }
-    waitOnFlushFutures();
-    watchForCommit(false);
-    // just check again if the exception is hit while waiting for the
-    // futures to ensure flush has indeed succeeded
+    LOG.debug("Flush done.");
+  }
 
-    // irrespective of whether the commitIndex2flushedDataMap is empty
-    // or not, ensure there is no exception set
-    checkOpen();
+  private CompletableFuture<Void> watchForCommitAsync(CompletableFuture<PutBlockResult> putBlockResultFuture) {
```

Review Comment:
`watchForCommit` is synchronized. The code below requires a thread to wait for the `replyFuture.get()` in `XceiverClientRatis.watchForCommit(index)`. We should check which thread it will use (print the thread name) and whether it is okay for that thread to wait. A better way is to return the `replyFuture` in the first place, but that will need more refactoring. We may do it separately.
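As a quick way to check the thread question, one could temporarily log the thread name at the start of the async stage (a throwaway sketch, not the PR's actual `watchForCommitAsync` body):

```java
private CompletableFuture<Void> watchForCommitAsync(
    CompletableFuture<PutBlockResult> putBlockResultFuture) {
  return putBlockResultFuture.thenAccept(result -> {
    // With no explicit executor, thenAccept runs either on the caller thread
    // (if the putBlock future is already complete) or on whichever thread
    // completes that future, e.g. a Ratis/gRPC response thread. That is the
    // thread that would block in XceiverClientRatis.watchForCommit(index).
    LOG.debug("watchForCommitAsync stage running on thread {}",
        Thread.currentThread().getName());
  });
}
```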
########## hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java: ##########
@@ -20,30 +20,45 @@

```diff
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
+import java.util.function.Supplier;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.Preconditions;
-
-import static java.util.Collections.emptyList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This class creates and manages pool of n buffers.
+ * A bounded pool implementation that provides {@link ChunkBuffer}s. This pool allows allocating and releasing
+ * {@link ChunkBuffer}.
+ * This pool is designed for concurrent access to allocation and release. It imposes a maximum number of buffers to be
+ * allocated at the same time and once the limit has been approached, the thread requesting a new allocation needs to
+ * wait until a allocated buffer is released.
  */
 public class BufferPool {
+  public static final Logger LOG = LoggerFactory.getLogger(BufferPool.class);
 
   private static final BufferPool EMPTY = new BufferPool(0, 0);
-
-  private final List<ChunkBuffer> bufferList;
-  private int currentBufferIndex;
   private final int bufferSize;
   private final int capacity;
   private final Function<ByteBuffer, ByteString> byteStringConversion;
+  private final LinkedList<ChunkBuffer> allocated = new LinkedList<>();
```

Review Comment:
Use `IdentityHashMap`.

########## hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java: ##########
@@ -537,26 +553,26 @@

```diff
     } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
       handleInterruptedException(ex, false);
+      // never reach, just to make compiler happy.
+      return null;
```

Review Comment:
A better approach is to make `handleInterruptedException()` return the exception and then `throw` it here. Let's do it separately.
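Re the `IdentityHashMap` suggestion on `BufferPool`: a sketch of what the `allocated` field could become, assuming the goal is to release buffers by instance identity rather than by `ChunkBuffer.equals()` (which is what decides which element `LinkedList.remove(Object)` drops):

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// An identity-based set of outstanding buffers: release(buffer) removes
// exactly the instance that allocateBuffer() handed out, independent of
// ChunkBuffer.equals(). Note this gives up the list's ordering, which is
// fine if the pool only tracks membership.
private final Set<ChunkBuffer> allocated =
    Collections.newSetFromMap(new IdentityHashMap<>());
```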
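And for the `handleInterruptedException()` follow-up, a hypothetical shape of that separate refactoring (the method name and call shape are from this diff; the parameter name and body here are placeholders):

```java
// Return the IOException instead of throwing it, so call sites can write
// `throw handleInterruptedException(ex, false);` -- the compiler then sees
// that the path terminates, and the unreachable `return null;` goes away.
private IOException handleInterruptedException(Exception ex, boolean verifyClosed) throws IOException {
  // ... the existing interrupt/cleanup handling would remain here ...
  return new IOException("Command execution was interrupted", ex);
}
```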