smengcl commented on code in PR #5900:
URL: https://github.com/apache/ozone/pull/5900#discussion_r1446337159


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,29 +649,57 @@ void resume() {
     isRunning.set(true);
   }
 
+  CompletableFuture<Integer> awaitFlushAsync() {
+    return flushNotifier.await();
+  }
+
   public void awaitFlush() throws InterruptedException {
-    flushNotifier.await();
+    try {
+      awaitFlushAsync().get();
+    } catch (ExecutionException e) {
+      // the future will never be completed exceptionally.
+      throw new IllegalStateException(e);
+    }
   }
 
   static class FlushNotifier {
-    private final Set<CountDownLatch> flushLatches =
-        ConcurrentHashMap.newKeySet();
+    static class Entry {
+      private final CompletableFuture<Integer> future = new 
CompletableFuture<>();
+      private int count;
 
-    void await() throws InterruptedException {
+      private CompletableFuture<Integer> await() {
+        count++;
+        return future;
+      }
 
-      // Wait until both the current and ready buffers are flushed.
-      CountDownLatch latch = new CountDownLatch(2);
-      flushLatches.add(latch);
-      latch.await();
-      flushLatches.remove(latch);
+      private int complete() {
+        Preconditions.checkState(future.complete(count));
+        return future.join();
+      }
     }
 
-    int notifyFlush() {
-      int retval = flushLatches.size();
-      for (CountDownLatch l : flushLatches) {
-        l.countDown();
-      }
-      return retval;
+    /** The size of the map is at most two since it uses {@link #flushCount} + 
2 in {@link #await()} .*/
+    private final Map<Integer, Entry> flushFutures = new TreeMap<>();
+    private int awaitCount;
+    private int flushCount;
+
+    synchronized CompletableFuture<Integer> await() {
+      awaitCount++;
+      final int flush = flushCount + 2;
+      LOG.debug("await flush {}", flush);
+      final Entry entry = flushFutures.computeIfAbsent(flush, key -> new 
Entry());
+      Preconditions.checkState(flushFutures.size() <= 2);
+      return entry.await();
+    }
+
+    synchronized int notifyFlush() {
+      final int await = awaitCount;
+      final int flush = ++flushCount;
+      awaitCount -= Optional.ofNullable(flushFutures.remove(flush))
+          .map(Entry::complete)
+          .orElse(0);

Review Comment:
   Use of `Optional` does make the expression elegant here. But do note 
`Optional.ofNullable()` creates new object inside whenever `value` is not null 
every time:
   
   ```java
   public static <T> Optional<T> of(T value) {
       return new Optional<>(value);
   }
   ```
   
   Do we want to avoid the object creation here as well?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,29 +649,57 @@ void resume() {
     isRunning.set(true);
   }
 
+  CompletableFuture<Integer> awaitFlushAsync() {
+    return flushNotifier.await();
+  }
+
   public void awaitFlush() throws InterruptedException {
-    flushNotifier.await();
+    try {
+      awaitFlushAsync().get();
+    } catch (ExecutionException e) {
+      // the future will never be completed exceptionally.
+      throw new IllegalStateException(e);
+    }
   }
 
   static class FlushNotifier {
-    private final Set<CountDownLatch> flushLatches =
-        ConcurrentHashMap.newKeySet();
+    static class Entry {
+      private final CompletableFuture<Integer> future = new 
CompletableFuture<>();
+      private int count;
 
-    void await() throws InterruptedException {
+      private CompletableFuture<Integer> await() {
+        count++;
+        return future;
+      }
 
-      // Wait until both the current and ready buffers are flushed.
-      CountDownLatch latch = new CountDownLatch(2);
-      flushLatches.add(latch);
-      latch.await();
-      flushLatches.remove(latch);
+      private int complete() {
+        Preconditions.checkState(future.complete(count));
+        return future.join();
+      }
     }
 
-    int notifyFlush() {
-      int retval = flushLatches.size();
-      for (CountDownLatch l : flushLatches) {
-        l.countDown();
-      }
-      return retval;
+    /** The size of the map is at most two since it uses {@link #flushCount} + 
2 in {@link #await()} .*/
+    private final Map<Integer, Entry> flushFutures = new TreeMap<>();

Review Comment:
   Shall we use a `ConcurrentHashMap` here instead?
   
   `TreeMap` doesn't seem to guarantee thread-safety for `remove()` or 
`computeIfAbsent()`.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,29 +649,57 @@ void resume() {
     isRunning.set(true);
   }
 
+  CompletableFuture<Integer> awaitFlushAsync() {
+    return flushNotifier.await();
+  }
+
   public void awaitFlush() throws InterruptedException {
-    flushNotifier.await();
+    try {
+      awaitFlushAsync().get();
+    } catch (ExecutionException e) {
+      // the future will never be completed exceptionally.
+      throw new IllegalStateException(e);
+    }
   }
 
   static class FlushNotifier {
-    private final Set<CountDownLatch> flushLatches =
-        ConcurrentHashMap.newKeySet();
+    static class Entry {
+      private final CompletableFuture<Integer> future = new 
CompletableFuture<>();
+      private int count;
 
-    void await() throws InterruptedException {
+      private CompletableFuture<Integer> await() {
+        count++;
+        return future;
+      }
 
-      // Wait until both the current and ready buffers are flushed.
-      CountDownLatch latch = new CountDownLatch(2);
-      flushLatches.add(latch);
-      latch.await();
-      flushLatches.remove(latch);
+      private int complete() {
+        Preconditions.checkState(future.complete(count));

Review Comment:
   Just curious, in which case would `future.complete(count)` ever return 
`false` here?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,29 +649,57 @@ void resume() {
     isRunning.set(true);
   }
 
+  CompletableFuture<Integer> awaitFlushAsync() {
+    return flushNotifier.await();
+  }
+
   public void awaitFlush() throws InterruptedException {
-    flushNotifier.await();
+    try {
+      awaitFlushAsync().get();
+    } catch (ExecutionException e) {
+      // the future will never be completed exceptionally.
+      throw new IllegalStateException(e);
+    }
   }
 
   static class FlushNotifier {
-    private final Set<CountDownLatch> flushLatches =
-        ConcurrentHashMap.newKeySet();
+    static class Entry {
+      private final CompletableFuture<Integer> future = new 
CompletableFuture<>();
+      private int count;
 
-    void await() throws InterruptedException {
+      private CompletableFuture<Integer> await() {
+        count++;
+        return future;
+      }
 
-      // Wait until both the current and ready buffers are flushed.
-      CountDownLatch latch = new CountDownLatch(2);
-      flushLatches.add(latch);
-      latch.await();
-      flushLatches.remove(latch);
+      private int complete() {
+        Preconditions.checkState(future.complete(count));
+        return future.join();
+      }
     }
 
-    int notifyFlush() {
-      int retval = flushLatches.size();
-      for (CountDownLatch l : flushLatches) {
-        l.countDown();
-      }
-      return retval;
+    /** The size of the map is at most two since it uses {@link #flushCount} + 
2 in {@link #await()} .*/
+    private final Map<Integer, Entry> flushFutures = new TreeMap<>();
+    private int awaitCount;
+    private int flushCount;

Review Comment:
   We might need `volatile` keyword for both? Otherwise `++flushCount` in one 
thread may not be visible to others?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to