This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-7593 by this push:
new d8ea69dae7 HDDS-9844. [hsync] De-synchronize write APIs (#6859)
d8ea69dae7 is described below
commit d8ea69dae7a853a03116a47c5f7e67db307f5308
Author: Duong Nguyen <[email protected]>
AuthorDate: Mon Jul 15 14:56:45 2024 -0700
HDDS-9844. [hsync] De-synchronize write APIs (#6859)
---
.../hdds/scm/storage/AbstractCommitWatcher.java | 4 +-
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 308 ++++++++++++++-------
.../apache/hadoop/hdds/scm/storage/BufferPool.java | 186 +++++++++----
.../hadoop/hdds/scm/storage/CommitWatcher.java | 44 +--
.../hdds/scm/storage/ECBlockOutputStream.java | 18 +-
.../hdds/scm/storage/RatisBlockOutputStream.java | 23 +-
.../hadoop/hdds/scm/storage/TestBufferPool.java | 120 ++++++--
.../common/ChunkBufferImplWithByteBuffer.java | 4 +-
.../client/io/BlockOutputStreamEntryPool.java | 11 +-
.../hadoop/ozone/client/io/KeyOutputStream.java | 29 +-
.../java/org/apache/hadoop/fs/ozone/TestHSync.java | 142 +++++++---
.../hadoop/hdds/scm/storage/TestCommitWatcher.java | 13 -
.../ozone/client/rpc/TestBlockOutputStream.java | 20 +-
13 files changed, 588 insertions(+), 334 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
index 957f761ccb..0c6b929344 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
@@ -79,7 +79,7 @@ abstract class AbstractCommitWatcher<BUFFER> {
}
/** @return the total data which has been acknowledged. */
- long getTotalAckDataLength() {
+ synchronized long getTotalAckDataLength() {
return totalAckDataLength;
}
@@ -166,7 +166,7 @@ abstract class AbstractCommitWatcher<BUFFER> {
/** Release the buffers for the given index. */
abstract void releaseBuffers(long index);
- void adjustBuffers(long commitIndex) {
+ synchronized void adjustBuffers(long commitIndex) {
commitIndexMap.keySet().stream()
.filter(p -> p <= commitIndex)
.forEach(this::releaseBuffers);
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 2d40439c33..0e8a0d56fe 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -112,8 +112,11 @@ public class BlockOutputStream extends OutputStream {
private final AtomicReference<IOException> ioException;
private final ExecutorService responseExecutor;
- // the effective length of data flushed so far
- private long totalDataFlushedLength;
+ // the effective length of data sent to datanodes (via writeChunk).
+ private long totalWriteChunkLength;
+
+ // The effective length of data flushed to datanodes (via putBlock).
+ private long totalPutBlockLength;
// effective data write attempted so far for the block
private long writtenDataLength;
@@ -146,6 +149,8 @@ public class BlockOutputStream extends OutputStream {
private final ContainerClientMetrics clientMetrics;
private boolean allowPutBlockPiggybacking;
+ private CompletableFuture<Void> lastFlushFuture;
+
/**
* Creates a new BlockOutputStream.
*
@@ -208,7 +213,8 @@ public class BlockOutputStream extends OutputStream {
this.responseExecutor = blockOutputStreamResourceProvider.get();
bufferList = null;
- totalDataFlushedLength = 0;
+ totalWriteChunkLength = 0;
+ totalPutBlockLength = 0;
writtenDataLength = 0;
failedServers = new ArrayList<>(0);
ioException = new AtomicReference<>(null);
@@ -233,7 +239,7 @@ public class BlockOutputStream extends OutputStream {
return true;
}
- void refreshCurrentBuffer() {
+ synchronized void refreshCurrentBuffer() {
currentBuffer = bufferPool.getCurrentBuffer();
currentBufferRemaining =
currentBuffer != null ? currentBuffer.remaining() : 0;
@@ -247,7 +253,7 @@ public class BlockOutputStream extends OutputStream {
return 0;
}
- public long getWrittenDataLength() {
+ public synchronized long getWrittenDataLength() {
return writtenDataLength;
}
@@ -262,7 +268,7 @@ public class BlockOutputStream extends OutputStream {
@VisibleForTesting
public long getTotalDataFlushedLength() {
- return totalDataFlushedLength;
+ return totalPutBlockLength;
}
@VisibleForTesting
@@ -293,17 +299,21 @@ public class BlockOutputStream extends OutputStream {
@Override
public void write(int b) throws IOException {
checkOpen();
- allocateNewBufferIfNeeded();
- currentBuffer.put((byte) b);
- currentBufferRemaining--;
- writeChunkIfNeeded();
- writtenDataLength++;
- doFlushOrWatchIfNeeded();
+ synchronized (this) {
+ allocateNewBufferIfNeeded();
+ currentBuffer.put((byte) b);
+ currentBufferRemaining--;
+ updateWrittenDataLength(1);
+ writeChunkIfNeeded();
+ doFlushOrWatchIfNeeded();
+ }
}
private void writeChunkIfNeeded() throws IOException {
if (currentBufferRemaining == 0) {
+ LOG.debug("WriteChunk from write(), buffer = {}", currentBuffer);
writeChunk(currentBuffer);
+ updateWriteChunkLength();
}
}
@@ -315,51 +325,66 @@ public class BlockOutputStream extends OutputStream {
}
if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
|| ((off + len) < 0)) {
- throw new IndexOutOfBoundsException();
+ throw new IndexOutOfBoundsException("Offset=" + off + " and len="
+ + len + " don't match the array length of " + b.length);
}
if (len == 0) {
return;
}
-
- while (len > 0) {
- allocateNewBufferIfNeeded();
- final int writeLen = Math.min(currentBufferRemaining, len);
- currentBuffer.put(b, off, writeLen);
- currentBufferRemaining -= writeLen;
- writeChunkIfNeeded();
- off += writeLen;
- len -= writeLen;
- updateWrittenDataLength(writeLen);
- doFlushOrWatchIfNeeded();
+ synchronized (this) {
+ while (len > 0) {
+ allocateNewBufferIfNeeded();
+ final int writeLen = Math.min(currentBufferRemaining, len);
+ currentBuffer.put(b, off, writeLen);
+ currentBufferRemaining -= writeLen;
+ updateWrittenDataLength(writeLen);
+ writeChunkIfNeeded();
+ off += writeLen;
+ len -= writeLen;
+ doFlushOrWatchIfNeeded();
+ }
}
}
- public void updateWrittenDataLength(int writeLen) {
+ protected synchronized void updateWrittenDataLength(int writeLen) {
writtenDataLength += writeLen;
}
private void doFlushOrWatchIfNeeded() throws IOException {
if (currentBufferRemaining == 0) {
if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
- updateFlushLength();
- executePutBlock(false, false);
+ updatePutBlockLength();
+ CompletableFuture<PutBlockResult> putBlockFuture =
executePutBlock(false, false);
+ this.lastFlushFuture = watchForCommitAsync(putBlockFuture);
}
- // Data in the bufferPool can not exceed streamBufferMaxSize
- if (bufferPool.getNumberOfUsedBuffers() == bufferPool.getCapacity()) {
+
+ if (bufferPool.isAtCapacity()) {
handleFullBuffer();
}
}
}
- private void allocateNewBufferIfNeeded() {
+ private void allocateNewBufferIfNeeded() throws IOException {
if (currentBufferRemaining == 0) {
- currentBuffer = bufferPool.allocateBuffer(config.getBufferIncrement());
- currentBufferRemaining = currentBuffer.remaining();
+ try {
+ currentBuffer = bufferPool.allocateBuffer(config.getBufferIncrement());
+ currentBufferRemaining = currentBuffer.remaining();
+ LOG.debug("Allocated new buffer {}, used = {}, capacity = {}",
currentBuffer,
+ bufferPool.getNumberOfUsedBuffers(), bufferPool.getCapacity());
+ } catch (InterruptedException e) {
+ handleInterruptedException(e, false);
+ }
}
}
- private void updateFlushLength() {
- totalDataFlushedLength = writtenDataLength;
+ private void updateWriteChunkLength() {
+ Preconditions.checkState(Thread.holdsLock(this));
+ totalWriteChunkLength = writtenDataLength;
+ }
+
+ private void updatePutBlockLength() {
+ Preconditions.checkState(Thread.holdsLock(this));
+ totalPutBlockLength = totalWriteChunkLength;
}
/**
@@ -370,7 +395,7 @@ public class BlockOutputStream extends OutputStream {
*/
// In this case, the data is already cached in the currentBuffer.
- public void writeOnRetry(long len) throws IOException {
+ public synchronized void writeOnRetry(long len) throws IOException {
if (len == 0) {
return;
}
@@ -379,8 +404,9 @@ public class BlockOutputStream extends OutputStream {
}
Preconditions.checkArgument(len <=
streamBufferArgs.getStreamBufferMaxSize());
int count = 0;
+ List<ChunkBuffer> allocatedBuffers = bufferPool.getAllocatedBuffers();
while (len > 0) {
- ChunkBuffer buffer = bufferPool.getBuffer(count);
+ ChunkBuffer buffer = allocatedBuffers.get(count);
long writeLen = Math.min(buffer.position(), len);
if (!buffer.hasRemaining()) {
writeChunk(buffer);
@@ -396,8 +422,10 @@ public class BlockOutputStream extends OutputStream {
if (writtenDataLength % streamBufferArgs.getStreamBufferFlushSize() ==
0) {
// reset the position to zero as now we will be reading the
// next buffer in the list
- updateFlushLength();
- executePutBlock(false, false);
+ updateWriteChunkLength();
+ updatePutBlockLength();
+ CompletableFuture<PutBlockResult> putBlockResultFuture =
executePutBlock(false, false);
+ lastFlushFuture = watchForCommitAsync(putBlockResultFuture);
}
if (writtenDataLength == streamBufferArgs.getStreamBufferMaxSize()) {
handleFullBuffer();
@@ -412,20 +440,15 @@ public class BlockOutputStream extends OutputStream {
* @throws IOException
*/
private void handleFullBuffer() throws IOException {
- waitForFlushAndCommit(true);
- }
-
- void waitForFlushAndCommit(boolean bufferFull) throws IOException {
try {
checkOpen();
- waitOnFlushFutures();
+ waitOnFlushFuture();
} catch (ExecutionException e) {
handleExecutionException(e);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleInterruptedException(ex, true);
}
- watchForCommit(bufferFull);
}
void releaseBuffersOnException() {
@@ -439,22 +462,19 @@ public class BlockOutputStream extends OutputStream {
refreshCurrentBuffer();
}
- XceiverClientReply sendWatchForCommit(boolean bufferFull)
+ /**
+ * Watch for a specific commit index.
+ */
+ XceiverClientReply sendWatchForCommit(long commitIndex)
throws IOException {
return null;
}
- /**
- * calls watchForCommit API of the Ratis Client. For Standalone client,
- * it is a no op.
- * @param bufferFull flag indicating whether bufferFull condition is hit or
- * its called as part flush/close
- * @throws IOException IOException in case watch gets timed out
- */
- private void watchForCommit(boolean bufferFull) throws IOException {
+ private void watchForCommit(long commitIndex) throws IOException {
checkOpen();
try {
- final XceiverClientReply reply = sendWatchForCommit(bufferFull);
+ LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex);
+ final XceiverClientReply reply = sendWatchForCommit(commitIndex);
if (reply != null) {
List<DatanodeDetails> dnList = reply.getDatanodes();
if (!dnList.isEmpty()) {
@@ -469,7 +489,7 @@ public class BlockOutputStream extends OutputStream {
setIoException(ioe);
throw getIoException();
}
- refreshCurrentBuffer();
+ LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex);
}
void updateCommitInfo(XceiverClientReply reply, List<ChunkBuffer> buffers) {
@@ -480,11 +500,10 @@ public class BlockOutputStream extends OutputStream {
* @param force true if no data was written since most recent putBlock and
* stream is being closed
*/
- CompletableFuture<ContainerProtos.
- ContainerCommandResponseProto> executePutBlock(boolean close,
+ CompletableFuture<PutBlockResult> executePutBlock(boolean close,
boolean force) throws IOException {
checkOpen();
- long flushPos = totalDataFlushedLength;
+ long flushPos = totalWriteChunkLength;
final List<ChunkBuffer> byteBufferList;
if (!force) {
Preconditions.checkNotNull(bufferList);
@@ -495,8 +514,8 @@ public class BlockOutputStream extends OutputStream {
byteBufferList = null;
}
- CompletableFuture<ContainerProtos.
- ContainerCommandResponseProto> flushFuture = null;
+ final CompletableFuture<ContainerCommandResponseProto> flushFuture;
+ final XceiverClientReply asyncReply;
try {
BlockData blockData = containerBlockData.build();
LOG.debug("sending PutBlock {}", blockData);
@@ -507,10 +526,8 @@ public class BlockOutputStream extends OutputStream {
containerBlockData.clearChunks();
}
- XceiverClientReply asyncReply =
- putBlockAsync(xceiverClient, blockData, close, tokenString);
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
- asyncReply.getResponse();
+ asyncReply = putBlockAsync(xceiverClient, blockData, close, tokenString);
+ CompletableFuture<ContainerCommandResponseProto> future =
asyncReply.getResponse();
flushFuture = future.thenApplyAsync(e -> {
try {
validateResponse(e);
@@ -537,13 +554,10 @@ public class BlockOutputStream extends OutputStream {
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleInterruptedException(ex, false);
+ // never reach, just to make compiler happy.
+ return null;
}
- putFlushFuture(flushPos, flushFuture);
- return flushFuture;
- }
-
- void putFlushFuture(long flushPos,
- CompletableFuture<ContainerCommandResponseProto> flushFuture) {
+ return flushFuture.thenApply(r -> new PutBlockResult(flushPos,
asyncReply.getLogIndex(), r));
}
@Override
@@ -551,12 +565,15 @@ public class BlockOutputStream extends OutputStream {
if (xceiverClientFactory != null && xceiverClient != null
&& bufferPool != null && bufferPool.getSize() > 0
&& (!streamBufferArgs.isStreamBufferFlushDelay() ||
- writtenDataLength - totalDataFlushedLength
- >= streamBufferArgs.getStreamBufferSize())) {
+ unflushedLength() >= streamBufferArgs.getStreamBufferSize())) {
handleFlush(false);
}
}
+ private synchronized long unflushedLength() {
+ return writtenDataLength - totalPutBlockLength;
+ }
+
private void writeChunkCommon(ChunkBuffer buffer)
throws IOException {
// This data in the buffer will be pushed to datanode and a reference will
@@ -570,16 +587,16 @@ public class BlockOutputStream extends OutputStream {
bufferList.add(buffer);
}
- private void writeChunk(ChunkBuffer buffer)
- throws IOException {
+ private void writeChunk(ChunkBuffer buffer) throws IOException {
writeChunkCommon(buffer);
writeChunkToContainer(buffer.duplicate(0, buffer.position()), false,
false);
}
- private void writeChunkAndPutBlock(ChunkBuffer buffer, boolean close)
+ private CompletableFuture<PutBlockResult> writeChunkAndPutBlock(ChunkBuffer
buffer, boolean close)
throws IOException {
+ LOG.debug("WriteChunk and Putblock from flush, buffer={}", buffer);
writeChunkCommon(buffer);
- writeChunkToContainer(buffer.duplicate(0, buffer.position()), true, close);
+ return writeChunkToContainer(buffer.duplicate(0, buffer.position()), true,
close);
}
/**
@@ -607,40 +624,77 @@ public class BlockOutputStream extends OutputStream {
private void handleFlushInternal(boolean close)
throws IOException, InterruptedException, ExecutionException {
checkOpen();
+ LOG.debug("Start handleFlushInternal close={}", close);
+ CompletableFuture<Void> toWaitFor = handleFlushInternalSynchronized(close);
+
+ if (toWaitFor != null) {
+ LOG.debug("Waiting for flush");
+ try {
+ toWaitFor.get();
+ } catch (ExecutionException ex) {
+ if (ex.getCause() instanceof FlushRuntimeException) {
+ throw ((FlushRuntimeException) ex.getCause()).cause;
+ } else {
+ throw ex;
+ }
+ }
+ LOG.debug("Flush done.");
+ }
+ }
+
+ private synchronized CompletableFuture<Void>
handleFlushInternalSynchronized(boolean close) throws IOException {
+ CompletableFuture<PutBlockResult> putBlockResultFuture = null;
// flush the last chunk data residing on the currentBuffer
- if (totalDataFlushedLength < writtenDataLength) {
- refreshCurrentBuffer();
+ if (totalWriteChunkLength < writtenDataLength) {
Preconditions.checkArgument(currentBuffer.position() > 0);
// This can be a partially filled chunk. Since we are flushing the buffer
// here, we just limit this buffer to the current position. So that next
// write will happen in new buffer
+ updateWriteChunkLength();
+ updatePutBlockLength();
if (currentBuffer.hasRemaining()) {
if (allowPutBlockPiggybacking) {
- updateFlushLength();
- writeChunkAndPutBlock(currentBuffer, close);
+ putBlockResultFuture = writeChunkAndPutBlock(currentBuffer, close);
} else {
writeChunk(currentBuffer);
- updateFlushLength();
- executePutBlock(close, false);
+ putBlockResultFuture = executePutBlock(close, false);
+ }
+ if (!close) {
+ // reset current buffer so that the next write will allocate a new
one.
+ currentBuffer = null;
+ currentBufferRemaining = 0;
}
} else {
- updateFlushLength();
- executePutBlock(close, false);
+ putBlockResultFuture = executePutBlock(close, false);
+ // set lastFuture.
}
+ } else if (totalPutBlockLength < totalWriteChunkLength) {
+ // There is no pending written data, but there is uncommitted data.
+ updatePutBlockLength();
+ putBlockResultFuture = executePutBlock(close, false);
} else if (close) {
// forcing an "empty" putBlock if stream is being closed without new
// data since latest flush - we need to send the "EOF" flag
- executePutBlock(true, true);
+ updatePutBlockLength();
+ putBlockResultFuture = executePutBlock(true, true);
+ } else {
+ LOG.debug("Flushing without data");
+ }
+ if (putBlockResultFuture != null) {
+ this.lastFlushFuture = watchForCommitAsync(putBlockResultFuture);
}
- waitOnFlushFutures();
- watchForCommit(false);
- // just check again if the exception is hit while waiting for the
- // futures to ensure flush has indeed succeeded
+ return lastFlushFuture;
+ }
- // irrespective of whether the commitIndex2flushedDataMap is empty
- // or not, ensure there is no exception set
- checkOpen();
+ private CompletableFuture<Void>
watchForCommitAsync(CompletableFuture<PutBlockResult> putBlockResultFuture) {
+ return putBlockResultFuture.thenAccept(x -> {
+ try {
+ watchForCommit(x.commitIndex);
+ } catch (IOException e) {
+ throw new FlushRuntimeException(e);
+ }
+ });
}
@Override
@@ -658,11 +712,11 @@ public class BlockOutputStream extends OutputStream {
}
}
- void waitOnFlushFutures() throws InterruptedException, ExecutionException {
+ void waitOnFlushFuture() throws InterruptedException, ExecutionException {
}
void validateResponse(
- ContainerProtos.ContainerCommandResponseProto responseProto)
+ ContainerCommandResponseProto responseProto)
throws IOException {
try {
// if the ioException is already set, it means a prev request has failed
@@ -742,7 +796,20 @@ public class BlockOutputStream extends OutputStream {
* checksum
* @return
*/
- CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
+ CompletableFuture<ContainerCommandResponseProto>
writeChunkToContainer(ChunkBuffer chunk) throws IOException {
+ return writeChunkToContainer(chunk, false, false).thenApply(x ->
x.response);
+ }
+
+ /**
+ * Writes buffered data as a new chunk to the container and saves chunk
+ * information to be used later in putKey call.
+ *
+ * @throws IOException if there is an I/O error while performing the call
+ * @throws OzoneChecksumException if there is an error while computing
+ * checksum
+ * @return
+ */
+ private CompletableFuture<PutBlockResult> writeChunkToContainer(
ChunkBuffer chunk, boolean putBlockPiggybacking, boolean close) throws
IOException {
int effectiveChunkSize = chunk.remaining();
final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
@@ -756,7 +823,7 @@ public class BlockOutputStream extends OutputStream {
.setChecksumData(checksumData.getProtoBufMessage())
.build();
- long flushPos = totalDataFlushedLength;
+ long flushPos = totalWriteChunkLength;
if (LOG.isDebugEnabled()) {
LOG.debug("Writing chunk {} length {} at offset {}",
@@ -776,8 +843,9 @@ public class BlockOutputStream extends OutputStream {
}
final List<ChunkBuffer> byteBufferList;
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ CompletableFuture<ContainerCommandResponseProto>
validateFuture = null;
+ XceiverClientReply asyncReply;
try {
BlockData blockData = null;
@@ -804,9 +872,9 @@ public class BlockOutputStream extends OutputStream {
byteBufferList = null;
}
- XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
+ asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
blockID.get(), data, tokenString, replicationIndex, blockData,
close);
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ CompletableFuture<ContainerCommandResponseProto>
respFuture = asyncReply.getResponse();
validateFuture = respFuture.thenApplyAsync(e -> {
try {
@@ -835,11 +903,10 @@ public class BlockOutputStream extends OutputStream {
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleInterruptedException(ex, false);
+ // never reach.
+ return null;
}
- if (putBlockPiggybacking) {
- putFlushFuture(flushPos, validateFuture);
- }
- return validateFuture;
+ return validateFuture.thenApply(x -> new PutBlockResult(flushPos,
asyncReply.getLogIndex(), x));
}
private void handleSuccessfulPutBlock(
@@ -856,9 +923,8 @@ public class BlockOutputStream extends OutputStream {
LOG.debug(
"Adding index " + asyncReply.getLogIndex() + " flushLength "
+ flushPos + " numBuffers " + byteBufferList.size()
- + " blockID " + blockID + " bufferPool size" + bufferPool
- .getSize() + " currentBufferIndex " + bufferPool
- .getCurrentBufferIndex());
+ + " blockID " + blockID + " bufferPool size " + bufferPool
+ .getSize());
}
// for standalone protocol, logIndex will always be 0.
updateCommitInfo(asyncReply, byteBufferList);
@@ -1053,6 +1119,10 @@ public class BlockOutputStream extends OutputStream {
throw getIoException();
}
+ protected synchronized CompletableFuture<Void> getLastFlushFuture() {
+ return lastFlushFuture;
+ }
+
/**
* Get the Replication Index.
* @return replicationIndex
@@ -1060,4 +1130,30 @@ public class BlockOutputStream extends OutputStream {
public int getReplicationIndex() {
return replicationIndex;
}
+
+ static class PutBlockResult {
+ private final long flushPosition;
+ private final long commitIndex;
+ private final ContainerCommandResponseProto response;
+
+ PutBlockResult(long flushPosition, long commitIndex,
ContainerCommandResponseProto response) {
+ this.flushPosition = flushPosition;
+ this.commitIndex = commitIndex;
+ this.response = response;
+ }
+
+ ContainerCommandResponseProto getResponse() {
+ return response;
+ }
+ }
+
+ /**
+ * RuntimeException to wrap watchForCommit errors when running
asynchronously.
+ */
+ private static class FlushRuntimeException extends RuntimeException {
+ private final IOException cause;
+ FlushRuntimeException(IOException cause) {
+ this.cause = cause;
+ }
+ }
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
index b68b56f67c..abacf318c4 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
@@ -20,30 +20,45 @@ package org.apache.hadoop.hdds.scm.storage;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
+import java.util.function.Supplier;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.Preconditions;
-
-import static java.util.Collections.emptyList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * This class creates and manages pool of n buffers.
+ * A bounded pool implementation that provides {@link ChunkBuffer}s. This pool
allows allocating and releasing
+ * {@link ChunkBuffer}.
+ * This pool is designed for concurrent access to allocation and release. It
imposes a maximum number of buffers to be
+ * allocated at the same time and once the limit has been reached, the
thread requesting a new allocation needs to
+ * wait until an allocated buffer is released.
*/
public class BufferPool {
+ public static final Logger LOG = LoggerFactory.getLogger(BufferPool.class);
private static final BufferPool EMPTY = new BufferPool(0, 0);
-
- private final List<ChunkBuffer> bufferList;
- private int currentBufferIndex;
private final int bufferSize;
private final int capacity;
private final Function<ByteBuffer, ByteString> byteStringConversion;
+ private final LinkedList<ChunkBuffer> allocated = new LinkedList<>();
+ private final LinkedList<ChunkBuffer> released = new LinkedList<>();
+ private ChunkBuffer currentBuffer = null;
+ private final Lock lock = new ReentrantLock();
+ private final Condition notFull = lock.newCondition();
+
+
public static BufferPool empty() {
return EMPTY;
}
@@ -57,8 +72,6 @@ public class BufferPool {
Function<ByteBuffer, ByteString> byteStringConversion) {
this.capacity = capacity;
this.bufferSize = bufferSize;
- bufferList = capacity == 0 ? emptyList() : new ArrayList<>(capacity);
- currentBufferIndex = -1;
this.byteStringConversion = byteStringConversion;
}
@@ -67,81 +80,135 @@ public class BufferPool {
}
ChunkBuffer getCurrentBuffer() {
- return currentBufferIndex == -1 ? null :
bufferList.get(currentBufferIndex);
+ return doInLock(() -> currentBuffer);
}
/**
- * If the currentBufferIndex is less than the buffer size - 1,
- * it means, the next buffer in the list has been freed up for
- * rewriting. Reuse the next available buffer in such cases.
- * <p>
- * In case, the currentBufferIndex == buffer.size and buffer size is still
- * less than the capacity to be allocated, just allocate a buffer of size
- * chunk size.
+ * Allocate a new {@link ChunkBuffer}, waiting for a buffer to be released
when this pool already allocates at
+ * capacity.
*/
- public ChunkBuffer allocateBuffer(int increment) {
- final int nextBufferIndex = currentBufferIndex + 1;
-
- Preconditions.assertTrue(nextBufferIndex < capacity, () ->
- "next index: " + nextBufferIndex + " >= capacity: " + capacity);
-
- currentBufferIndex = nextBufferIndex;
-
- if (currentBufferIndex < bufferList.size()) {
- return getBuffer(currentBufferIndex);
- } else {
- final ChunkBuffer newBuffer = ChunkBuffer.allocate(bufferSize,
increment);
- bufferList.add(newBuffer);
- return newBuffer;
+ public ChunkBuffer allocateBuffer(int increment) throws InterruptedException
{
+ lock.lockInterruptibly();
+ try {
+ Preconditions.assertTrue(allocated.size() + released.size() <= capacity,
() ->
+ "Total created buffer must not exceed capacity.");
+
+ while (allocated.size() == capacity) {
+ LOG.debug("Allocation needs to wait the pool is at capacity (allocated
= capacity = {}).", capacity);
+ notFull.await();
+ }
+ // Get a buffer to allocate, preferably from the released ones.
+ final ChunkBuffer buffer = released.isEmpty() ?
+ ChunkBuffer.allocate(bufferSize, increment) : released.removeFirst();
+ allocated.add(buffer);
+ currentBuffer = buffer;
+
+ LOG.debug("Allocated new buffer {}, number of used buffers {}, capacity
{}.",
+ buffer, allocated.size(), capacity);
+ return buffer;
+ } finally {
+ lock.unlock();
}
}
- void releaseBuffer(ChunkBuffer chunkBuffer) {
- Preconditions.assertTrue(!bufferList.isEmpty(), "empty buffer list");
- Preconditions.assertSame(bufferList.get(0), chunkBuffer,
- "only the first buffer can be released");
- Preconditions.assertTrue(currentBufferIndex >= 0,
- () -> "current buffer: " + currentBufferIndex);
+ void releaseBuffer(ChunkBuffer buffer) {
+ LOG.debug("Releasing buffer {}", buffer);
+ lock.lock();
+ try {
+ Preconditions.assertTrue(removeByIdentity(allocated, buffer), "Releasing
unknown buffer");
+ buffer.clear();
+ released.add(buffer);
+ if (buffer == currentBuffer) {
+ currentBuffer = null;
+ }
+ notFull.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
- // always remove from head of the list and append at last
- final ChunkBuffer buffer = bufferList.remove(0);
- buffer.clear();
- bufferList.add(buffer);
- currentBufferIndex--;
+ /**
+ * Remove an item from a list by identity.
+ * @return true if the item is found and removed from the list, otherwise
false.
+ */
+ private static <T> boolean removeByIdentity(List<T> list, T toRemove) {
+ int i = 0;
+ for (T item : list) {
+ if (item == toRemove) {
+ break;
+ } else {
+ i++;
+ }
+ }
+ if (i < list.size()) {
+ list.remove(i);
+ return true;
+ }
+ return false;
}
- public void clearBufferPool() {
- bufferList.forEach(ChunkBuffer::close);
- bufferList.clear();
- currentBufferIndex = -1;
+ /**
+ * Wait until one buffer is available.
+ * @throws InterruptedException
+ */
+ @VisibleForTesting
+ public void waitUntilAvailable() throws InterruptedException {
+ lock.lockInterruptibly();
+ try {
+ while (allocated.size() == capacity) {
+ notFull.await();
+ }
+ } finally {
+ lock.unlock();
+ }
}
- public void checkBufferPoolEmpty() {
- Preconditions.assertSame(0, computeBufferData(), "total buffer size");
+ public void clearBufferPool() {
+ lock.lock();
+ try {
+ allocated.forEach(ChunkBuffer::close);
+ released.forEach(ChunkBuffer::close);
+ allocated.clear();
+ released.clear();
+ currentBuffer = null;
+ } finally {
+ lock.unlock();
+ }
}
public long computeBufferData() {
- long totalBufferSize = 0;
- for (ChunkBuffer buf : bufferList) {
- totalBufferSize += buf.position();
- }
- return totalBufferSize;
+ return doInLock(() -> {
+ long totalBufferSize = 0;
+ for (ChunkBuffer buf : allocated) {
+ totalBufferSize += buf.position();
+ }
+ return totalBufferSize;
+ });
}
public int getSize() {
- return bufferList.size();
+ return doInLock(() -> allocated.size() + released.size());
}
- public ChunkBuffer getBuffer(int index) {
- return bufferList.get(index);
+ public List<ChunkBuffer> getAllocatedBuffers() {
+ return doInLock(() -> new ArrayList<>(allocated));
}
- int getCurrentBufferIndex() {
- return currentBufferIndex;
+ public int getNumberOfUsedBuffers() {
+ return doInLock(allocated::size);
}
- public int getNumberOfUsedBuffers() {
- return currentBufferIndex + 1;
+ private <T> T doInLock(Supplier<T> supplier) {
+ lock.lock();
+ try {
+ return supplier.get();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean isAtCapacity() {
+ return getNumberOfUsedBuffers() == capacity;
}
public int getCapacity() {
@@ -151,4 +218,5 @@ public class BufferPool {
public int getBufferSize() {
return bufferSize;
}
+
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
index aa339409ec..7c3bb40ef3 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
@@ -24,17 +24,9 @@
*/
package org.apache.hadoop.hdds.scm.storage;
-import com.google.common.annotations.VisibleForTesting;
-import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.ozone.common.ChunkBuffer;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-
/**
* This class executes watchForCommit on ratis pipeline and releases
* buffers once data successfully gets replicated.
@@ -43,10 +35,6 @@ class CommitWatcher extends
AbstractCommitWatcher<ChunkBuffer> {
// A reference to the pool of buffers holding the data
private final BufferPool bufferPool;
- // future Map to hold up all putBlock futures
- private final ConcurrentMap<Long,
CompletableFuture<ContainerCommandResponseProto>>
- futureMap = new ConcurrentHashMap<>();
-
CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) {
super(xceiverClient);
this.bufferPool = bufferPool;
@@ -59,37 +47,17 @@ class CommitWatcher extends
AbstractCommitWatcher<ChunkBuffer> {
acked += buffer.position();
bufferPool.releaseBuffer(buffer);
}
- final long totalLength = addAckDataLength(acked);
- // When putBlock is called, a future is added.
- // When putBlock is replied, the future is removed below.
- // Therefore, the removed future should not be null.
- final CompletableFuture<ContainerCommandResponseProto> removed =
- futureMap.remove(totalLength);
- Objects.requireNonNull(removed, () -> "Future not found for "
- + totalLength + ": existing = " + futureMap.keySet());
- }
-
- @VisibleForTesting
- ConcurrentMap<Long, CompletableFuture<ContainerCommandResponseProto>>
getFutureMap() {
- return futureMap;
- }
-
- public void putFlushFuture(long flushPos,
CompletableFuture<ContainerCommandResponseProto> flushFuture) {
- futureMap.compute(flushPos,
- (key, previous) -> previous == null ? flushFuture :
- previous.thenCombine(flushFuture, (prev, curr) -> curr));
- }
-
+ // TODO move the flush future map to BOS:
+ // When there are concurrent watchForCommits, there's no guarantee of the
order of execution
+ // and the following logic to address the flushed length becomes
irrelevant.
+ // The flush future should be handled by BlockOutputStream and use the
flushIndex which is a result of
+ // executePutBlock.
- public void waitOnFlushFutures() throws InterruptedException,
ExecutionException {
- // wait for all the transactions to complete
- CompletableFuture.allOf(futureMap.values().toArray(
- new CompletableFuture[0])).get();
+ addAckDataLength(acked);
}
@Override
public void cleanup() {
super.cleanup();
- futureMap.clear();
}
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 843727cef2..bbb3f30687 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -87,16 +87,16 @@ public class ECBlockOutputStream extends BlockOutputStream {
}
@Override
- public void write(byte[] b, int off, int len) throws IOException {
+ public synchronized void write(byte[] b, int off, int len) throws
IOException {
this.currentChunkRspFuture =
writeChunkToContainer(
- ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)), false, false);
+ ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
updateWrittenDataLength(len);
}
public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
write(
ByteBuffer buff) throws IOException {
- return writeChunkToContainer(ChunkBuffer.wrap(buff), false, false);
+ return writeChunkToContainer(ChunkBuffer.wrap(buff));
}
public CompletableFuture<ContainerProtos.
@@ -199,7 +199,7 @@ public class ECBlockOutputStream extends BlockOutputStream {
ContainerCommandResponseProto> executePutBlock(boolean close,
boolean force, long blockGroupLength) throws IOException {
updateBlockGroupLengthInPutBlockMeta(blockGroupLength);
- return executePutBlock(close, force);
+ return executePutBlock(close,
force).thenApply(PutBlockResult::getResponse);
}
private void updateBlockGroupLengthInPutBlockMeta(final long blockGroupLen) {
@@ -232,13 +232,13 @@ public class ECBlockOutputStream extends
BlockOutputStream {
* @param force true if no data was written since most recent putBlock and
* stream is being closed
*/
- public CompletableFuture<ContainerProtos.
- ContainerCommandResponseProto> executePutBlock(boolean close,
+ @Override
+ public CompletableFuture<PutBlockResult> executePutBlock(boolean close,
boolean force) throws IOException {
checkOpen();
CompletableFuture<ContainerProtos.
- ContainerCommandResponseProto> flushFuture = null;
+ ContainerCommandResponseProto> flushFuture;
try {
ContainerProtos.BlockData blockData = getContainerBlockData().build();
XceiverClientReply asyncReply =
@@ -273,9 +273,11 @@ public class ECBlockOutputStream extends BlockOutputStream
{
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
handleInterruptedException(ex, false);
+ // never reach.
+ return null;
}
this.putBlkRspFuture = flushFuture;
- return flushFuture;
+ return flushFuture.thenApply(r -> new PutBlockResult(0, 0, r));
}
/**
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
index b587b1d131..c0e99a5b4a 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.storage;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hdds.client.BlockID;
-import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.StreamBufferArgs;
@@ -102,9 +101,8 @@ public class RatisBlockOutputStream extends
BlockOutputStream
}
@Override
- XceiverClientReply sendWatchForCommit(boolean bufferFull) throws IOException
{
- return bufferFull ? commitWatcher.watchOnFirstIndex()
- : commitWatcher.watchOnLastIndex();
+ XceiverClientReply sendWatchForCommit(long commitIndex) throws IOException {
+ return commitWatcher.watchForCommit(commitIndex);
}
@Override
@@ -113,13 +111,11 @@ public class RatisBlockOutputStream extends
BlockOutputStream
}
@Override
- void putFlushFuture(long flushPos,
CompletableFuture<ContainerCommandResponseProto> flushFuture) {
- commitWatcher.putFlushFuture(flushPos, flushFuture);
- }
-
- @Override
- void waitOnFlushFutures() throws InterruptedException, ExecutionException {
- commitWatcher.waitOnFlushFutures();
+ void waitOnFlushFuture() throws InterruptedException, ExecutionException {
+ CompletableFuture<Void> flushFuture = getLastFlushFuture();
+ if (flushFuture != null) {
+ flushFuture.get();
+ }
}
@Override
@@ -135,10 +131,7 @@ public class RatisBlockOutputStream extends
BlockOutputStream
@Override
public void hsync() throws IOException {
if (!isClosed()) {
- if (getBufferPool() != null && getBufferPool().getSize() > 0) {
- handleFlush(false);
- }
- waitForFlushAndCommit(false);
+ handleFlush(false);
}
}
}
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBufferPool.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBufferPool.java
index b56c503df9..e92b96ebce 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBufferPool.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBufferPool.java
@@ -20,38 +20,128 @@ package org.apache.hadoop.hdds.scm.storage;
import org.apache.hadoop.ozone.common.ChunkBuffer;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.slf4j.event.Level;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test for {@link BufferPool}.
*/
class TestBufferPool {
+ @BeforeAll
+ static void init() {
+ GenericTestUtils.setLogLevel(BufferPool.LOG, Level.DEBUG);
+ }
+
@Test
- void testBufferPool() {
+ void testBufferPool() throws Exception {
testBufferPool(BufferPool.empty());
testBufferPool(1, 1);
testBufferPool(3, 1 << 20);
testBufferPool(10, 1 << 10);
}
- private static void testBufferPool(final int capacity, final int bufferSize)
{
+ @Test
+ void testBufferPoolConcurrently() throws Exception {
+ final BufferPool pool = new BufferPool(1 << 20, 10);
+ final Deque<ChunkBuffer> buffers = assertAllocate(pool);
+
+ assertAllocationBlocked(pool);
+ assertAllocationBlockedUntilReleased(pool, buffers);
+ }
+
+ private void assertAllocationBlockedUntilReleased(BufferPool pool,
Deque<ChunkBuffer> buffers) throws Exception {
+ // As the pool is full, allocation will need to wait until a buffer is
released.
+ assertFull(pool);
+
+ LogCapturer logCapturer = LogCapturer.captureLogs(BufferPool.LOG);
+ AtomicReference<ChunkBuffer> allocated = new AtomicReference<>();
+ AtomicBoolean allocatorStarted = new AtomicBoolean();
+ Thread allocator = new Thread(() -> {
+ try {
+ allocatorStarted.set(true);
+ allocated.set(pool.allocateBuffer(0));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ ChunkBuffer toRelease = buffers.removeFirst();
+ Thread releaser = new Thread(() -> pool.releaseBuffer(toRelease));
+
+ allocator.start();
+ // ensure allocator has already started.
+ while (!allocatorStarted.get()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ releaser.start();
+ allocator.join();
+ assertEquals(toRelease, allocated.get());
+ assertTrue(logCapturer.getOutput().contains("Allocation needs to wait the
pool is at capacity"));
+ }
+
+ private void assertAllocationBlocked(BufferPool pool) throws Exception {
+ // As the pool is full, new allocation will be blocked interruptibly if no
allocated buffer is released.
+ assertFull(pool);
+
+ LogCapturer logCapturer = LogCapturer.captureLogs(BufferPool.LOG);
+ AtomicBoolean allocatorStarted = new AtomicBoolean();
+ AtomicBoolean interrupted = new AtomicBoolean(false);
+ Thread allocator = new Thread(() -> {
+ try {
+ allocatorStarted.set(true);
+ pool.allocateBuffer(0);
+ } catch (InterruptedException e) {
+ interrupted.set(true);
+ }
+ });
+
+ allocator.start();
+ // ensure allocator has already started.
+ while (!allocatorStarted.get()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // Now the allocator is stuck because pool is full and no one releases.
+ // And it can be interrupted.
+ allocator.interrupt();
+ allocator.join();
+ assertTrue(interrupted.get());
+ assertTrue(logCapturer.getOutput().contains("Allocation needs to wait the
pool is at capacity"));
+ }
+
+ private static void testBufferPool(final int capacity, final int bufferSize)
throws Exception {
final BufferPool pool = new BufferPool(bufferSize, capacity);
assertEquals(capacity, pool.getCapacity());
assertEquals(bufferSize, pool.getBufferSize());
testBufferPool(pool);
}
- private static void testBufferPool(final BufferPool pool) {
+ private static void testBufferPool(final BufferPool pool) throws Exception {
assertEmpty(pool);
final Deque<ChunkBuffer> buffers = assertAllocate(pool);
assertFull(pool);
@@ -62,10 +152,10 @@ class TestBufferPool {
private static void assertEmpty(BufferPool pool) {
assertEquals(0, pool.getNumberOfUsedBuffers());
assertEquals(0, pool.getSize());
- assertEquals(-1, pool.getCurrentBufferIndex());
+ assertNull(pool.getCurrentBuffer());
}
- private static Deque<ChunkBuffer> assertAllocate(BufferPool pool) {
+ private static Deque<ChunkBuffer> assertAllocate(BufferPool pool) throws
Exception {
final int capacity = pool.getCapacity();
final int size = pool.getBufferSize();
final Deque<ChunkBuffer> buffers = new LinkedList<>();
@@ -96,19 +186,13 @@ class TestBufferPool {
final int capacity = pool.getCapacity();
assertEquals(capacity, pool.getSize());
assertEquals(capacity, pool.getNumberOfUsedBuffers());
- assertThrows(IllegalStateException.class, () -> pool.allocateBuffer(0));
}
- // buffers are released and reallocated FIFO
+ // buffers are released and reallocated
private static void assertReallocate(BufferPool pool,
- Deque<ChunkBuffer> buffers) {
+ Deque<ChunkBuffer> buffers) throws Exception {
final int capacity = pool.getCapacity();
for (int i = 0; i < 3 * capacity; i++) {
- if (capacity > 1) {
- assertThrows(IllegalStateException.class,
- () -> pool.releaseBuffer(buffers.getLast()));
- }
-
final ChunkBuffer released = buffers.removeFirst();
pool.releaseBuffer(released);
assertEquals(0, released.position());
@@ -122,18 +206,20 @@ class TestBufferPool {
}
}
- private static void assertRelease(BufferPool pool,
- Deque<ChunkBuffer> buffers) {
+ private static void assertRelease(BufferPool pool, Deque<ChunkBuffer>
buffers) {
+ // assert that allocated buffers can be released in any order.
final int capacity = pool.getCapacity();
+ boolean pickFirst = false;
for (int i = capacity - 1; i >= 0; i--) {
- final ChunkBuffer released = buffers.removeFirst();
+ final ChunkBuffer released = pickFirst ? buffers.removeFirst() :
buffers.removeLast();
+ pickFirst = !pickFirst;
pool.releaseBuffer(released);
assertEquals(i, pool.getNumberOfUsedBuffers());
assertThrows(IllegalStateException.class,
() -> pool.releaseBuffer(released));
}
- pool.checkBufferPoolEmpty();
+ assertEquals(0, pool.computeBufferData());
}
private static void fill(ChunkBuffer buf) {
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
index fe2ee5fa8a..36c16e92bf 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
+import java.util.UUID;
import java.util.function.Function;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -34,6 +35,7 @@ import org.apache.ratis.util.UncheckedAutoCloseable;
final class ChunkBufferImplWithByteBuffer implements ChunkBuffer {
private final ByteBuffer buffer;
private final UncheckedAutoCloseable underlying;
+ private final UUID identity = UUID.randomUUID();
ChunkBufferImplWithByteBuffer(ByteBuffer buffer) {
this(buffer, null);
@@ -161,6 +163,6 @@ final class ChunkBufferImplWithByteBuffer implements
ChunkBuffer {
@Override
public String toString() {
return getClass().getSimpleName() + ":limit=" + buffer.limit()
- + "@" + Integer.toHexString(hashCode());
+ + "@" + identity;
}
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index bb4e3e3f08..c25b4508e9 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -171,7 +171,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
.build();
}
- private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
+ private synchronized void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
Preconditions.checkNotNull(subKeyInfo.getPipeline());
streamEntries.add(createStreamEntry(subKeyInfo));
}
@@ -248,7 +248,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
* @param containerID id of the closed container
* @param pipelineId id of the associated pipeline
*/
- void discardPreallocatedBlocks(long containerID, PipelineID pipelineId) {
+ synchronized void discardPreallocatedBlocks(long containerID, PipelineID
pipelineId) {
// currentStreamIndex < streamEntries.size() signifies that, there are
still
// pre allocated blocks available.
@@ -283,7 +283,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
return keyArgs.getKeyName();
}
- long getKeyLength() {
+ synchronized long getKeyLength() {
return streamEntries.stream()
.mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
}
@@ -339,10 +339,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
void hsyncKey(long offset) throws IOException {
if (keyArgs != null) {
// in test, this could be null
- long length = getKeyLength();
- Preconditions.checkArgument(offset == length,
- "Expected offset: " + offset + " expected len: " + length);
- keyArgs.setDataSize(length);
+ keyArgs.setDataSize(offset);
keyArgs.setLocationInfoList(getLocationInfoList());
// When the key is multipart upload part file upload, we should not
// commit the key, as this is not an actual key, this is a just a
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index e080ea2d34..1160d9667a 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -90,7 +90,7 @@ public class KeyOutputStream extends OutputStream
// how much of data is actually written yet to underlying stream
private long offset;
// how much data has been ingested into the stream
- private long writeOffset;
+ private volatile long writeOffset;
// whether an exception is encountered while write and whole write could
// not succeed
private boolean isException;
@@ -195,7 +195,7 @@ public class KeyOutputStream extends OutputStream
* @throws IOException
*/
@Override
- public synchronized void write(byte[] b, int off, int len)
+ public void write(byte[] b, int off, int len)
throws IOException {
checkNotClosed();
if (b == null) {
@@ -208,8 +208,10 @@ public class KeyOutputStream extends OutputStream
if (len == 0) {
return;
}
- handleWrite(b, off, len, false);
- writeOffset += len;
+ synchronized (this) {
+ handleWrite(b, off, len, false);
+ writeOffset += len;
+ }
}
private void handleWrite(byte[] b, int off, long len, boolean retry)
@@ -361,7 +363,7 @@ public class KeyOutputStream extends OutputStream
}
}
- private void markStreamClosed() {
+ private synchronized void markStreamClosed() {
blockOutputStreamEntryPool.cleanup();
closed = true;
}
@@ -435,7 +437,7 @@ public class KeyOutputStream extends OutputStream
}
@Override
- public synchronized void flush() throws IOException {
+ public void flush() throws IOException {
checkNotClosed();
handleFlushOrClose(StreamAction.FLUSH);
}
@@ -446,7 +448,7 @@ public class KeyOutputStream extends OutputStream
}
@Override
- public synchronized void hsync() throws IOException {
+ public void hsync() throws IOException {
if (replication.getReplicationType() != ReplicationType.RATIS) {
throw new UnsupportedOperationException(
"Replication type is not " + ReplicationType.RATIS);
@@ -460,11 +462,12 @@ public class KeyOutputStream extends OutputStream
handleFlushOrClose(StreamAction.HSYNC);
- Preconditions.checkState(offset >= hsyncPos,
- "offset = %s < hsyncPos = %s", offset, hsyncPos);
-
- MetricUtil.captureLatencyNs(clientMetrics::addHsyncLatency,
- () -> blockOutputStreamEntryPool.hsyncKey(hsyncPos));
+ synchronized (this) {
+ Preconditions.checkState(offset >= hsyncPos,
+ "offset = %s < hsyncPos = %s", offset, hsyncPos);
+ MetricUtil.captureLatencyNs(clientMetrics::addHsyncLatency,
+ () -> blockOutputStreamEntryPool.hsyncKey(hsyncPos));
+ }
}
/**
@@ -729,7 +732,7 @@ public class KeyOutputStream extends OutputStream
* the last state of the volatile {@link #closed} field.
* @throws IOException if the connection is closed.
*/
- private void checkNotClosed() throws IOException {
+ private synchronized void checkNotClosed() throws IOException {
if (closed) {
throw new IOException(
": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 13cae7bff9..2aa6a71d51 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
@@ -44,6 +45,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -153,7 +155,7 @@ public class TestHSync {
private static final BucketLayout BUCKET_LAYOUT =
BucketLayout.FILE_SYSTEM_OPTIMIZED;
private static final int CHUNK_SIZE = 4 << 12;
- private static final int FLUSH_SIZE = 2 * CHUNK_SIZE;
+ private static final int FLUSH_SIZE = 3 * CHUNK_SIZE;
private static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE;
private static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE;
private static final int SERVICE_INTERVAL = 100;
@@ -209,6 +211,8 @@ public class TestHSync {
GenericTestUtils.setLogLevel(BlockInputStream.LOG, Level.DEBUG);
GenericTestUtils.setLogLevel(KeyValueHandler.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(BufferPool.LOG, Level.DEBUG);
+
openKeyCleanupService =
(OpenKeyCleanupService)
cluster.getOzoneManager().getKeyManager().getOpenKeyCleanupService();
openKeyCleanupService.suspend();
@@ -627,45 +631,58 @@ public class TestHSync {
out.writeAndHsync(data);
final byte[] buffer = new byte[4 << 10];
- int offset = 0;
- try (FSDataInputStream in = fs.open(file)) {
- final long skipped = in.skip(length);
- assertEquals(length, skipped);
-
- for (; ;) {
- final int n = in.read(buffer, 0, buffer.length);
- if (n <= 0) {
- break;
+ AtomicInteger attempted = new AtomicInteger();
+
+ // hsync returns when all datanodes acknowledge they have received the
PutBlock request. Yet, the PutBlock
+ // may not have been applied to their database. So, an immediate read right
after hsync may not see the synced data
+ // yet. This does not violate hsync guarantee: it ensures all current
pending data becomes persistent.
+ GenericTestUtils.waitFor(() -> {
+ attempted.addAndGet(1);
+ int offset = 0;
+ try (FSDataInputStream in = fs.open(file)) {
+ final long skipped = in.skip(length);
+ assertEquals(length, skipped);
+
+ for (; ;) {
+ final int n = in.read(buffer, 0, buffer.length);
+ if (n <= 0) {
+ break;
+ }
+ for (int i = 0; i < n; i++) {
+ assertEquals(data[offset + i], buffer[i], "expected at offset " +
offset + " i=" + i);
+ }
+ offset += n;
}
- for (int i = 0; i < n; i++) {
- assertEquals(data[offset + i], buffer[i],
- "expected at offset " + offset + " i=" + i);
+ if (offset != data.length) {
+ LOG.error("Read attempt #{} failed. offset {}, expected data.length
{}", attempted, offset, data.length);
+ return false;
+ } else {
+ LOG.debug("Read attempt #{} succeeded. offset {}, expected
data.length {}", attempted, offset, data.length);
+ return true;
}
- offset += n;
+ } catch (IOException e) {
+ LOG.error("Exception is thrown during read", e);
+ return false;
}
- }
- assertEquals(data.length, offset);
+ }, 500, 3000);
}
- private void runConcurrentWriteHSync(Path file,
- final FSDataOutputStream out, int initialDataSize)
- throws InterruptedException, IOException {
- final byte[] data = new byte[initialDataSize];
- ThreadLocalRandom.current().nextBytes(data);
-
- AtomicReference<IOException> writerException = new AtomicReference<>();
- AtomicReference<IOException> syncerException = new AtomicReference<>();
+ private int runConcurrentWriteHSync(Path file,
+ final FSDataOutputStream out, byte[] data, int syncThreadsCount) throws
Exception {
- LOG.info("runConcurrentWriteHSync {} with size {}",
- file, initialDataSize);
+ AtomicReference<Exception> writerException = new AtomicReference<>();
+ AtomicReference<Exception> syncerException = new AtomicReference<>();
+ LOG.info("runConcurrentWriteHSync {} with size {}", file, data.length);
+ AtomicInteger writes = new AtomicInteger();
final long start = Time.monotonicNow();
// two threads: write and hsync
Runnable writer = () -> {
while ((Time.monotonicNow() - start < 10000)) {
try {
out.write(data);
- } catch (IOException e) {
+ writes.incrementAndGet();
+ } catch (Exception e) {
writerException.set(e);
throw new RuntimeException(e);
}
@@ -676,7 +693,7 @@ public class TestHSync {
while ((Time.monotonicNow() - start < 10000)) {
try {
out.hsync();
- } catch (IOException e) {
+ } catch (Exception e) {
syncerException.set(e);
throw new RuntimeException(e);
}
@@ -684,11 +701,19 @@ public class TestHSync {
};
Thread writerThread = new Thread(writer);
+ writerThread.setName("Writer");
writerThread.start();
- Thread syncThread = new Thread(syncer);
- syncThread.start();
+ Thread[] syncThreads = new Thread[syncThreadsCount];
+ for (int i = 0; i < syncThreadsCount; i++) {
+ syncThreads[i] = new Thread(syncer);
+ syncThreads[i].setName("Syncer-" + i);
+ syncThreads[i].start();
+ }
+
writerThread.join();
- syncThread.join();
+ for (Thread sync : syncThreads) {
+ sync.join();
+ }
if (writerException.get() != null) {
throw writerException.get();
@@ -696,29 +721,54 @@ public class TestHSync {
if (syncerException.get() != null) {
throw syncerException.get();
}
+ return writes.get();
}
- @Test
- public void testConcurrentWriteHSync()
- throws IOException, InterruptedException {
- final String rootPath = String.format("%s://%s/",
- OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
+ public static Stream<Arguments> concurrentWriteHSync() {
+ return Stream.of(
+ Arguments.of(1, 1, true),
+ Arguments.of(2, 1, true),
+ Arguments.of(8, 2, true),
+ Arguments.of(8, 8, true),
+ Arguments.of(8, 16, true),
+ Arguments.of(1, 1, false),
+ Arguments.of(8, 2, false),
+ Arguments.of(8, 16, false)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("concurrentWriteHSync")
+ public void testConcurrentWriteHSync(int writeSampleSize, int
syncThreadsCount,
+ boolean piggybackingEnabled) throws Exception {
+ final String rootPath = String.format("%s://%s/", OZONE_OFS_URI_SCHEME,
CONF.get(OZONE_OM_ADDRESS_KEY));
+ CONF.setBoolean("ozone.client.stream.putblock.piggybacking",
piggybackingEnabled);
CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
- final String dir = OZONE_ROOT + bucket.getVolumeName()
- + OZONE_URI_DELIMITER + bucket.getName();
+ final String dir = OZONE_ROOT + bucket.getVolumeName() +
OZONE_URI_DELIMITER + bucket.getName();
try (FileSystem fs = FileSystem.get(CONF)) {
- for (int i = 0; i < 5; i++) {
- final Path file = new Path(dir, "file" + i);
- try (FSDataOutputStream out =
- fs.create(file, true)) {
- int initialDataSize = 1 << i;
- runConcurrentWriteHSync(file, out, initialDataSize);
- }
+ final Path file = new Path(dir, "file" + writeSampleSize);
+ byte[] data = new byte[writeSampleSize];
+ ThreadLocalRandom.current().nextBytes(data);
+ int writes;
+ try (FSDataOutputStream out = fs.create(file, true)) {
+ writes = runConcurrentWriteHSync(file, out, data, syncThreadsCount);
+ }
+ validateWrittenFile(file, fs, data, writes);
+ fs.delete(file, false);
+ }
+ }
- fs.delete(file, false);
+ private void validateWrittenFile(Path file, FileSystem fs, byte[] expected,
int times) throws IOException {
+ try (FSDataInputStream is = fs.open(file)) {
+ byte[] actual = new byte[expected.length];
+ for (int i = 0; i < times; i++) {
+ assertEquals(expected.length, is.read(actual));
+ assertArrayEquals(expected, actual);
}
+ // ensure nothing more can be read.
+ assertEquals(-1, is.read(expected));
}
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
index c3ea911f19..17ca07cd52 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
@@ -209,7 +209,6 @@ public class TestCommitWatcher {
return v;
});
futures.add(future);
- watcher.putFlushFuture(length, future);
replies.add(reply);
}
@@ -220,21 +219,15 @@ public class TestCommitWatcher {
CompletableFuture<ContainerCommandResponseProto> future2 =
futures.get(1);
future1.get();
- assertEquals(future1, watcher.getFutureMap().get((long) chunkSize));
- // wait on 2nd putBlock to complete
future2.get();
- assertEquals(future2, watcher.getFutureMap().get((long) 2 *
chunkSize));
assertEquals(2, watcher.
getCommitIndexMap().size());
watcher.watchOnFirstIndex();
assertThat(watcher.getCommitIndexMap()).doesNotContainKey(replies.get(0).getLogIndex());
- assertThat(watcher.getFutureMap()).doesNotContainKey((long) chunkSize);
assertThat(watcher.getTotalAckDataLength()).isGreaterThanOrEqualTo(chunkSize);
watcher.watchOnLastIndex();
assertThat(watcher.getCommitIndexMap()).doesNotContainKey(replies.get(1).getLogIndex());
- assertThat(watcher.getFutureMap()).doesNotContainKey((long) 2 *
chunkSize);
assertEquals(2 * chunkSize, watcher.getTotalAckDataLength());
- assertThat(watcher.getFutureMap()).isEmpty();
assertThat(watcher.getCommitIndexMap()).isEmpty();
}
} finally {
@@ -282,7 +275,6 @@ public class TestCommitWatcher {
return v;
});
futures.add(future);
- watcher.putFlushFuture(length, future);
replies.add(reply);
}
@@ -293,14 +285,11 @@ public class TestCommitWatcher {
CompletableFuture<ContainerCommandResponseProto> future2 =
futures.get(1);
future1.get();
- assertEquals(future1, watcher.getFutureMap().get((long) chunkSize));
// wait on 2nd putBlock to complete
future2.get();
- assertEquals(future2, watcher.getFutureMap().get((long) 2 *
chunkSize));
assertEquals(2, watcher.getCommitIndexMap().size());
watcher.watchOnFirstIndex();
assertThat(watcher.getCommitIndexMap()).doesNotContainKey(replies.get(0).getLogIndex());
- assertThat(watcher.getFutureMap()).doesNotContainKey((long) chunkSize);
assertThat(watcher.getTotalAckDataLength()).isGreaterThanOrEqualTo(chunkSize);
cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
@@ -325,10 +314,8 @@ public class TestCommitWatcher {
.getLogIndex()) {
assertEquals(chunkSize, watcher.getTotalAckDataLength());
assertEquals(1, watcher.getCommitIndexMap().size());
- assertEquals(1, watcher.getFutureMap().size());
} else {
assertEquals(2 * chunkSize, watcher.getTotalAckDataLength());
- assertThat(watcher.getFutureMap()).isEmpty();
assertThat(watcher.getCommitIndexMap()).isEmpty();
}
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 67736033c5..2a6b2246b9 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -235,7 +235,6 @@ class TestBlockOutputStream {
// in the buffer, with only one buffer being allocated in the buffer pool
assertEquals(1, bufferPool.getSize());
- assertEquals(0, bufferPool.getBuffer(0).position());
assertEquals(totalWriteLength, blockOutputStream.getWrittenDataLength());
assertEquals(totalWriteLength,
blockOutputStream.getTotalDataFlushedLength());
@@ -587,20 +586,23 @@ class TestBlockOutputStream {
byte[] data1 = RandomUtils.nextBytes(dataLength);
key.write(data1);
- // since its hitting the full bufferCondition, it will call
watchForCommit
- // and completes atleast putBlock for first flushSize worth of data
- assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
- .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
- assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
- .isLessThanOrEqualTo(pendingPutBlockCount + 1);
KeyOutputStream keyOutputStream =
assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
- assertEquals(1, keyOutputStream.getStreamEntries().size());
RatisBlockOutputStream blockOutputStream =
assertInstanceOf(RatisBlockOutputStream.class,
keyOutputStream.getStreamEntries().get(0).getOutputStream());
+ BufferPool bufferPool = blockOutputStream.getBufferPool();
+ // since it's hitting the full bufferCondition, it will call
watchForCommit
+ // however, the outputstream will not wait for watchForCommit, but the
next call to
+ // write() will need to wait for at least one watchForCommit, indirectly
when asking for new buffer allocation.
+ bufferPool.waitUntilAvailable();
+ assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
+ .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
+ assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
+ .isLessThanOrEqualTo(pendingPutBlockCount + 1);
+
+ assertEquals(1, keyOutputStream.getStreamEntries().size());
assertEquals(4, blockOutputStream.getBufferPool().getSize());
// writtenDataLength as well flushedDataLength will be updated here
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]