hemantk-12 commented on code in PR #4621:
URL: https://github.com/apache/ozone/pull/4621#discussion_r1181846984
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,4 +673,46 @@ private synchronized void swapCurrentAndReadyBuffer() {
public OzoneManagerDoubleBufferMetrics getOzoneManagerDoubleBufferMetrics() {
return ozoneManagerDoubleBufferMetrics;
}
+
+ @VisibleForTesting
+ int getCurrentBufferSize() {
+ return currentBuffer.size();
+ }
+
+ @VisibleForTesting
+ int getReadyBufferSize() {
+ return readyBuffer.size();
+ }
+
+ @VisibleForTesting
+ void resume() {
+ isRunning.set(true);
+ }
+
+ void awaitFlush() throws InterruptedException {
+ flushNotifier.await();
+ }
+
+ static class FlushNotifier {
+ private final ConcurrentHashMap<CountDownLatch, Object> flushLatches =
+ new ConcurrentHashMap<>();
+
+ void await() throws InterruptedException {
+
+ // Wait until both the current and ready buffers are flushed.
+ CountDownLatch latch = new CountDownLatch(2);
+ flushLatches.put(latch, latch);
+ latch.await();
+ flushLatches.remove(latch);
+ }
+
+ int notifyFlush() {
+ Set<CountDownLatch> latches = flushLatches.keySet();
+ int retval = latches.size();
+ for (CountDownLatch l: flushLatches.keySet()) {
Review Comment:
```suggestion
for (CountDownLatch l: latches) {
l.countDown();
}
```
to avoid any update happening in between lines 710-712.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -611,7 +630,12 @@ public synchronized CompletableFuture<Void>
add(OMClientResponse response,
private synchronized boolean canFlush() {
try {
while (currentBuffer.size() == 0) {
- wait(Long.MAX_VALUE);
+ wait(1000L);
+ if (currentBuffer.size() == 0) {
+ // Both buffers are empty, so notify twice
+ flushNotifier.notifyFlush();
+ flushNotifier.notifyFlush();
Review Comment:
Should we return `false` in this scenario?
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,4 +673,46 @@ private synchronized void swapCurrentAndReadyBuffer() {
public OzoneManagerDoubleBufferMetrics getOzoneManagerDoubleBufferMetrics() {
return ozoneManagerDoubleBufferMetrics;
}
+
+ @VisibleForTesting
+ int getCurrentBufferSize() {
+ return currentBuffer.size();
+ }
+
+ @VisibleForTesting
+ int getReadyBufferSize() {
+ return readyBuffer.size();
+ }
+
+ @VisibleForTesting
+ void resume() {
+ isRunning.set(true);
+ }
+
+ void awaitFlush() throws InterruptedException {
+ flushNotifier.await();
+ }
+
+ static class FlushNotifier {
+ private final ConcurrentHashMap<CountDownLatch, Object> flushLatches =
Review Comment:
```suggestion
private final ConcurrentMap<CountDownLatch, Object> flushLatches =
new ConcurrentHashMap<>();
```
Since we are not using the values from the map, can this be changed to a
concurrent set (e.g. one created with `ConcurrentHashMap.newKeySet()`)?
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -611,7 +630,12 @@ public synchronized CompletableFuture<Void>
add(OMClientResponse response,
private synchronized boolean canFlush() {
try {
while (currentBuffer.size() == 0) {
- wait(Long.MAX_VALUE);
+ wait(1000L);
Review Comment:
Was this intentionally changed to 1000L?
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java:
##########
@@ -718,4 +718,7 @@ public long getTermForIndex(long transactionIndex) {
return applyTransactionMap.get(transactionIndex);
}
+ public void awaitDoubleBufferFlush() throws InterruptedException {
Review Comment:
Is it supposed to be used by the deletion service?
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -611,7 +630,12 @@ public synchronized CompletableFuture<Void>
add(OMClientResponse response,
private synchronized boolean canFlush() {
try {
while (currentBuffer.size() == 0) {
- wait(Long.MAX_VALUE);
+ wait(1000L);
+ if (currentBuffer.size() == 0) {
Review Comment:
Is it supposed to be `if (readyBuffer.size() == 0)`?
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -649,4 +673,46 @@ private synchronized void swapCurrentAndReadyBuffer() {
public OzoneManagerDoubleBufferMetrics getOzoneManagerDoubleBufferMetrics() {
return ozoneManagerDoubleBufferMetrics;
}
+
+ @VisibleForTesting
+ int getCurrentBufferSize() {
+ return currentBuffer.size();
+ }
+
+ @VisibleForTesting
+ int getReadyBufferSize() {
+ return readyBuffer.size();
+ }
+
+ @VisibleForTesting
+ void resume() {
+ isRunning.set(true);
+ }
+
+ void awaitFlush() throws InterruptedException {
+ flushNotifier.await();
+ }
+
+ static class FlushNotifier {
+ private final ConcurrentHashMap<CountDownLatch, Object> flushLatches =
+ new ConcurrentHashMap<>();
+
+ void await() throws InterruptedException {
+
+ // Wait until both the current and ready buffers are flushed.
+ CountDownLatch latch = new CountDownLatch(2);
Review Comment:
I don't see the point of using a `CountDownLatch` with a count of 2, since we
notify the latches twice together. Is it just to be semantically correct?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]