This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 6169dc7e9a HDDS-10022. Remove current/readyFutureQueue from OzoneManagerDoubleBuffer. (#5881)
6169dc7e9a is described below

commit 6169dc7e9a2a786ec5bddf857667cd71da89618a
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sat Dec 30 08:41:22 2023 -0800

    HDDS-10022. Remove current/readyFutureQueue from OzoneManagerDoubleBuffer. (#5881)
---
 .../ozone/om/ratis/OzoneManagerDoubleBuffer.java   | 46 +++-------------------
 .../ozone/om/ratis/OzoneManagerStateMachine.java   |  3 +-
 .../hadoop/ozone/om/response/OMClientResponse.java | 24 +++--------
 .../protocolPB/OzoneManagerRequestHandler.java     |  3 +-
 4 files changed, 14 insertions(+), 62 deletions(-)

diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index f20b5dad15..1b4e3cafc4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -104,15 +104,6 @@ public final class OzoneManagerDoubleBuffer {
   private Queue<Entry> currentBuffer;
   private Queue<Entry> readyBuffer;
 
-
-  // future objects which hold the future returned by add method.
-  private volatile Queue<CompletableFuture<Void>> currentFutureQueue;
-
-  // Once we have an entry in current buffer, we swap the currentFutureQueue
-  // with readyFutureQueue. After flush is completed in flushTransaction
-  // daemon thread, we complete the futures in readyFutureQueue and clear them.
-  private volatile Queue<CompletableFuture<Void>> readyFutureQueue;
-
   private final Daemon daemon;
   private final OMMetadataManager omMetadataManager;
   private final AtomicLong flushedTransactionCount = new AtomicLong(0);
@@ -213,10 +204,6 @@ public final class OzoneManagerDoubleBuffer {
     this.readyBuffer = new ConcurrentLinkedQueue<>();
     this.isRatisEnabled = isRatisEnabled;
     this.isTracingEnabled = isTracingEnabled;
-    if (!isRatisEnabled) {
-      this.currentFutureQueue = new ConcurrentLinkedQueue<>();
-      this.readyFutureQueue = new ConcurrentLinkedQueue<>();
-    }
     this.unFlushedTransactions = new Semaphore(maxUnFlushedTransactions);
     this.omMetadataManager = omMetadataManager;
     this.ozoneManagerRatisSnapShot = ozoneManagerRatisSnapShot;
@@ -380,7 +367,10 @@ public final class OzoneManagerDoubleBuffer {
     // Complete futures first and then do other things.
     // So that handler threads will be released.
     if (!isRatisEnabled) {
-      clearReadyFutureQueue(buffer.size());
+      buffer.stream()
+          .map(Entry::getResponse)
+          .map(OMClientResponse::getFlushFuture)
+          .forEach(f -> f.complete(null));
     }
 
     int flushedTransactionsSize = buffer.size();
@@ -495,17 +485,6 @@ public final class OzoneManagerDoubleBuffer {
     }
   }
 
-  /**
-   * Completes futures for first count element form the readyFutureQueue
-   * so that handler thread can be released asap.
-   */
-  private void clearReadyFutureQueue(int count) {
-    while (!readyFutureQueue.isEmpty() && count > 0) {
-      readyFutureQueue.remove().complete(null);
-      count--;
-    }
-  }
-
   private void cleanupCache(Map<String, List<Long>> cleanupEpochs) {
     cleanupEpochs.forEach((tableName, epochs) -> {
       Collections.sort(epochs);
@@ -600,18 +579,12 @@ public final class OzoneManagerDoubleBuffer {
   /**
    * Add OmResponseBufferEntry to buffer.
    */
-  public synchronized CompletableFuture<Void> add(OMClientResponse response, TermIndex termIndex) {
+  public synchronized void add(OMClientResponse response, TermIndex termIndex) {
     currentBuffer.add(new Entry(termIndex, response));
     notify();
 
     if (!isRatisEnabled) {
-      CompletableFuture<Void> future = new CompletableFuture<>();
-      currentFutureQueue.add(future);
-      return future;
-    } else {
-      // In Non-HA case we don't need future to be returned, and this return
-      // status is not used.
-      return null;
+      response.setFlushFuture(new CompletableFuture<>());
     }
   }
 
@@ -654,13 +627,6 @@ public final class OzoneManagerDoubleBuffer {
     final Queue<Entry> temp = currentBuffer;
     currentBuffer = readyBuffer;
     readyBuffer = temp;
-
-    if (!isRatisEnabled) {
-      // Swap future queue.
-      Queue<CompletableFuture<Void>> tempFuture = currentFutureQueue;
-      currentFutureQueue = readyFutureQueue;
-      readyFutureQueue = tempFuture;
-    }
   }
 
   @VisibleForTesting
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index ee01ecedb8..a25261ded9 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -610,8 +610,7 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
     }
     OMResponse omResponse = omResponseBuilder.build();
     OMClientResponse omClientResponse = new DummyOMClientResponse(omResponse);
-    omClientResponse.setFlushFuture(
-        ozoneManagerDoubleBuffer.add(omClientResponse, termIndex));
+    ozoneManagerDoubleBuffer.add(omClientResponse, termIndex);
     return omResponse;
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMClientResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMClientResponse.java
index 2a8af15b6b..6e300f2e03 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMClientResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/OMClientResponse.java
@@ -25,25 +25,20 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.lock.OMLockDetails;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Interface for OM Responses, each OM response should implement this interface.
  */
 public abstract class OMClientResponse {
 
-  private OMResponse omResponse;
+  private final OMResponse omResponse;
+  /** Used only for non-Ratis. */
   private CompletableFuture<Void> flushFuture = null;
   private OMLockDetails omLockDetails;
 
-  private static final Logger LOG =
-      LoggerFactory.getLogger(OMClientResponse.class);
-
   public OMClientResponse(OMResponse omResponse) {
     Preconditions.checkNotNull(omResponse);
     this.omResponse = omResponse;
@@ -57,21 +52,17 @@ public abstract class OMClientResponse {
    * For error case, check that the status of omResponse is not OK.
    */
   public void checkStatusNotOK() {
-    Preconditions.checkArgument(!omResponse.getStatus().equals(
-        OzoneManagerProtocolProtos.Status.OK));
+    Preconditions.checkArgument(!omResponse.getStatus().equals(Status.OK));
   }
 
   /**
    * Check if omResponse status is OK. If yes, add to DB.
    * For OmResponse with failure, this should do nothing. This method is not
    * called in failure scenario in OM code.
-   * @param omMetadataManager
-   * @param batchOperation
-   * @throws IOException
    */
   public void checkAndUpdateDB(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
-    if (omResponse.getStatus() == OzoneManagerProtocolProtos.Status.OK) {
+    if (omResponse.getStatus() == Status.OK) {
       addToDBBatch(omMetadataManager, batchOperation);
     }
   }
@@ -79,9 +70,6 @@ public abstract class OMClientResponse {
   /**
    * Implement logic to add the response to batch. This function should be
    * called from checkAndUpdateDB only.
-   * @param omMetadataManager
-   * @param batchOperation
-   * @throws IOException
    */
   protected abstract void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 818f61c19a..d96637b461 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -404,8 +404,7 @@ public class OzoneManagerRequestHandler implements RequestHandler {
           Preconditions.checkNotNull(omClientResponse,
               "omClientResponse returned by validateAndUpdateCache cannot be null");
           if (omRequest.getCmdType() != Type.Prepare) {
-            omClientResponse.setFlushFuture(
-                ozoneManagerDoubleBuffer.add(omClientResponse, termIndex));
+            ozoneManagerDoubleBuffer.add(omClientResponse, termIndex);
           }
           return omClientResponse;
         });


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to