This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-7593 by this push:
new 890c48dd7e HDDS-11193. KeyOutputStream flakiness when running write
and hsync concurrently (#6968)
890c48dd7e is described below
commit 890c48dd7e04f749eefe64aee82a8b08a8437236
Author: Duong Nguyen <[email protected]>
AuthorDate: Wed Jul 24 10:11:50 2024 -0700
HDDS-11193. KeyOutputStream flakiness when running write and hsync
concurrently (#6968)
---
.../hdds/scm/storage/AbstractCommitWatcher.java | 3 +--
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 22 +++++++++++++++++++---
.../client/io/BlockOutputStreamEntryPool.java | 7 ++++---
.../hadoop/ozone/client/io/KeyOutputStream.java | 2 +-
.../java/org/apache/hadoop/fs/ozone/TestHSync.java | 8 +++++---
5 files changed, 30 insertions(+), 12 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
index 0c6b929344..2bc73ce58f 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
@@ -141,8 +141,7 @@ abstract class AbstractCommitWatcher<BUFFER> {
try {
final XceiverClientReply reply = client.watchForCommit(commitIndex);
f.complete(reply);
- final CompletableFuture<XceiverClientReply> removed
- = replies.remove(commitIndex);
+ final CompletableFuture<XceiverClientReply> removed =
replies.remove(commitIndex);
Preconditions.checkState(removed == f);
final long index = reply != null ? reply.getLogIndex() : 0;
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 0e8a0d56fe..da61000677 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -150,6 +150,7 @@ public class BlockOutputStream extends OutputStream {
private boolean allowPutBlockPiggybacking;
private CompletableFuture<Void> lastFlushFuture;
+ private CompletableFuture<Void> allPendingFlushFutures =
CompletableFuture.completedFuture(null);
/**
* Creates a new BlockOutputStream.
@@ -355,7 +356,7 @@ public class BlockOutputStream extends OutputStream {
if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
updatePutBlockLength();
CompletableFuture<PutBlockResult> putBlockFuture =
executePutBlock(false, false);
- this.lastFlushFuture = watchForCommitAsync(putBlockFuture);
+ recordWatchForCommitAsync(putBlockFuture);
}
if (bufferPool.isAtCapacity()) {
@@ -364,6 +365,16 @@ public class BlockOutputStream extends OutputStream {
}
}
+ private void recordWatchForCommitAsync(CompletableFuture<PutBlockResult>
putBlockResultFuture) {
+ recordFlushFuture(watchForCommitAsync(putBlockResultFuture));
+ }
+
+ private void recordFlushFuture(CompletableFuture<Void> flushFuture) {
+ Preconditions.checkState(Thread.holdsLock(this));
+ this.lastFlushFuture = flushFuture;
+ this.allPendingFlushFutures =
allPendingFlushFutures.thenCombine(flushFuture, (last, curr) -> null);
+ }
+
private void allocateNewBufferIfNeeded() throws IOException {
if (currentBufferRemaining == 0) {
try {
@@ -425,7 +436,7 @@ public class BlockOutputStream extends OutputStream {
updateWriteChunkLength();
updatePutBlockLength();
CompletableFuture<PutBlockResult> putBlockResultFuture =
executePutBlock(false, false);
- lastFlushFuture = watchForCommitAsync(putBlockResultFuture);
+ recordWatchForCommitAsync(putBlockResultFuture);
}
if (writtenDataLength == streamBufferArgs.getStreamBufferMaxSize()) {
handleFullBuffer();
@@ -640,6 +651,11 @@ public class BlockOutputStream extends OutputStream {
}
LOG.debug("Flush done.");
}
+
+ if (close) {
+ // When closing, must wait for all flush futures to complete.
+ allPendingFlushFutures.get();
+ }
}
private synchronized CompletableFuture<Void>
handleFlushInternalSynchronized(boolean close) throws IOException {
@@ -682,7 +698,7 @@ public class BlockOutputStream extends OutputStream {
LOG.debug("Flushing without data");
}
if (putBlockResultFuture != null) {
- this.lastFlushFuture = watchForCommitAsync(putBlockResultFuture);
+ recordWatchForCommitAsync(putBlockResultFuture);
}
return lastFlushFuture;
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index c25b4508e9..9a470a72fd 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -136,7 +136,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
* @param version the set of blocks that are pre-allocated.
* @param openVersion the version corresponding to the pre-allocation.
*/
- public void addPreallocateBlocks(OmKeyLocationInfoGroup version, long
openVersion) {
+ public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup
version, long openVersion) {
// server may return any number of blocks, (0 to any)
// only the blocks allocated in this open session (block createVersion
// equals to open session version)
@@ -379,7 +379,7 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
* @return the new current open stream to write to
* @throws IOException if the block allocation failed.
*/
- BlockOutputStreamEntry allocateBlockIfNeeded() throws IOException {
+ synchronized BlockOutputStreamEntry allocateBlockIfNeeded() throws
IOException {
BlockOutputStreamEntry streamEntry = getCurrentStreamEntry();
if (streamEntry != null && streamEntry.isClosed()) {
// a stream entry gets closed either by :
@@ -395,7 +395,8 @@ public class BlockOutputStreamEntryPool implements
KeyMetadataAware {
}
// in theory, this condition should never be violated due to the check above
// still do a sanity check.
- Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
+ Preconditions.checkArgument(currentStreamIndex < streamEntries.size(),
+ "currentStreamIndex(%s) must be < streamEntries.size(%s)",
currentStreamIndex, streamEntries.size());
return streamEntries.get(currentStreamIndex);
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 1160d9667a..649c2e7dcc 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -287,7 +287,7 @@ public class KeyOutputStream extends OutputStream
* @param exception actual exception that occurred
* @throws IOException Throws IOException if Write fails
*/
- private void handleException(BlockOutputStreamEntry streamEntry,
+ private synchronized void handleException(BlockOutputStreamEntry streamEntry,
IOException exception) throws IOException {
Throwable t = HddsClientUtils.checkForException(exception);
Preconditions.checkNotNull(t);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 2aa6a71d51..56d38f9f02 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -683,6 +683,7 @@ public class TestHSync {
out.write(data);
writes.incrementAndGet();
} catch (Exception e) {
+ LOG.error("Error writing", e);
writerException.set(e);
throw new RuntimeException(e);
}
@@ -694,6 +695,7 @@ public class TestHSync {
try {
out.hsync();
} catch (Exception e) {
+ LOG.error("Error calling hsync", e);
syncerException.set(e);
throw new RuntimeException(e);
}
@@ -715,12 +717,12 @@ public class TestHSync {
sync.join();
}
- if (writerException.get() != null) {
- throw writerException.get();
- }
if (syncerException.get() != null) {
throw syncerException.get();
}
+ if (writerException.get() != null) {
+ throw writerException.get();
+ }
return writes.get();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]