hemantk-12 commented on code in PR #4621:
URL: https://github.com/apache/ozone/pull/4621#discussion_r1181942355
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,4 +673,46 @@ private synchronized void swapCurrentAndReadyBuffer() {
public OzoneManagerDoubleBufferMetrics getOzoneManagerDoubleBufferMetrics() {
return ozoneManagerDoubleBufferMetrics;
}
+
+ @VisibleForTesting
+ int getCurrentBufferSize() {
+ return currentBuffer.size();
+ }
+
+ @VisibleForTesting
+ int getReadyBufferSize() {
+ return readyBuffer.size();
+ }
+
+ @VisibleForTesting
+ void resume() {
+ isRunning.set(true);
+ }
+
+ void awaitFlush() throws InterruptedException {
+ flushNotifier.await();
+ }
+
+ static class FlushNotifier {
+ private final ConcurrentHashMap<CountDownLatch, Object> flushLatches =
Review Comment:
I had an assumption that `ConcurrentSet` is there but unfortunately it is
not.
You can still create concurrent set like `ConcurrentHashMap.newKeySet()`.
https://www.baeldung.com/java-concurrent-hashset-concurrenthashmap#thread-safe-hashset-using-concurrenthashmap-factory-method
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -611,7 +630,12 @@ public synchronized CompletableFuture<Void>
add(OMClientResponse response,
private synchronized boolean canFlush() {
try {
while (currentBuffer.size() == 0) {
- wait(Long.MAX_VALUE);
+ wait(1000L);
+ if (currentBuffer.size() == 0) {
+ // Both buffers are empty, so notify twice
+ flushNotifier.notifyFlush();
+ flushNotifier.notifyFlush();
Review Comment:
My bad, you are right it.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,4 +673,46 @@ private synchronized void swapCurrentAndReadyBuffer() {
public OzoneManagerDoubleBufferMetrics getOzoneManagerDoubleBufferMetrics() {
return ozoneManagerDoubleBufferMetrics;
}
+
+ @VisibleForTesting
+ int getCurrentBufferSize() {
+ return currentBuffer.size();
+ }
+
+ @VisibleForTesting
+ int getReadyBufferSize() {
+ return readyBuffer.size();
+ }
+
+ @VisibleForTesting
+ void resume() {
+ isRunning.set(true);
+ }
+
+ void awaitFlush() throws InterruptedException {
+ flushNotifier.await();
+ }
+
+ static class FlushNotifier {
+ private final ConcurrentHashMap<CountDownLatch, Object> flushLatches =
+ new ConcurrentHashMap<>();
+
+ void await() throws InterruptedException {
+
+ // Wait until both the current and ready buffers are flushed.
+ CountDownLatch latch = new CountDownLatch(2);
Review Comment:
Yes, but you are notifying it together in line 636-637. How does it make any
difference to use latch of one or two?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]