This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-7593 by this push:
     new 890c48dd7e HDDS-11193. KeyOutputStream flakiness when running write and hsync concurrently (#6968)
890c48dd7e is described below

commit 890c48dd7e04f749eefe64aee82a8b08a8437236
Author: Duong Nguyen <[email protected]>
AuthorDate: Wed Jul 24 10:11:50 2024 -0700

    HDDS-11193. KeyOutputStream flakiness when running write and hsync concurrently (#6968)
---
 .../hdds/scm/storage/AbstractCommitWatcher.java    |  3 +--
 .../hadoop/hdds/scm/storage/BlockOutputStream.java | 22 +++++++++++++++++++---
 .../client/io/BlockOutputStreamEntryPool.java      |  7 ++++---
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  2 +-
 .../java/org/apache/hadoop/fs/ozone/TestHSync.java |  8 +++++---
 5 files changed, 30 insertions(+), 12 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
index 0c6b929344..2bc73ce58f 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
@@ -141,8 +141,7 @@ abstract class AbstractCommitWatcher<BUFFER> {
     try {
       final XceiverClientReply reply = client.watchForCommit(commitIndex);
       f.complete(reply);
-      final CompletableFuture<XceiverClientReply> removed
-          = replies.remove(commitIndex);
+      final CompletableFuture<XceiverClientReply> removed = 
replies.remove(commitIndex);
       Preconditions.checkState(removed == f);
 
       final long index = reply != null ? reply.getLogIndex() : 0;
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 0e8a0d56fe..da61000677 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -150,6 +150,7 @@ public class BlockOutputStream extends OutputStream {
   private boolean allowPutBlockPiggybacking;
 
   private CompletableFuture<Void> lastFlushFuture;
+  private CompletableFuture<Void> allPendingFlushFutures = 
CompletableFuture.completedFuture(null);
 
   /**
    * Creates a new BlockOutputStream.
@@ -355,7 +356,7 @@ public class BlockOutputStream extends OutputStream {
       if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
         updatePutBlockLength();
         CompletableFuture<PutBlockResult> putBlockFuture = 
executePutBlock(false, false);
-        this.lastFlushFuture = watchForCommitAsync(putBlockFuture);
+        recordWatchForCommitAsync(putBlockFuture);
       }
 
       if (bufferPool.isAtCapacity()) {
@@ -364,6 +365,16 @@ public class BlockOutputStream extends OutputStream {
     }
   }
 
+  private void recordWatchForCommitAsync(CompletableFuture<PutBlockResult> 
putBlockResultFuture) {
+    recordFlushFuture(watchForCommitAsync(putBlockResultFuture));
+  }
+
+  private void recordFlushFuture(CompletableFuture<Void> flushFuture) {
+    Preconditions.checkState(Thread.holdsLock(this));
+    this.lastFlushFuture = flushFuture;
+    this.allPendingFlushFutures = 
allPendingFlushFutures.thenCombine(flushFuture, (last, curr) -> null);
+  }
+
   private void allocateNewBufferIfNeeded() throws IOException {
     if (currentBufferRemaining == 0) {
       try {
@@ -425,7 +436,7 @@ public class BlockOutputStream extends OutputStream {
         updateWriteChunkLength();
         updatePutBlockLength();
         CompletableFuture<PutBlockResult> putBlockResultFuture = 
executePutBlock(false, false);
-        lastFlushFuture = watchForCommitAsync(putBlockResultFuture);
+        recordWatchForCommitAsync(putBlockResultFuture);
       }
       if (writtenDataLength == streamBufferArgs.getStreamBufferMaxSize()) {
         handleFullBuffer();
@@ -640,6 +651,11 @@ public class BlockOutputStream extends OutputStream {
       }
       LOG.debug("Flush done.");
     }
+
+    if (close) {
+      // When closing, must wait for all flush futures to complete.
+      allPendingFlushFutures.get();
+    }
   }
 
   private synchronized CompletableFuture<Void> 
handleFlushInternalSynchronized(boolean close) throws IOException {
@@ -682,7 +698,7 @@ public class BlockOutputStream extends OutputStream {
       LOG.debug("Flushing without data");
     }
     if (putBlockResultFuture != null) {
-      this.lastFlushFuture = watchForCommitAsync(putBlockResultFuture);
+      recordWatchForCommitAsync(putBlockResultFuture);
     }
     return lastFlushFuture;
   }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index c25b4508e9..9a470a72fd 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -136,7 +136,7 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
    * @param version the set of blocks that are pre-allocated.
    * @param openVersion the version corresponding to the pre-allocation.
    */
-  public void addPreallocateBlocks(OmKeyLocationInfoGroup version, long 
openVersion) {
+  public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup 
version, long openVersion) {
     // server may return any number of blocks, (0 to any)
     // only the blocks allocated in this open session (block createVersion
     // equals to open session version)
@@ -379,7 +379,7 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
    * @return the new current open stream to write to
    * @throws IOException if the block allocation failed.
    */
-  BlockOutputStreamEntry allocateBlockIfNeeded() throws IOException {
+  synchronized BlockOutputStreamEntry allocateBlockIfNeeded() throws 
IOException {
     BlockOutputStreamEntry streamEntry = getCurrentStreamEntry();
     if (streamEntry != null && streamEntry.isClosed()) {
       // a stream entry gets closed either by :
@@ -395,7 +395,8 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
     }
     // in theory, this condition should never violate due the check above
     // still do a sanity check.
-    Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
+    Preconditions.checkArgument(currentStreamIndex < streamEntries.size(),
+        "currentStreamIndex(%s) must be < streamEntries.size(%s)", 
currentStreamIndex, streamEntries.size());
     return streamEntries.get(currentStreamIndex);
   }
 
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 1160d9667a..649c2e7dcc 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -287,7 +287,7 @@ public class KeyOutputStream extends OutputStream
    * @param exception   actual exception that occurred
    * @throws IOException Throws IOException if Write fails
    */
-  private void handleException(BlockOutputStreamEntry streamEntry,
+  private synchronized void handleException(BlockOutputStreamEntry streamEntry,
       IOException exception) throws IOException {
     Throwable t = HddsClientUtils.checkForException(exception);
     Preconditions.checkNotNull(t);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 2aa6a71d51..56d38f9f02 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -683,6 +683,7 @@ public class TestHSync {
           out.write(data);
           writes.incrementAndGet();
         } catch (Exception e) {
+          LOG.error("Error writing", e);
           writerException.set(e);
           throw new RuntimeException(e);
         }
@@ -694,6 +695,7 @@ public class TestHSync {
         try {
           out.hsync();
         } catch (Exception e) {
+          LOG.error("Error calling hsync", e);
           syncerException.set(e);
           throw new RuntimeException(e);
         }
@@ -715,12 +717,12 @@ public class TestHSync {
       sync.join();
     }
 
-    if (writerException.get() != null) {
-      throw writerException.get();
-    }
     if (syncerException.get() != null) {
       throw syncerException.get();
     }
+    if (writerException.get() != null) {
+      throw writerException.get();
+    }
     return writes.get();
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to