This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 759433c2ec HDDS-8499. Add mechanism to notify threads when OM double buffer flushed. (#4621)
759433c2ec is described below

commit 759433c2ece2f48f15d0de3927c20f05ee32458c
Author: GeorgeJahad <[email protected]>
AuthorDate: Fri May 5 14:06:31 2023 -0700

    HDDS-8499. Add mechanism to notify threads when OM double buffer flushed. (#4621)
---
 .../ozone/om/ratis/OzoneManagerDoubleBuffer.java   |  74 +++++++++-
 .../ozone/om/ratis/OzoneManagerStateMachine.java   |   8 ++
 .../om/ratis/TestOzoneManagerDoubleBuffer.java     | 150 +++++++++++++++++++++
 3 files changed, 227 insertions(+), 5 deletions(-)

diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index 94520e1a97..e2b47292bc 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -30,8 +30,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -104,6 +107,7 @@ public final class OzoneManagerDoubleBuffer {
   private final boolean isRatisEnabled;
   private final boolean isTracingEnabled;
   private final Semaphore unFlushedTransactions;
+  private final FlushNotifier flushNotifier;
 
   /**
    * function which will get term associated with the transaction index.
@@ -120,6 +124,7 @@ public final class OzoneManagerDoubleBuffer {
     private boolean isTracingEnabled = false;
     private Function<Long, Long> indexToTerm = null;
     private int maxUnFlushedTransactionCount = 0;
+    private FlushNotifier flushNotifier;
 
     public Builder setOmMetadataManager(OMMetadataManager omm) {
       this.mm = omm;
@@ -152,6 +157,11 @@ public final class OzoneManagerDoubleBuffer {
       return this;
     }
 
+    public Builder setFlushNotifier(FlushNotifier flushNotifier) {
+      this.flushNotifier = flushNotifier;
+      return this;
+    }
+
     public OzoneManagerDoubleBuffer build() {
       if (isRatisEnabled) {
         Preconditions.checkNotNull(rs, "When ratis is enabled, " +
@@ -162,15 +172,21 @@ public final class OzoneManagerDoubleBuffer {
             "when ratis is enable, maxUnFlushedTransactions " +
                 "should be bigger than 0");
       }
+      if (flushNotifier == null) {
+        flushNotifier = new FlushNotifier();
+      }
+
       return new OzoneManagerDoubleBuffer(mm, rs, isRatisEnabled,
-          isTracingEnabled, indexToTerm, maxUnFlushedTransactionCount);
+          isTracingEnabled, indexToTerm, maxUnFlushedTransactionCount,
+          flushNotifier);
     }
   }
 
   private OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager,
       OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot,
       boolean isRatisEnabled, boolean isTracingEnabled,
-      Function<Long, Long> indexToTerm, int maxUnFlushedTransactions) {
+      Function<Long, Long> indexToTerm, int maxUnFlushedTransactions,
+      FlushNotifier flushNotifier) {
     this.currentBuffer = new ConcurrentLinkedQueue<>();
     this.readyBuffer = new ConcurrentLinkedQueue<>();
     this.isRatisEnabled = isRatisEnabled;
@@ -185,6 +201,7 @@ public final class OzoneManagerDoubleBuffer {
     this.ozoneManagerDoubleBufferMetrics =
         OzoneManagerDoubleBufferMetrics.create();
     this.indexToTerm = indexToTerm;
+    this.flushNotifier = flushNotifier;
 
     isRunning.set(true);
     // Daemon thread which runs in background and flushes transactions to DB.
@@ -254,7 +271,8 @@ public final class OzoneManagerDoubleBuffer {
    * Runs in a background thread and batches the transaction in currentBuffer
    * and commit to DB.
    */
-  private void flushTransactions() {
+  @VisibleForTesting
+  void flushTransactions() {
     while (isRunning.get() && canFlush()) {
       flushCurrentBuffer();
     }
@@ -295,6 +313,7 @@ public final class OzoneManagerDoubleBuffer {
       }
 
       clearReadyBuffer();
+      flushNotifier.notifyFlush();
     } catch (IOException ex) {
       terminate(ex, 1);
     } catch (Throwable t) {
@@ -611,10 +630,14 @@ public final class OzoneManagerDoubleBuffer {
   private synchronized boolean canFlush() {
     try {
       while (currentBuffer.size() == 0) {
-        wait(Long.MAX_VALUE);
+        // canFlush() only gets called when the readyBuffer is empty.
+        // Since both buffers are empty, notify once for each.
+        flushNotifier.notifyFlush();
+        flushNotifier.notifyFlush();
+        wait(1000L);
       }
       return true;
-    }  catch (InterruptedException ex) {
+    } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
       if (isRunning.get()) {
         final String message = "OMDoubleBuffer flush thread " +
@@ -649,4 +672,45 @@ public final class OzoneManagerDoubleBuffer {
   public OzoneManagerDoubleBufferMetrics getOzoneManagerDoubleBufferMetrics() {
     return ozoneManagerDoubleBufferMetrics;
   }
+
+  @VisibleForTesting
+  int getCurrentBufferSize() {
+    return currentBuffer.size();
+  }
+
+  @VisibleForTesting
+  int getReadyBufferSize() {
+    return readyBuffer.size();
+  }
+
+  @VisibleForTesting
+  void resume() {
+    isRunning.set(true);
+  }
+
+  void awaitFlush() throws InterruptedException {
+    flushNotifier.await();
+  }
+
+  static class FlushNotifier {
+    private final Set<CountDownLatch> flushLatches =
+        ConcurrentHashMap.newKeySet();
+
+    void await() throws InterruptedException {
+
+      // Wait until both the current and ready buffers are flushed.
+      CountDownLatch latch = new CountDownLatch(2);
+      flushLatches.add(latch);
+      latch.await();
+      flushLatches.remove(latch);
+    }
+
+    int notifyFlush() {
+      int retval = flushLatches.size();
+      for (CountDownLatch l : flushLatches) {
+        l.countDown();
+      }
+      return retval;
+    }
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 12343524c4..d3bf26b772 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -718,4 +718,12 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
     return applyTransactionMap.get(transactionIndex);
   }
 
+  /**
+   * Wait until both buffers are flushed.  This is used in cases like
+   * "follower bootstrap tarball creation" where the rocksDb for the active
+   * fs needs to synchronized with the rocksdb's for the snapshots.
+   */
+  public void awaitDoubleBufferFlush() throws InterruptedException {
+    ozoneManagerDoubleBuffer.awaitFlush();
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
index 57dbf47e3e..939f5a210e 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
@@ -18,8 +18,14 @@ package org.apache.hadoop.ozone.om.ratis;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.audit.AuditLogger;
@@ -38,16 +44,22 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateS
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Mockito;
 
+import static org.junit.Assert.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 /**
@@ -74,6 +86,8 @@ class TestOzoneManagerDoubleBuffer {
       mock(OMSnapshotCreateResponse.class);
   @TempDir
   private File tempDir;
+  private OzoneManagerDoubleBuffer.FlushNotifier flushNotifier;
+  private OzoneManagerDoubleBuffer.FlushNotifier spyFlushNotifier;
 
   @BeforeEach
   public void setup() throws IOException {
@@ -93,12 +107,15 @@ class TestOzoneManagerDoubleBuffer {
     OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot = index -> {
     };
 
+    flushNotifier = new OzoneManagerDoubleBuffer.FlushNotifier();
+    spyFlushNotifier = spy(flushNotifier);
     doubleBuffer = new OzoneManagerDoubleBuffer.Builder()
         .setOmMetadataManager(omMetadataManager)
         .setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
         .setmaxUnFlushedTransactionCount(1000)
         .enableRatis(true)
         .setIndexToTerm((i) -> 1L)
+        .setFlushNotifier(spyFlushNotifier)
         .build();
 
     doNothing().when(omKeyCreateResponse).checkAndUpdateDB(any(), any());
@@ -215,4 +232,137 @@ class TestOzoneManagerDoubleBuffer {
     assertEquals(expectedAvgFlushTransactionsInMetric,
         bufferMetrics.getAvgFlushTransactionsInOneIteration(), 0.001);
   }
+
+  @Test
+  public void testAwaitFlush()
+      throws ExecutionException, InterruptedException {
+    List<OMClientResponse> omClientResponses =
+        Arrays.asList(omKeyCreateResponse,
+        omBucketCreateResponse);
+    int initialSize = omClientResponses.size();
+    AtomicInteger notifyCounter = new AtomicInteger();
+    ExecutorService executorService = Executors.newCachedThreadPool();
+    int transactionIndex = 0;
+
+    // Stop the daemon to eliminate race conditions.
+    doubleBuffer.stopDaemon();
+
+    // Confirm clear.
+    assertEquals(0, doubleBuffer.getCurrentBufferSize());
+    assertEquals(0, doubleBuffer.getReadyBufferSize());
+
+    // Override notifier to do some assert checks.
+    doAnswer(i -> {
+      notifyCounter.incrementAndGet();
+      assertEquals(0, doubleBuffer.getCurrentBufferSize());
+      assertEquals(0, doubleBuffer.getReadyBufferSize());
+      flushNotifier.notifyFlush();
+      return null;
+    }).when(spyFlushNotifier).notifyFlush();
+
+    // Init double buffer.
+    for (OMClientResponse omClientResponse : omClientResponses) {
+      doubleBuffer.add(omClientResponse, transactionIndex++);
+    }
+    assertEquals(initialSize,
+        doubleBuffer.getCurrentBufferSize());
+
+    // Start double buffer and wait for flush.
+    Future<?> await = awaitFlush(executorService);
+    Future<Boolean> flusher = flushTransactions(executorService);
+    await.get();
+
+    // Make sure notify was called at least twice.
+    assertTrue(notifyCounter.get() >= 2);
+    assertFalse(flusher.isDone());
+
+    // Confirm still empty.
+    assertEquals(0, doubleBuffer.getCurrentBufferSize());
+    assertEquals(0, doubleBuffer.getReadyBufferSize());
+
+    // Run again to make sure it works when double buffer is empty
+    await = awaitFlush(executorService);
+    await.get();
+
+    // Clean up.
+    flusher.cancel(false);
+    assertThrows(java.util.concurrent.CancellationException.class,
+        flusher::get);
+  }
+
+
+  // Return a future that waits for the flush.
+  private Future<Boolean> awaitFlush(ExecutorService executorService) {
+    return executorService.submit(() -> {
+      doubleBuffer.awaitFlush();
+      return true;
+    });
+  }
+
+  private Future<Boolean> flushTransactions(ExecutorService executorService) {
+    return executorService.submit(() -> {
+      doubleBuffer.resume();
+      try {
+        doubleBuffer.flushTransactions();
+      } catch (Exception e) {
+        return false;
+      }
+      return true;
+    });
+  }
+
+  @Test
+  public void testFlushNotifier()
+      throws InterruptedException, ExecutionException {
+
+    OzoneManagerDoubleBuffer.FlushNotifier fn =
+        new OzoneManagerDoubleBuffer.FlushNotifier();
+
+    // Confirm nothing waiting yet.
+    assertEquals(0, fn.notifyFlush());
+    ExecutorService executorService = Executors.newCachedThreadPool();
+    List<Future<Boolean>> tasks = new ArrayList<>();
+
+    // Simulate 3 waiting.
+    for (int i = 0; i < 3; i++) {
+      tasks.add(waitFN(fn, executorService));
+    }
+    Thread.sleep(2000);
+
+    // Confirm not done.
+    for (Future<Boolean> task : tasks) {
+      assertFalse(task.isDone());
+    }
+    assertEquals(3, fn.notifyFlush());
+
+    // Add a fourth.
+    tasks.add(waitFN(fn, executorService));
+    Thread.sleep(2000);
+    assertEquals(4, fn.notifyFlush());
+
+    // Confirm the initial ones are done,
+    //  (it takes 2 calls to notify to release the waiting threads.)
+    for (int i = 0; i < 3; i++) {
+      assertTrue(tasks.get(i).get());
+    }
+    assertFalse(tasks.get(3).isDone());
+
+    // Now finish the fourth.
+    assertEquals(1, fn.notifyFlush());
+    assertTrue(tasks.get(3).get());
+    assertEquals(0, fn.notifyFlush());
+
+  }
+
+  // Have a thread wait until notified.
+  private Future<Boolean> waitFN(OzoneManagerDoubleBuffer.FlushNotifier fn,
+      ExecutorService executorService) {
+    return executorService.submit(() -> {
+      try {
+        fn.await();
+      } catch (InterruptedException e) {
+      }
+      return true;
+    });
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to