szetszwo commented on code in PR #6968:
URL: https://github.com/apache/ozone/pull/6968#discussion_r1685324770
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java:
##########
@@ -364,6 +368,13 @@ private void doFlushOrWatchIfNeeded() throws IOException {
}
}
+ private void recordFlushFuture(CompletableFuture<Void> flushFuture) {
+ Preconditions.checkState(Thread.holdsLock(this));
+ this.lastFlushFuture = flushFuture;
+ allFlushFutures.add(flushFuture);
+ flushFuture.whenComplete((r, e) -> allFlushFutures.remove(flushFuture));
Review Comment:
(2) Use `thenCombine` to chain each new flush future onto the previous one.
```java
this.lastFlushFuture = lastFlushFuture.thenCombine(flushFuture,
    (last, cur) -> null);
```
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java:
##########
@@ -150,6 +153,7 @@ public class BlockOutputStream extends OutputStream {
private boolean allowPutBlockPiggybacking;
private CompletableFuture<Void> lastFlushFuture;
+ private Set<CompletableFuture<Void>> allFlushFutures =
Collections.newSetFromMap(new ConcurrentHashMap<>());
Review Comment:
We may simply chain the futures instead of adding a set.
(1) Initialize a completed future.
```java
private CompletableFuture<Void> lastFlushFuture =
CompletableFuture.completedFuture(null);
```
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java:
##########
@@ -682,7 +700,7 @@ private synchronized CompletableFuture<Void> handleFlushInternalSynchronized(boo
LOG.debug("Flushing without data");
}
if (putBlockResultFuture != null) {
- this.lastFlushFuture = watchForCommitAsync(putBlockResultFuture);
+ recordFlushFuture(watchForCommitAsync(putBlockResultFuture));
Review Comment:
https://github.com/apache/ozone/blob/f3b90fb6dfc035e326bd39a289ff0696849b5989/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java#L428
The call in `writeOnRetry(..)` linked above also needs a similar change.
BTW, since `recordFlushFuture(watchForCommitAsync(..))` will always be called
together, how about combining these two methods?
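For illustration only, a combined method might look roughly like the sketch below. The class name `CombinedFlushRecorder`, the method name `watchForCommitAndRecord`, the `String` reply type, and the stubbed body of `watchForCommitAsync` are all assumptions, not code from the PR; the recording step is simplified to updating `lastFlushFuture`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the relevant part of BlockOutputStream.
final class CombinedFlushRecorder {
  private CompletableFuture<Void> lastFlushFuture =
      CompletableFuture.completedFuture(null);

  // Stub (assumption): the real watchForCommitAsync(..) waits for the commit;
  // here it only completes once the put-block future completes.
  private CompletableFuture<Void> watchForCommitAsync(
      CompletableFuture<String> putBlockResultFuture) {
    return putBlockResultFuture.thenAccept(reply -> { });
  }

  // One entry point, so no caller can watch for commit and forget to record.
  synchronized CompletableFuture<Void> watchForCommitAndRecord(
      CompletableFuture<String> putBlockResultFuture) {
    final CompletableFuture<Void> flushFuture =
        watchForCommitAsync(putBlockResultFuture);
    this.lastFlushFuture = flushFuture;
    return flushFuture;
  }

  synchronized CompletableFuture<Void> getLastFlushFuture() {
    return lastFlushFuture;
  }
}
```

With a single method, both `handleFlushInternalSynchronized` and `writeOnRetry(..)` would go through the same call.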
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java:
##########
@@ -640,6 +651,13 @@ private void handleFlushInternal(boolean close)
}
LOG.debug("Flush done.");
}
+
+ if (close) {
+ // When closing, must wait for all flush futures to complete.
+ for (CompletableFuture<Void> flushFuture : allFlushFutures) {
+ flushFuture.get();
+ }
Review Comment:
(3) Just call `lastFlushFuture.get()`; with the futures chained, it also waits for all earlier flushes.
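Suggestions (1) to (3) can be sketched together as follows. This is a minimal illustration, not the PR's code: `FlushChain`, `awaitAllFlushes`, and `isAllDone` are hypothetical names, and error handling is left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in for the flush bookkeeping in BlockOutputStream.
final class FlushChain {
  // (1) Start from a completed future so the first thenCombine has a left side.
  private CompletableFuture<Void> lastFlushFuture =
      CompletableFuture.completedFuture(null);

  // (2) Chain the new future onto the previous one; the combined future
  // completes only after both of its inputs have completed.
  synchronized void recordFlushFuture(CompletableFuture<Void> flushFuture) {
    this.lastFlushFuture =
        lastFlushFuture.thenCombine(flushFuture, (last, cur) -> null);
  }

  // (3) Waiting on the last future covers every future recorded before it.
  void awaitAllFlushes() throws InterruptedException, ExecutionException {
    final CompletableFuture<Void> f;
    synchronized (this) {
      f = lastFlushFuture;
    }
    f.get(); // block outside the lock so other threads can keep recording
  }

  synchronized boolean isAllDone() {
    return lastFlushFuture.isDone();
  }
}
```

Compared with keeping a set of futures, the chain needs no removal callback and no iteration on close.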