This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 759433c2ec HDDS-8499. Add mechanism to notify threads when OM double
buffer flushed. (#4621)
759433c2ec is described below
commit 759433c2ece2f48f15d0de3927c20f05ee32458c
Author: GeorgeJahad <[email protected]>
AuthorDate: Fri May 5 14:06:31 2023 -0700
HDDS-8499. Add mechanism to notify threads when OM double buffer flushed.
(#4621)
---
.../ozone/om/ratis/OzoneManagerDoubleBuffer.java | 74 +++++++++-
.../ozone/om/ratis/OzoneManagerStateMachine.java | 8 ++
.../om/ratis/TestOzoneManagerDoubleBuffer.java | 150 +++++++++++++++++++++
3 files changed, 227 insertions(+), 5 deletions(-)
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index 94520e1a97..e2b47292bc 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -30,8 +30,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -104,6 +107,7 @@ public final class OzoneManagerDoubleBuffer {
private final boolean isRatisEnabled;
private final boolean isTracingEnabled;
private final Semaphore unFlushedTransactions;
+ private final FlushNotifier flushNotifier;
/**
* function which will get term associated with the transaction index.
@@ -120,6 +124,7 @@ public final class OzoneManagerDoubleBuffer {
private boolean isTracingEnabled = false;
private Function<Long, Long> indexToTerm = null;
private int maxUnFlushedTransactionCount = 0;
+ private FlushNotifier flushNotifier;
public Builder setOmMetadataManager(OMMetadataManager omm) {
this.mm = omm;
@@ -152,6 +157,11 @@ public final class OzoneManagerDoubleBuffer {
return this;
}
+ /**
+ * Sets the notifier the double buffer will use to signal flushes.
+ * Optional: build() creates a default FlushNotifier when none was set.
+ *
+ * @param flushNotifier notifier to pass to the double buffer being built
+ * @return this builder
+ */
+ public Builder setFlushNotifier(FlushNotifier flushNotifier) {
+ this.flushNotifier = flushNotifier;
+ return this;
+ }
+
public OzoneManagerDoubleBuffer build() {
if (isRatisEnabled) {
Preconditions.checkNotNull(rs, "When ratis is enabled, " +
@@ -162,15 +172,21 @@ public final class OzoneManagerDoubleBuffer {
"when ratis is enable, maxUnFlushedTransactions " +
"should be bigger than 0");
}
+ if (flushNotifier == null) {
+ flushNotifier = new FlushNotifier();
+ }
+
return new OzoneManagerDoubleBuffer(mm, rs, isRatisEnabled,
- isTracingEnabled, indexToTerm, maxUnFlushedTransactionCount);
+ isTracingEnabled, indexToTerm, maxUnFlushedTransactionCount,
+ flushNotifier);
}
}
private OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager,
OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot,
boolean isRatisEnabled, boolean isTracingEnabled,
- Function<Long, Long> indexToTerm, int maxUnFlushedTransactions) {
+ Function<Long, Long> indexToTerm, int maxUnFlushedTransactions,
+ FlushNotifier flushNotifier) {
this.currentBuffer = new ConcurrentLinkedQueue<>();
this.readyBuffer = new ConcurrentLinkedQueue<>();
this.isRatisEnabled = isRatisEnabled;
@@ -185,6 +201,7 @@ public final class OzoneManagerDoubleBuffer {
this.ozoneManagerDoubleBufferMetrics =
OzoneManagerDoubleBufferMetrics.create();
this.indexToTerm = indexToTerm;
+ this.flushNotifier = flushNotifier;
isRunning.set(true);
// Daemon thread which runs in background and flushes transactions to DB.
@@ -254,7 +271,8 @@ public final class OzoneManagerDoubleBuffer {
* Runs in a background thread and batches the transaction in currentBuffer
* and commit to DB.
*/
- private void flushTransactions() {
+ @VisibleForTesting
+ void flushTransactions() {
while (isRunning.get() && canFlush()) {
flushCurrentBuffer();
}
@@ -295,6 +313,7 @@ public final class OzoneManagerDoubleBuffer {
}
clearReadyBuffer();
+ flushNotifier.notifyFlush();
} catch (IOException ex) {
terminate(ex, 1);
} catch (Throwable t) {
@@ -611,10 +630,14 @@ public final class OzoneManagerDoubleBuffer {
private synchronized boolean canFlush() {
try {
while (currentBuffer.size() == 0) {
- wait(Long.MAX_VALUE);
+ // canFlush() only gets called when the readyBuffer is empty.
+ // Since both buffers are empty, notify once for each.
+ flushNotifier.notifyFlush();
+ flushNotifier.notifyFlush();
+ wait(1000L);
}
return true;
- } catch (InterruptedException ex) {
+ } catch (InterruptedException ex) {
Thread.currentThread().interrupt();
if (isRunning.get()) {
final String message = "OMDoubleBuffer flush thread " +
@@ -649,4 +672,45 @@ public final class OzoneManagerDoubleBuffer {
public OzoneManagerDoubleBufferMetrics getOzoneManagerDoubleBufferMetrics() {
return ozoneManagerDoubleBufferMetrics;
}
+
+ /**
+ * @return the number of entries currently held in currentBuffer.
+ */
+ @VisibleForTesting
+ int getCurrentBufferSize() {
+ return currentBuffer.size();
+ }
+
+ /**
+ * @return the number of entries currently held in readyBuffer.
+ */
+ @VisibleForTesting
+ int getReadyBufferSize() {
+ return readyBuffer.size();
+ }
+
+ /**
+ * Sets the isRunning flag to true. The loop in flushTransactions() checks
+ * that flag, so a test can call this and then drive flushTransactions()
+ * directly on a thread of its own.
+ */
+ @VisibleForTesting
+ void resume() {
+ isRunning.set(true);
+ }
+
+ /**
+ * Blocks the calling thread until the flush notifier releases it.
+ * Delegates to FlushNotifier.await().
+ *
+ * @throws InterruptedException if the thread is interrupted while waiting.
+ */
+ void awaitFlush() throws InterruptedException {
+ flushNotifier.await();
+ }
+
+ /**
+  * Lets threads block until the double buffer has signalled two flushes.
+  * Each waiter registers its own two-count latch; every notifyFlush() call
+  * counts down all latches registered at that moment.
+  */
+ static class FlushNotifier {
+   private final Set<CountDownLatch> flushLatches =
+       ConcurrentHashMap.newKeySet();
+
+   /**
+    * Blocks until notifyFlush() has been called twice after this waiter
+    * registered.
+    *
+    * @throws InterruptedException if the thread is interrupted while waiting.
+    */
+   void await() throws InterruptedException {
+
+     // Wait until both the current and ready buffers are flushed.
+     CountDownLatch latch = new CountDownLatch(2);
+     flushLatches.add(latch);
+     try {
+       latch.await();
+     } finally {
+       // Always deregister, including on interrupt; otherwise the latch
+       // would stay in the set forever and inflate notifyFlush() counts.
+       flushLatches.remove(latch);
+     }
+   }
+
+   /**
+    * Counts down every registered latch once.
+    *
+    * @return the number of latches registered when the call started.
+    */
+   int notifyFlush() {
+     int retval = flushLatches.size();
+     for (CountDownLatch l : flushLatches) {
+       l.countDown();
+     }
+     return retval;
+   }
+ }
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 12343524c4..d3bf26b772 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -718,4 +718,12 @@ public class OzoneManagerStateMachine extends
BaseStateMachine {
return applyTransactionMap.get(transactionIndex);
}
+ /**
+ * Wait until both buffers are flushed. This is used in cases like
+ * "follower bootstrap tarball creation" where the rocksDb for the active
+ * fs needs to be synchronized with the rocksDbs for the snapshots.
+ *
+ * @throws InterruptedException if the calling thread is interrupted while
+ * waiting in ozoneManagerDoubleBuffer.awaitFlush().
+ */
+ public void awaitDoubleBufferFlush() throws InterruptedException {
+ ozoneManagerDoubleBuffer.awaitFlush();
+ }
}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
index 57dbf47e3e..939f5a210e 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerDoubleBuffer.java
@@ -18,8 +18,14 @@ package org.apache.hadoop.ozone.om.ratis;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.audit.AuditLogger;
@@ -38,16 +44,22 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateS
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
+import static org.junit.Assert.assertThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
/**
@@ -74,6 +86,8 @@ class TestOzoneManagerDoubleBuffer {
mock(OMSnapshotCreateResponse.class);
@TempDir
private File tempDir;
+ private OzoneManagerDoubleBuffer.FlushNotifier flushNotifier;
+ private OzoneManagerDoubleBuffer.FlushNotifier spyFlushNotifier;
@BeforeEach
public void setup() throws IOException {
@@ -93,12 +107,15 @@ class TestOzoneManagerDoubleBuffer {
OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot = index -> {
};
+ flushNotifier = new OzoneManagerDoubleBuffer.FlushNotifier();
+ spyFlushNotifier = spy(flushNotifier);
doubleBuffer = new OzoneManagerDoubleBuffer.Builder()
.setOmMetadataManager(omMetadataManager)
.setOzoneManagerRatisSnapShot(ozoneManagerRatisSnapshot)
.setmaxUnFlushedTransactionCount(1000)
.enableRatis(true)
.setIndexToTerm((i) -> 1L)
+ .setFlushNotifier(spyFlushNotifier)
.build();
doNothing().when(omKeyCreateResponse).checkAndUpdateDB(any(), any());
@@ -215,4 +232,137 @@ class TestOzoneManagerDoubleBuffer {
assertEquals(expectedAvgFlushTransactionsInMetric,
bufferMetrics.getAvgFlushTransactionsInOneIteration(), 0.001);
}
+
+ /**
+ * Checks that awaitFlush() returns once the queued responses have been
+ * flushed and both buffers are empty, and that it also returns when the
+ * buffers are already empty. Every notifyFlush() call is intercepted to
+ * assert that both buffers are empty at notification time.
+ */
+ @Test
+ public void testAwaitFlush()
+ throws ExecutionException, InterruptedException {
+ List<OMClientResponse> omClientResponses =
+ Arrays.asList(omKeyCreateResponse,
+ omBucketCreateResponse);
+ int initialSize = omClientResponses.size();
+ AtomicInteger notifyCounter = new AtomicInteger();
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ int transactionIndex = 0;
+
+ // Stop the daemon to eliminate race conditions.
+ doubleBuffer.stopDaemon();
+
+ // Confirm clear.
+ assertEquals(0, doubleBuffer.getCurrentBufferSize());
+ assertEquals(0, doubleBuffer.getReadyBufferSize());
+
+ // Override notifier to do some assert checks.
+ // The stub counts the call, checks the buffers, then forwards to the
+ // notifier the spy was created from so waiters are still counted down.
+ doAnswer(i -> {
+ notifyCounter.incrementAndGet();
+ assertEquals(0, doubleBuffer.getCurrentBufferSize());
+ assertEquals(0, doubleBuffer.getReadyBufferSize());
+ flushNotifier.notifyFlush();
+ return null;
+ }).when(spyFlushNotifier).notifyFlush();
+
+ // Init double buffer.
+ for (OMClientResponse omClientResponse : omClientResponses) {
+ doubleBuffer.add(omClientResponse, transactionIndex++);
+ }
+ assertEquals(initialSize,
+ doubleBuffer.getCurrentBufferSize());
+
+ // Start double buffer and wait for flush.
+ // The waiter is submitted before the flusher task.
+ Future<?> await = awaitFlush(executorService);
+ Future<Boolean> flusher = flushTransactions(executorService);
+ await.get();
+
+ // Make sure notify was called at least twice.
+ assertTrue(notifyCounter.get() >= 2);
+ // The flusher task is expected to still be inside flushTransactions().
+ assertFalse(flusher.isDone());
+
+ // Confirm still empty.
+ assertEquals(0, doubleBuffer.getCurrentBufferSize());
+ assertEquals(0, doubleBuffer.getReadyBufferSize());
+
+ // Run again to make sure it works when double buffer is empty
+ await = awaitFlush(executorService);
+ await.get();
+
+ // Clean up.
+ // cancel(false) does not interrupt the running flusher thread; it only
+ // marks the future cancelled, so get() throws CancellationException.
+ flusher.cancel(false);
+ assertThrows(java.util.concurrent.CancellationException.class,
+ flusher::get);
+ }
+
+
+ /**
+  * Submits a task that blocks in doubleBuffer.awaitFlush() and yields
+  * true once that call returns.
+  */
+ private Future<Boolean> awaitFlush(ExecutorService executorService) {
+   final java.util.concurrent.Callable<Boolean> waitForFlush = () -> {
+     doubleBuffer.awaitFlush();
+     return Boolean.TRUE;
+   };
+   return executorService.submit(waitForFlush);
+ }
+
+ /**
+  * Submits a task that calls doubleBuffer.resume() and then runs
+  * doubleBuffer.flushTransactions() on the executor thread. The future
+  * yields false if flushTransactions() threw an Exception, true otherwise.
+  */
+ private Future<Boolean> flushTransactions(ExecutorService executorService) {
+   return executorService.submit(() -> {
+     doubleBuffer.resume();
+     boolean completedNormally = true;
+     try {
+       doubleBuffer.flushTransactions();
+     } catch (Exception e) {
+       completedNormally = false;
+     }
+     return completedNormally;
+   });
+ }
+
+ /**
+  * Exercises FlushNotifier on its own: a waiter is released only after two
+  * notifyFlush() calls made while it is registered, and notifyFlush()
+  * returns the number of waiters registered at the time of the call.
+  */
+ @Test
+ public void testFlushNotifier()
+     throws InterruptedException, ExecutionException {
+
+   OzoneManagerDoubleBuffer.FlushNotifier fn =
+       new OzoneManagerDoubleBuffer.FlushNotifier();
+
+   // Confirm nothing waiting yet.
+   assertEquals(0, fn.notifyFlush());
+   ExecutorService executorService = Executors.newCachedThreadPool();
+   try {
+     List<Future<Boolean>> tasks = new ArrayList<>();
+
+     // Simulate 3 waiting.
+     for (int i = 0; i < 3; i++) {
+       tasks.add(waitFN(fn, executorService));
+     }
+     // Give the waiter threads time to register with the notifier.
+     Thread.sleep(2000);
+
+     // Confirm not done.
+     for (Future<Boolean> task : tasks) {
+       assertFalse(task.isDone());
+     }
+     assertEquals(3, fn.notifyFlush());
+
+     // Add a fourth.
+     tasks.add(waitFN(fn, executorService));
+     Thread.sleep(2000);
+     assertEquals(4, fn.notifyFlush());
+
+     // Confirm the initial ones are done,
+     // (it takes 2 calls to notify to release the waiting threads.)
+     for (int i = 0; i < 3; i++) {
+       assertTrue(tasks.get(i).get());
+     }
+     assertFalse(tasks.get(3).isDone());
+
+     // Now finish the fourth.
+     assertEquals(1, fn.notifyFlush());
+     assertTrue(tasks.get(3).get());
+     assertEquals(0, fn.notifyFlush());
+   } finally {
+     // Stop the pool's worker threads even when an assertion fails, so no
+     // waiter stays blocked after the test ends.
+     executorService.shutdownNow();
+   }
+ }
+
+ /**
+  * Submits a task that blocks in fn.await() until it is released.
+  *
+  * @return a future yielding true when the wait completed normally, or
+  *     false when the waiting thread was interrupted.
+  */
+ private Future<Boolean> waitFN(OzoneManagerDoubleBuffer.FlushNotifier fn,
+     ExecutorService executorService) {
+   return executorService.submit(() -> {
+     try {
+       fn.await();
+     } catch (InterruptedException e) {
+       // Keep the interrupt visible and report that the wait did not
+       // complete, instead of silently claiming success.
+       Thread.currentThread().interrupt();
+       return false;
+     }
+     return true;
+   });
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]