smengcl commented on code in PR #3958:
URL: https://github.com/apache/ozone/pull/3958#discussion_r1028677858


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -263,138 +255,204 @@ private Void addToBatchTransactionInfoWithTrace(String 
parentName,
    * and commit to DB.
    */
   private void flushTransactions() {
-    while (isRunning.get()) {
-      try {
-        if (canFlush()) {
-          Map<String, List<Long>> cleanupEpochs = new HashMap<>();
-
-          setReadyBuffer();
-          List<Long> flushedEpochs = null;
-          try (BatchOperation batchOperation = omMetadataManager.getStore()
-              .initBatchOperation()) {
-
-            AtomicReference<String> lastTraceId = new AtomicReference<>();
-            readyBuffer.iterator().forEachRemaining((entry) -> {
-              try {
-                OMResponse omResponse = entry.getResponse().getOMResponse();
-                lastTraceId.set(omResponse.getTraceID());
-                addToBatchWithTrace(omResponse,
-                    (SupplierWithIOException<Void>) () -> {
-                      entry.getResponse().checkAndUpdateDB(omMetadataManager,
-                          batchOperation);
-                      return null;
-                    });
-
-                addCleanupEntry(entry, cleanupEpochs);
-
-              } catch (IOException ex) {
-                // During Adding to RocksDB batch entry got an exception.
-                // We should terminate the OM.
-                terminate(ex);
-              }
-            });
+    while (isRunning.get() && canFlush()) {
+      flushCurrentBuffer();
+    }
+  }
 
-            // Commit transaction info to DB.
-            flushedEpochs = readyBuffer.stream().map(
-                DoubleBufferEntry::getTrxLogIndex)
-                .sorted().collect(Collectors.toList());
-            long lastRatisTransactionIndex = flushedEpochs.get(
-                flushedEpochs.size() - 1);
-            long term = isRatisEnabled ?
-                indexToTerm.apply(lastRatisTransactionIndex) : -1;
-
-            addToBatchTransactionInfoWithTrace(lastTraceId.get(),
-                lastRatisTransactionIndex,
-                (SupplierWithIOException<Void>) () -> {
-                  omMetadataManager.getTransactionInfoTable().putWithBatch(
-                      batchOperation, TRANSACTION_INFO_KEY,
-                      new TransactionInfo.Builder()
-                          .setTransactionIndex(lastRatisTransactionIndex)
-                          .setCurrentTerm(term).build());
-                  return null;
-                });
-
-            long startTime = Time.monotonicNow();
-            flushBatchWithTrace(lastTraceId.get(), readyBuffer.size(),
-                () -> {
-                  omMetadataManager.getStore().commitBatchOperation(
-                      batchOperation);
-                  return null;
-                });
-            ozoneManagerDoubleBufferMetrics.updateFlushTime(
-                Time.monotonicNow() - startTime);
-          }
-
-          // Complete futures first and then do other things. So, that
-          // handler threads will be released.
-          if (!isRatisEnabled) {
-            // Once all entries are flushed, we can complete their future.
-            readyFutureQueue.iterator().forEachRemaining((entry) -> {
-              entry.complete(null);
-            });
+  /**
+   * This is to extract out the flushing logic to make it testable.
+   * If we don't do that, there could be a race condition which could fail
+   * the unit test on different machines.
+   */
+  @VisibleForTesting
+  void flushCurrentBuffer() {
+    try {
+      swapCurrentAndReadyBuffer();
+
+      // For snapshot, we want to include all the keys that were committed
+      // before the snapshot `create` command was executed. To achieve
+      // the behaviour, we spilt the request buffer at snapshot create
+      // request and flush the buffer in batches split at snapshot create
+      // request.
+      // For example, if requestBuffer is [request1, request2,
+      // snapshotCreateRequest1, request3, snapshotCreateRequest2, request4].
+      //
+      // Split requestBuffer would be.
+      // bufferQueues = [[request1, request2], [snapshotRequest1], [request3],
+      //     [snapshotRequest2], [request4]].
+      // And bufferQueues will be flushed in following order:
+      // Flush #1: [request1, request2]
+      // Flush #2: [snapshotRequest1]
+      // Flush #3: [request3]
+      // Flush #4: [snapshotRequest2]
+      // Flush #5: [request4]
+      List<Queue<DoubleBufferEntry<OMClientResponse>>> bufferQueues =
+          splitReadyBufferAtCreateSnapshot();
+
+      for (Queue<DoubleBufferEntry<OMClientResponse>> buffer : bufferQueues) {
+        flushBatch(buffer);
+      }
+
+      clearReadyBuffer();
+    } catch (IOException ex) {
+      terminate(ex);
+    } catch (Throwable t) {
+      final String s = "OMDoubleBuffer flush thread " +
+          Thread.currentThread().getName() + " encountered Throwable error";
+      ExitUtils.terminate(2, s, t, LOG);
+    }
+  }
+  private void flushBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer)
+      throws IOException {
+
+    Map<String, List<Long>> cleanupEpochs = new HashMap<>();
+    List<Long> flushedEpochs;
+
+    try (BatchOperation batchOperation = omMetadataManager.getStore()
+        .initBatchOperation()) {
+
+      String lastTraceId = addToBatch(buffer, batchOperation);
+
+      buffer.iterator().forEachRemaining(
+          entry -> addCleanupEntry(entry, cleanupEpochs));
+
+      // Commit transaction info to DB.
+      flushedEpochs = buffer.stream()
+          .map(DoubleBufferEntry::getTrxLogIndex)
+          .sorted()
+          .collect(Collectors.toList());
+
+      long lastRatisTransactionIndex = flushedEpochs.get(
+          flushedEpochs.size() - 1);
+
+      long term = isRatisEnabled ?
+          indexToTerm.apply(lastRatisTransactionIndex) : -1;
+
+      addToBatchTransactionInfoWithTrace(lastTraceId,
+          lastRatisTransactionIndex,
+          () -> {
+            omMetadataManager.getTransactionInfoTable().putWithBatch(
+                batchOperation, TRANSACTION_INFO_KEY,
+                new TransactionInfo.Builder()
+                    .setTransactionIndex(lastRatisTransactionIndex)
+                    .setCurrentTerm(term)
+                    .build());
+            return null;
+          });
+
+      long startTime = Time.monotonicNow();
+      flushBatchWithTrace(lastTraceId, buffer.size(),
+          () -> {
+            omMetadataManager.getStore()
+                .commitBatchOperation(batchOperation);
+            return null;
+          });
+
+      ozoneManagerDoubleBufferMetrics.updateFlushTime(
+          Time.monotonicNow() - startTime);
+    }
 
-            readyFutureQueue.clear();
-          }
-
-          int flushedTransactionsSize = readyBuffer.size();
-          flushedTransactionCount.addAndGet(flushedTransactionsSize);
-          flushIterations.incrementAndGet();
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Sync Iteration {} flushed transactions in this " +
-                    "iteration {}", flushIterations.get(),
-                flushedTransactionsSize);
-          }
-
-          // When non-HA do the sort step here, as the sorted list is not
-          // required for flush to DB. As in non-HA we want to complete
-          // futures as quick as possible after flush to DB, to release rpc
-          // handler threads.
-          if (!isRatisEnabled) {
-            flushedEpochs =
-                readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
-                    .sorted().collect(Collectors.toList());
-          }
-
-
-          // Clean up committed transactions.
-
-          cleanupCache(cleanupEpochs);
-
-          readyBuffer.clear();
-
-          if (isRatisEnabled) {
-            releaseUnFlushedTransactions(flushedTransactionsSize);
-          }
-
-          // update the last updated index in OzoneManagerStateMachine.
-          ozoneManagerRatisSnapShot.updateLastAppliedIndex(
-              flushedEpochs);
-
-          // set metrics.
-          updateMetrics(flushedTransactionsSize);
-        }
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        if (isRunning.get()) {
-          final String message = "OMDoubleBuffer flush thread " +
-              Thread.currentThread().getName() + " encountered Interrupted " +
-              "exception while running";
-          ExitUtils.terminate(1, message, ex, LOG);
-        } else {
-          LOG.info("OMDoubleBuffer flush thread {} is interrupted and will "
-              + "exit.", Thread.currentThread().getName());
-        }
+    // Complete futures first and then do other things.
+    // So that handler threads will be released.
+    if (!isRatisEnabled) {
+      clearReadyFutureQueue(buffer.size());
+    }
+
+    int flushedTransactionsSize = buffer.size();
+    flushedTransactionCount.addAndGet(flushedTransactionsSize);
+    flushIterations.incrementAndGet();
+
+    LOG.debug("Sync iteration {} flushed transactions in this iteration {}",
+        flushIterations.get(),
+        flushedTransactionsSize);
+
+    // Clean up committed transactions.
+    cleanupCache(cleanupEpochs);
+
+    if (isRatisEnabled) {
+      releaseUnFlushedTransactions(flushedTransactionsSize);
+    }
+    // update the last updated index in OzoneManagerStateMachine.
+    ozoneManagerRatisSnapShot.updateLastAppliedIndex(flushedEpochs);
+
+    // set metrics.
+    updateMetrics(flushedTransactionsSize);
+  }
+
+  private String addToBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer,
+                            BatchOperation batchOperation) {
+    String lastTraceId = null;
+
+    Iterator<DoubleBufferEntry<OMClientResponse>> iterator = buffer.iterator();
+    while (iterator.hasNext()) {
+      DoubleBufferEntry<OMClientResponse> entry = iterator.next();
+      OMClientResponse response = entry.getResponse();
+      OMResponse omResponse = response.getOMResponse();
+      lastTraceId = omResponse.getTraceID();
+
+      try {
+        addToBatchWithTrace(omResponse,
+            () -> {
+              response.checkAndUpdateDB(omMetadataManager, batchOperation);
+              return null;
+            });
       } catch (IOException ex) {
+        // During Adding to RocksDB batch entry got an exception.
+        // We should terminate the OM.
         terminate(ex);
-      } catch (Throwable t) {
-        final String s = "OMDoubleBuffer flush thread " +
-            Thread.currentThread().getName() + " encountered Throwable error";
-        ExitUtils.terminate(2, s, t, LOG);
       }
     }
+
+    return lastTraceId;
   }
 
+  /**
+   * Splits the readyBuffer around the create snapshot request.
+   * Returns, the list of queue split by create snapshot requests.
+   *
+   * CreateSnapshot is used as barrier because the checkpoint creation happens
+   * in RocksDB callback flush. If multiple operations are flushed in one
+   * specific batch, we are not sure at the flush of which specific operation
+   * the callback is coming.
+   * There could be a possibility of race condition that is exposed to rocksDB
+   * behaviour for the batch.
+   * Hence, we treat createSnapshot as separate batch flush.
+   *
+   * e.g. requestBuffer = [request1, request2, snapshotRequest1,
+   * request3, snapshotRequest2, request4]
+   * response = [[request1, request2], [snapshotRequest1], [request3],
+   * [snapshotRequest2], [request4]]
+   */
+  private List<Queue<DoubleBufferEntry<OMClientResponse>>>
+      splitReadyBufferAtCreateSnapshot() {
+    List<Queue<DoubleBufferEntry<OMClientResponse>>> response =
+        new ArrayList<>();
+
+    Iterator<DoubleBufferEntry<OMClientResponse>> iterator =
+        readyBuffer.iterator();
+
+    OMResponse previousOmResponse = null;
+    while (iterator.hasNext()) {
+      DoubleBufferEntry<OMClientResponse> entry = iterator.next();
+      OMResponse omResponse = entry.getResponse().getOMResponse();
+      // New queue gets created in three conditions:
+      // 1. It is first element in the response,
+      // 2. Current request is createSnapshot request.
+      // 3. Previous request was createSnapshot request.
+      if (response.isEmpty() ||
+          omResponse.getCreateSnapshotResponse() != null ||
+          (previousOmResponse != null &&
+              previousOmResponse.getCreateSnapshotResponse() != null)) {

Review Comment:
   The logic sounds good to me. But IntelliJ somehow gives a scary "condition 
always true" warning and suggests removing the condition. I think this is a 
false alarm. Would you double-check as well?
   
   <img width="926" alt="image" 
src="https://user-images.githubusercontent.com/50227127/203196012-014ae702-a40b-4e69-9a0f-0a7b30d19860.png";>



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -263,138 +255,204 @@ private Void addToBatchTransactionInfoWithTrace(String 
parentName,
    * and commit to DB.
    */
   private void flushTransactions() {
-    while (isRunning.get()) {
-      try {
-        if (canFlush()) {
-          Map<String, List<Long>> cleanupEpochs = new HashMap<>();
-
-          setReadyBuffer();
-          List<Long> flushedEpochs = null;
-          try (BatchOperation batchOperation = omMetadataManager.getStore()
-              .initBatchOperation()) {
-
-            AtomicReference<String> lastTraceId = new AtomicReference<>();
-            readyBuffer.iterator().forEachRemaining((entry) -> {
-              try {
-                OMResponse omResponse = entry.getResponse().getOMResponse();
-                lastTraceId.set(omResponse.getTraceID());
-                addToBatchWithTrace(omResponse,
-                    (SupplierWithIOException<Void>) () -> {
-                      entry.getResponse().checkAndUpdateDB(omMetadataManager,
-                          batchOperation);
-                      return null;
-                    });
-
-                addCleanupEntry(entry, cleanupEpochs);
-
-              } catch (IOException ex) {
-                // During Adding to RocksDB batch entry got an exception.
-                // We should terminate the OM.
-                terminate(ex);
-              }
-            });
+    while (isRunning.get() && canFlush()) {
+      flushCurrentBuffer();
+    }
+  }
 
-            // Commit transaction info to DB.
-            flushedEpochs = readyBuffer.stream().map(
-                DoubleBufferEntry::getTrxLogIndex)
-                .sorted().collect(Collectors.toList());
-            long lastRatisTransactionIndex = flushedEpochs.get(
-                flushedEpochs.size() - 1);
-            long term = isRatisEnabled ?
-                indexToTerm.apply(lastRatisTransactionIndex) : -1;
-
-            addToBatchTransactionInfoWithTrace(lastTraceId.get(),
-                lastRatisTransactionIndex,
-                (SupplierWithIOException<Void>) () -> {
-                  omMetadataManager.getTransactionInfoTable().putWithBatch(
-                      batchOperation, TRANSACTION_INFO_KEY,
-                      new TransactionInfo.Builder()
-                          .setTransactionIndex(lastRatisTransactionIndex)
-                          .setCurrentTerm(term).build());
-                  return null;
-                });
-
-            long startTime = Time.monotonicNow();
-            flushBatchWithTrace(lastTraceId.get(), readyBuffer.size(),
-                () -> {
-                  omMetadataManager.getStore().commitBatchOperation(
-                      batchOperation);
-                  return null;
-                });
-            ozoneManagerDoubleBufferMetrics.updateFlushTime(
-                Time.monotonicNow() - startTime);
-          }
-
-          // Complete futures first and then do other things. So, that
-          // handler threads will be released.
-          if (!isRatisEnabled) {
-            // Once all entries are flushed, we can complete their future.
-            readyFutureQueue.iterator().forEachRemaining((entry) -> {
-              entry.complete(null);
-            });
+  /**
+   * This is to extract out the flushing logic to make it testable.
+   * If we don't do that, there could be a race condition which could fail
+   * the unit test on different machines.
+   */
+  @VisibleForTesting
+  void flushCurrentBuffer() {
+    try {
+      swapCurrentAndReadyBuffer();
+
+      // For snapshot, we want to include all the keys that were committed
+      // before the snapshot `create` command was executed. To achieve
+      // the behaviour, we spilt the request buffer at snapshot create
+      // request and flush the buffer in batches split at snapshot create
+      // request.
+      // For example, if requestBuffer is [request1, request2,
+      // snapshotCreateRequest1, request3, snapshotCreateRequest2, request4].
+      //
+      // Split requestBuffer would be.
+      // bufferQueues = [[request1, request2], [snapshotRequest1], [request3],
+      //     [snapshotRequest2], [request4]].
+      // And bufferQueues will be flushed in following order:
+      // Flush #1: [request1, request2]
+      // Flush #2: [snapshotRequest1]
+      // Flush #3: [request3]
+      // Flush #4: [snapshotRequest2]
+      // Flush #5: [request4]
+      List<Queue<DoubleBufferEntry<OMClientResponse>>> bufferQueues =
+          splitReadyBufferAtCreateSnapshot();
+
+      for (Queue<DoubleBufferEntry<OMClientResponse>> buffer : bufferQueues) {
+        flushBatch(buffer);
+      }
+
+      clearReadyBuffer();
+    } catch (IOException ex) {
+      terminate(ex);
+    } catch (Throwable t) {
+      final String s = "OMDoubleBuffer flush thread " +
+          Thread.currentThread().getName() + " encountered Throwable error";
+      ExitUtils.terminate(2, s, t, LOG);
+    }
+  }
+  private void flushBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer)

Review Comment:
   nit: missing newline
   ```suggestion
     }
   
     private void flushBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer)
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -417,14 +475,28 @@ private void addCleanupEntry(DoubleBufferEntry entry, 
Map<String,
             .add(entry.getTrxLogIndex());
       }
     } else {
-      // This is to catch early errors, when an new response class missed to
+      // This is to catch early errors, when a new response class missed to
       // add CleanupTableInfo annotation.
       throw new RuntimeException("CleanupTableInfo Annotation is missing " +
           "for" + responseClass);
     }
   }
 
+  /**
+   * Completes futures for first count element form the readyFutureQueue
+   * so that handler thread can be released asap.
+   */
+  private void clearReadyFutureQueue(int count) {
+    if (isRatisEnabled) {
+      return;
+    }
 
+    Iterator<CompletableFuture<Void>> iterator = readyFutureQueue.iterator();
+    while (iterator.hasNext() && count > 0) {
+      iterator.next().complete(null);
+      count--;
+    }
+  }

Review Comment:
   Is this method missing the call to clear the queue (`readyFutureQueue.clear()`) here?
   
   Current `master` branch code for reference, line 336:
   
   
https://github.com/apache/ozone/blob/c55c2dc4745c45404f82f30a55dc52911da693d1/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java#L332-L336



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -417,14 +475,28 @@ private void addCleanupEntry(DoubleBufferEntry entry, 
Map<String,
             .add(entry.getTrxLogIndex());
       }
     } else {
-      // This is to catch early errors, when an new response class missed to
+      // This is to catch early errors, when a new response class missed to
       // add CleanupTableInfo annotation.
       throw new RuntimeException("CleanupTableInfo Annotation is missing " +
           "for" + responseClass);
     }
   }
 
+  /**
+   * Completes futures for first count element form the readyFutureQueue
+   * so that handler thread can be released asap.
+   */
+  private void clearReadyFutureQueue(int count) {
+    if (isRatisEnabled) {
+      return;
+    }

Review Comment:
   Looks like the only caller is already checking `isRatisEnabled`:
   
   
https://github.com/apache/ozone/pull/3958/files#diff-15eb45eac3d9d3b7f34caf80bb6eb63be58f004e936626a4c3771ad0fc5f7f4eR356-R360
   
   Is there a reason we want to check `isRatisEnabled` again here?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -263,138 +255,204 @@ private Void addToBatchTransactionInfoWithTrace(String 
parentName,
    * and commit to DB.
    */
   private void flushTransactions() {
-    while (isRunning.get()) {
-      try {
-        if (canFlush()) {
-          Map<String, List<Long>> cleanupEpochs = new HashMap<>();
-
-          setReadyBuffer();
-          List<Long> flushedEpochs = null;
-          try (BatchOperation batchOperation = omMetadataManager.getStore()
-              .initBatchOperation()) {
-
-            AtomicReference<String> lastTraceId = new AtomicReference<>();
-            readyBuffer.iterator().forEachRemaining((entry) -> {
-              try {
-                OMResponse omResponse = entry.getResponse().getOMResponse();
-                lastTraceId.set(omResponse.getTraceID());
-                addToBatchWithTrace(omResponse,
-                    (SupplierWithIOException<Void>) () -> {
-                      entry.getResponse().checkAndUpdateDB(omMetadataManager,
-                          batchOperation);
-                      return null;
-                    });
-
-                addCleanupEntry(entry, cleanupEpochs);
-
-              } catch (IOException ex) {
-                // During Adding to RocksDB batch entry got an exception.
-                // We should terminate the OM.
-                terminate(ex);
-              }
-            });
+    while (isRunning.get() && canFlush()) {
+      flushCurrentBuffer();
+    }
+  }
 
-            // Commit transaction info to DB.
-            flushedEpochs = readyBuffer.stream().map(
-                DoubleBufferEntry::getTrxLogIndex)
-                .sorted().collect(Collectors.toList());
-            long lastRatisTransactionIndex = flushedEpochs.get(
-                flushedEpochs.size() - 1);
-            long term = isRatisEnabled ?
-                indexToTerm.apply(lastRatisTransactionIndex) : -1;
-
-            addToBatchTransactionInfoWithTrace(lastTraceId.get(),
-                lastRatisTransactionIndex,
-                (SupplierWithIOException<Void>) () -> {
-                  omMetadataManager.getTransactionInfoTable().putWithBatch(
-                      batchOperation, TRANSACTION_INFO_KEY,
-                      new TransactionInfo.Builder()
-                          .setTransactionIndex(lastRatisTransactionIndex)
-                          .setCurrentTerm(term).build());
-                  return null;
-                });
-
-            long startTime = Time.monotonicNow();
-            flushBatchWithTrace(lastTraceId.get(), readyBuffer.size(),
-                () -> {
-                  omMetadataManager.getStore().commitBatchOperation(
-                      batchOperation);
-                  return null;
-                });
-            ozoneManagerDoubleBufferMetrics.updateFlushTime(
-                Time.monotonicNow() - startTime);
-          }
-
-          // Complete futures first and then do other things. So, that
-          // handler threads will be released.
-          if (!isRatisEnabled) {
-            // Once all entries are flushed, we can complete their future.
-            readyFutureQueue.iterator().forEachRemaining((entry) -> {
-              entry.complete(null);
-            });
+  /**
+   * This is to extract out the flushing logic to make it testable.
+   * If we don't do that, there could be a race condition which could fail
+   * the unit test on different machines.
+   */
+  @VisibleForTesting
+  void flushCurrentBuffer() {
+    try {
+      swapCurrentAndReadyBuffer();
+
+      // For snapshot, we want to include all the keys that were committed
+      // before the snapshot `create` command was executed. To achieve
+      // the behaviour, we spilt the request buffer at snapshot create
+      // request and flush the buffer in batches split at snapshot create
+      // request.
+      // For example, if requestBuffer is [request1, request2,
+      // snapshotCreateRequest1, request3, snapshotCreateRequest2, request4].
+      //
+      // Split requestBuffer would be.
+      // bufferQueues = [[request1, request2], [snapshotRequest1], [request3],
+      //     [snapshotRequest2], [request4]].
+      // And bufferQueues will be flushed in following order:
+      // Flush #1: [request1, request2]
+      // Flush #2: [snapshotRequest1]
+      // Flush #3: [request3]
+      // Flush #4: [snapshotRequest2]
+      // Flush #5: [request4]
+      List<Queue<DoubleBufferEntry<OMClientResponse>>> bufferQueues =
+          splitReadyBufferAtCreateSnapshot();
+
+      for (Queue<DoubleBufferEntry<OMClientResponse>> buffer : bufferQueues) {
+        flushBatch(buffer);
+      }
+
+      clearReadyBuffer();
+    } catch (IOException ex) {
+      terminate(ex);
+    } catch (Throwable t) {
+      final String s = "OMDoubleBuffer flush thread " +
+          Thread.currentThread().getName() + " encountered Throwable error";
+      ExitUtils.terminate(2, s, t, LOG);
+    }
+  }
+  private void flushBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer)
+      throws IOException {
+
+    Map<String, List<Long>> cleanupEpochs = new HashMap<>();
+    List<Long> flushedEpochs;
+
+    try (BatchOperation batchOperation = omMetadataManager.getStore()
+        .initBatchOperation()) {
+
+      String lastTraceId = addToBatch(buffer, batchOperation);
+
+      buffer.iterator().forEachRemaining(
+          entry -> addCleanupEntry(entry, cleanupEpochs));
+
+      // Commit transaction info to DB.
+      flushedEpochs = buffer.stream()
+          .map(DoubleBufferEntry::getTrxLogIndex)
+          .sorted()
+          .collect(Collectors.toList());
+
+      long lastRatisTransactionIndex = flushedEpochs.get(
+          flushedEpochs.size() - 1);
+
+      long term = isRatisEnabled ?
+          indexToTerm.apply(lastRatisTransactionIndex) : -1;
+
+      addToBatchTransactionInfoWithTrace(lastTraceId,
+          lastRatisTransactionIndex,
+          () -> {
+            omMetadataManager.getTransactionInfoTable().putWithBatch(
+                batchOperation, TRANSACTION_INFO_KEY,
+                new TransactionInfo.Builder()
+                    .setTransactionIndex(lastRatisTransactionIndex)
+                    .setCurrentTerm(term)
+                    .build());
+            return null;
+          });
+
+      long startTime = Time.monotonicNow();
+      flushBatchWithTrace(lastTraceId, buffer.size(),
+          () -> {
+            omMetadataManager.getStore()
+                .commitBatchOperation(batchOperation);
+            return null;
+          });
+
+      ozoneManagerDoubleBufferMetrics.updateFlushTime(
+          Time.monotonicNow() - startTime);
+    }
 
-            readyFutureQueue.clear();
-          }
-
-          int flushedTransactionsSize = readyBuffer.size();
-          flushedTransactionCount.addAndGet(flushedTransactionsSize);
-          flushIterations.incrementAndGet();
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Sync Iteration {} flushed transactions in this " +
-                    "iteration {}", flushIterations.get(),
-                flushedTransactionsSize);
-          }
-
-          // When non-HA do the sort step here, as the sorted list is not
-          // required for flush to DB. As in non-HA we want to complete
-          // futures as quick as possible after flush to DB, to release rpc
-          // handler threads.
-          if (!isRatisEnabled) {
-            flushedEpochs =
-                readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
-                    .sorted().collect(Collectors.toList());
-          }
-
-
-          // Clean up committed transactions.
-
-          cleanupCache(cleanupEpochs);
-
-          readyBuffer.clear();
-
-          if (isRatisEnabled) {
-            releaseUnFlushedTransactions(flushedTransactionsSize);
-          }
-
-          // update the last updated index in OzoneManagerStateMachine.
-          ozoneManagerRatisSnapShot.updateLastAppliedIndex(
-              flushedEpochs);
-
-          // set metrics.
-          updateMetrics(flushedTransactionsSize);
-        }
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        if (isRunning.get()) {
-          final String message = "OMDoubleBuffer flush thread " +
-              Thread.currentThread().getName() + " encountered Interrupted " +
-              "exception while running";
-          ExitUtils.terminate(1, message, ex, LOG);
-        } else {
-          LOG.info("OMDoubleBuffer flush thread {} is interrupted and will "
-              + "exit.", Thread.currentThread().getName());
-        }
+    // Complete futures first and then do other things.
+    // So that handler threads will be released.
+    if (!isRatisEnabled) {
+      clearReadyFutureQueue(buffer.size());
+    }
+
+    int flushedTransactionsSize = buffer.size();
+    flushedTransactionCount.addAndGet(flushedTransactionsSize);
+    flushIterations.incrementAndGet();
+
+    LOG.debug("Sync iteration {} flushed transactions in this iteration {}",
+        flushIterations.get(),
+        flushedTransactionsSize);
+
+    // Clean up committed transactions.
+    cleanupCache(cleanupEpochs);
+
+    if (isRatisEnabled) {
+      releaseUnFlushedTransactions(flushedTransactionsSize);
+    }
+    // update the last updated index in OzoneManagerStateMachine.
+    ozoneManagerRatisSnapShot.updateLastAppliedIndex(flushedEpochs);
+
+    // set metrics.
+    updateMetrics(flushedTransactionsSize);
+  }
+
+  private String addToBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer,
+                            BatchOperation batchOperation) {
+    String lastTraceId = null;

Review Comment:
   ~~Current code uses `AtomicReference<String>` for some reason?~~
   
   Found the original comment. Looks like we can safely use non-atomic here: 
https://github.com/apache/ozone/pull/691/files#r412625536



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -263,138 +255,204 @@ private Void addToBatchTransactionInfoWithTrace(String 
parentName,
    * and commit to DB.
    */
   private void flushTransactions() {
-    while (isRunning.get()) {
-      try {
-        if (canFlush()) {
-          Map<String, List<Long>> cleanupEpochs = new HashMap<>();
-
-          setReadyBuffer();
-          List<Long> flushedEpochs = null;
-          try (BatchOperation batchOperation = omMetadataManager.getStore()
-              .initBatchOperation()) {
-
-            AtomicReference<String> lastTraceId = new AtomicReference<>();
-            readyBuffer.iterator().forEachRemaining((entry) -> {
-              try {
-                OMResponse omResponse = entry.getResponse().getOMResponse();
-                lastTraceId.set(omResponse.getTraceID());
-                addToBatchWithTrace(omResponse,
-                    (SupplierWithIOException<Void>) () -> {
-                      entry.getResponse().checkAndUpdateDB(omMetadataManager,
-                          batchOperation);
-                      return null;
-                    });
-
-                addCleanupEntry(entry, cleanupEpochs);
-
-              } catch (IOException ex) {
-                // During Adding to RocksDB batch entry got an exception.
-                // We should terminate the OM.
-                terminate(ex);
-              }
-            });
+    while (isRunning.get() && canFlush()) {
+      flushCurrentBuffer();
+    }
+  }
 
-            // Commit transaction info to DB.
-            flushedEpochs = readyBuffer.stream().map(
-                DoubleBufferEntry::getTrxLogIndex)
-                .sorted().collect(Collectors.toList());
-            long lastRatisTransactionIndex = flushedEpochs.get(
-                flushedEpochs.size() - 1);
-            long term = isRatisEnabled ?
-                indexToTerm.apply(lastRatisTransactionIndex) : -1;
-
-            addToBatchTransactionInfoWithTrace(lastTraceId.get(),
-                lastRatisTransactionIndex,
-                (SupplierWithIOException<Void>) () -> {
-                  omMetadataManager.getTransactionInfoTable().putWithBatch(
-                      batchOperation, TRANSACTION_INFO_KEY,
-                      new TransactionInfo.Builder()
-                          .setTransactionIndex(lastRatisTransactionIndex)
-                          .setCurrentTerm(term).build());
-                  return null;
-                });
-
-            long startTime = Time.monotonicNow();
-            flushBatchWithTrace(lastTraceId.get(), readyBuffer.size(),
-                () -> {
-                  omMetadataManager.getStore().commitBatchOperation(
-                      batchOperation);
-                  return null;
-                });
-            ozoneManagerDoubleBufferMetrics.updateFlushTime(
-                Time.monotonicNow() - startTime);
-          }
-
-          // Complete futures first and then do other things. So, that
-          // handler threads will be released.
-          if (!isRatisEnabled) {
-            // Once all entries are flushed, we can complete their future.
-            readyFutureQueue.iterator().forEachRemaining((entry) -> {
-              entry.complete(null);
-            });
+  /**
+   * This is to extract out the flushing logic to make it testable.
+   * If we don't do that, there could be a race condition which could fail
+   * the unit test on different machines.
+   */
+  @VisibleForTesting
+  void flushCurrentBuffer() {
+    try {
+      swapCurrentAndReadyBuffer();
+
+      // For snapshot, we want to include all the keys that were committed
+      // before the snapshot `create` command was executed. To achieve
+      // the behaviour, we split the request buffer at snapshot create
+      // request and flush the buffer in batches split at snapshot create
+      // request.
+      // For example, if requestBuffer is [request1, request2,
+      // snapshotCreateRequest1, request3, snapshotCreateRequest2, request4].
+      //
+      // Split requestBuffer would be.
+      // bufferQueues = [[request1, request2], [snapshotRequest1], [request3],
+      //     [snapshotRequest2], [request4]].
+      // And bufferQueues will be flushed in following order:
+      // Flush #1: [request1, request2]
+      // Flush #2: [snapshotRequest1]
+      // Flush #3: [request3]
+      // Flush #4: [snapshotRequest2]
+      // Flush #5: [request4]
+      List<Queue<DoubleBufferEntry<OMClientResponse>>> bufferQueues =
+          splitReadyBufferAtCreateSnapshot();
+
+      for (Queue<DoubleBufferEntry<OMClientResponse>> buffer : bufferQueues) {
+        flushBatch(buffer);
+      }
+
+      clearReadyBuffer();
+    } catch (IOException ex) {
+      terminate(ex);
+    } catch (Throwable t) {
+      final String s = "OMDoubleBuffer flush thread " +
+          Thread.currentThread().getName() + " encountered Throwable error";
+      ExitUtils.terminate(2, s, t, LOG);
+    }
+  }
+  private void flushBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer)
+      throws IOException {
+
+    Map<String, List<Long>> cleanupEpochs = new HashMap<>();
+    List<Long> flushedEpochs;
+
+    try (BatchOperation batchOperation = omMetadataManager.getStore()
+        .initBatchOperation()) {
+
+      String lastTraceId = addToBatch(buffer, batchOperation);
+
+      buffer.iterator().forEachRemaining(
+          entry -> addCleanupEntry(entry, cleanupEpochs));
+
+      // Commit transaction info to DB.
+      flushedEpochs = buffer.stream()
+          .map(DoubleBufferEntry::getTrxLogIndex)
+          .sorted()
+          .collect(Collectors.toList());
+
+      long lastRatisTransactionIndex = flushedEpochs.get(
+          flushedEpochs.size() - 1);
+
+      long term = isRatisEnabled ?
+          indexToTerm.apply(lastRatisTransactionIndex) : -1;
+
+      addToBatchTransactionInfoWithTrace(lastTraceId,
+          lastRatisTransactionIndex,
+          () -> {
+            omMetadataManager.getTransactionInfoTable().putWithBatch(
+                batchOperation, TRANSACTION_INFO_KEY,
+                new TransactionInfo.Builder()
+                    .setTransactionIndex(lastRatisTransactionIndex)
+                    .setCurrentTerm(term)
+                    .build());
+            return null;
+          });
+
+      long startTime = Time.monotonicNow();
+      flushBatchWithTrace(lastTraceId, buffer.size(),
+          () -> {
+            omMetadataManager.getStore()
+                .commitBatchOperation(batchOperation);
+            return null;
+          });
+
+      ozoneManagerDoubleBufferMetrics.updateFlushTime(
+          Time.monotonicNow() - startTime);
+    }
 
-            readyFutureQueue.clear();
-          }
-
-          int flushedTransactionsSize = readyBuffer.size();
-          flushedTransactionCount.addAndGet(flushedTransactionsSize);
-          flushIterations.incrementAndGet();
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Sync Iteration {} flushed transactions in this " +
-                    "iteration {}", flushIterations.get(),
-                flushedTransactionsSize);
-          }
-
-          // When non-HA do the sort step here, as the sorted list is not
-          // required for flush to DB. As in non-HA we want to complete
-          // futures as quick as possible after flush to DB, to release rpc
-          // handler threads.
-          if (!isRatisEnabled) {
-            flushedEpochs =
-                readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
-                    .sorted().collect(Collectors.toList());
-          }
-
-
-          // Clean up committed transactions.
-
-          cleanupCache(cleanupEpochs);
-
-          readyBuffer.clear();
-
-          if (isRatisEnabled) {
-            releaseUnFlushedTransactions(flushedTransactionsSize);
-          }
-
-          // update the last updated index in OzoneManagerStateMachine.
-          ozoneManagerRatisSnapShot.updateLastAppliedIndex(
-              flushedEpochs);
-
-          // set metrics.
-          updateMetrics(flushedTransactionsSize);
-        }
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        if (isRunning.get()) {
-          final String message = "OMDoubleBuffer flush thread " +
-              Thread.currentThread().getName() + " encountered Interrupted " +
-              "exception while running";
-          ExitUtils.terminate(1, message, ex, LOG);
-        } else {
-          LOG.info("OMDoubleBuffer flush thread {} is interrupted and will "
-              + "exit.", Thread.currentThread().getName());
-        }
+    // Complete futures first and then do other things.
+    // So that handler threads will be released.
+    if (!isRatisEnabled) {
+      clearReadyFutureQueue(buffer.size());
+    }
+
+    int flushedTransactionsSize = buffer.size();
+    flushedTransactionCount.addAndGet(flushedTransactionsSize);
+    flushIterations.incrementAndGet();
+
+    LOG.debug("Sync iteration {} flushed transactions in this iteration {}",
+        flushIterations.get(),
+        flushedTransactionsSize);
+
+    // Clean up committed transactions.
+    cleanupCache(cleanupEpochs);
+
+    if (isRatisEnabled) {
+      releaseUnFlushedTransactions(flushedTransactionsSize);
+    }
+    // update the last updated index in OzoneManagerStateMachine.
+    ozoneManagerRatisSnapShot.updateLastAppliedIndex(flushedEpochs);
+
+    // set metrics.
+    updateMetrics(flushedTransactionsSize);
+  }
+
+  private String addToBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer,
+                            BatchOperation batchOperation) {
+    String lastTraceId = null;
+
+    Iterator<DoubleBufferEntry<OMClientResponse>> iterator = buffer.iterator();
+    while (iterator.hasNext()) {
+      DoubleBufferEntry<OMClientResponse> entry = iterator.next();

Review Comment:
   this might be cleaner?
   
   ```suggestion
       for (DoubleBufferEntry<OMClientResponse> entry : buffer) {
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -263,138 +255,204 @@ private Void addToBatchTransactionInfoWithTrace(String 
parentName,
    * and commit to DB.
    */
   private void flushTransactions() {
-    while (isRunning.get()) {
-      try {
-        if (canFlush()) {
-          Map<String, List<Long>> cleanupEpochs = new HashMap<>();
-
-          setReadyBuffer();
-          List<Long> flushedEpochs = null;
-          try (BatchOperation batchOperation = omMetadataManager.getStore()
-              .initBatchOperation()) {
-
-            AtomicReference<String> lastTraceId = new AtomicReference<>();
-            readyBuffer.iterator().forEachRemaining((entry) -> {
-              try {
-                OMResponse omResponse = entry.getResponse().getOMResponse();
-                lastTraceId.set(omResponse.getTraceID());
-                addToBatchWithTrace(omResponse,
-                    (SupplierWithIOException<Void>) () -> {
-                      entry.getResponse().checkAndUpdateDB(omMetadataManager,
-                          batchOperation);
-                      return null;
-                    });
-
-                addCleanupEntry(entry, cleanupEpochs);
-
-              } catch (IOException ex) {
-                // During Adding to RocksDB batch entry got an exception.
-                // We should terminate the OM.
-                terminate(ex);
-              }
-            });
+    while (isRunning.get() && canFlush()) {
+      flushCurrentBuffer();
+    }
+  }
 
-            // Commit transaction info to DB.
-            flushedEpochs = readyBuffer.stream().map(
-                DoubleBufferEntry::getTrxLogIndex)
-                .sorted().collect(Collectors.toList());
-            long lastRatisTransactionIndex = flushedEpochs.get(
-                flushedEpochs.size() - 1);
-            long term = isRatisEnabled ?
-                indexToTerm.apply(lastRatisTransactionIndex) : -1;
-
-            addToBatchTransactionInfoWithTrace(lastTraceId.get(),
-                lastRatisTransactionIndex,
-                (SupplierWithIOException<Void>) () -> {
-                  omMetadataManager.getTransactionInfoTable().putWithBatch(
-                      batchOperation, TRANSACTION_INFO_KEY,
-                      new TransactionInfo.Builder()
-                          .setTransactionIndex(lastRatisTransactionIndex)
-                          .setCurrentTerm(term).build());
-                  return null;
-                });
-
-            long startTime = Time.monotonicNow();
-            flushBatchWithTrace(lastTraceId.get(), readyBuffer.size(),
-                () -> {
-                  omMetadataManager.getStore().commitBatchOperation(
-                      batchOperation);
-                  return null;
-                });
-            ozoneManagerDoubleBufferMetrics.updateFlushTime(
-                Time.monotonicNow() - startTime);
-          }
-
-          // Complete futures first and then do other things. So, that
-          // handler threads will be released.
-          if (!isRatisEnabled) {
-            // Once all entries are flushed, we can complete their future.
-            readyFutureQueue.iterator().forEachRemaining((entry) -> {
-              entry.complete(null);
-            });
+  /**
+   * This is to extract out the flushing logic to make it testable.
+   * If we don't do that, there could be a race condition which could fail
+   * the unit test on different machines.
+   */
+  @VisibleForTesting
+  void flushCurrentBuffer() {
+    try {
+      swapCurrentAndReadyBuffer();
+
+      // For snapshot, we want to include all the keys that were committed
+      // before the snapshot `create` command was executed. To achieve
+      // the behaviour, we split the request buffer at snapshot create
+      // request and flush the buffer in batches split at snapshot create
+      // request.
+      // For example, if requestBuffer is [request1, request2,
+      // snapshotCreateRequest1, request3, snapshotCreateRequest2, request4].
+      //
+      // Split requestBuffer would be.
+      // bufferQueues = [[request1, request2], [snapshotRequest1], [request3],
+      //     [snapshotRequest2], [request4]].
+      // And bufferQueues will be flushed in following order:
+      // Flush #1: [request1, request2]
+      // Flush #2: [snapshotRequest1]
+      // Flush #3: [request3]
+      // Flush #4: [snapshotRequest2]
+      // Flush #5: [request4]
+      List<Queue<DoubleBufferEntry<OMClientResponse>>> bufferQueues =
+          splitReadyBufferAtCreateSnapshot();
+
+      for (Queue<DoubleBufferEntry<OMClientResponse>> buffer : bufferQueues) {
+        flushBatch(buffer);
+      }
+
+      clearReadyBuffer();
+    } catch (IOException ex) {
+      terminate(ex);
+    } catch (Throwable t) {
+      final String s = "OMDoubleBuffer flush thread " +
+          Thread.currentThread().getName() + " encountered Throwable error";
+      ExitUtils.terminate(2, s, t, LOG);
+    }
+  }
+  private void flushBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer)
+      throws IOException {
+
+    Map<String, List<Long>> cleanupEpochs = new HashMap<>();
+    List<Long> flushedEpochs;
+
+    try (BatchOperation batchOperation = omMetadataManager.getStore()
+        .initBatchOperation()) {
+
+      String lastTraceId = addToBatch(buffer, batchOperation);
+
+      buffer.iterator().forEachRemaining(
+          entry -> addCleanupEntry(entry, cleanupEpochs));
+
+      // Commit transaction info to DB.
+      flushedEpochs = buffer.stream()
+          .map(DoubleBufferEntry::getTrxLogIndex)
+          .sorted()
+          .collect(Collectors.toList());
+
+      long lastRatisTransactionIndex = flushedEpochs.get(
+          flushedEpochs.size() - 1);
+
+      long term = isRatisEnabled ?
+          indexToTerm.apply(lastRatisTransactionIndex) : -1;
+
+      addToBatchTransactionInfoWithTrace(lastTraceId,
+          lastRatisTransactionIndex,
+          () -> {
+            omMetadataManager.getTransactionInfoTable().putWithBatch(
+                batchOperation, TRANSACTION_INFO_KEY,
+                new TransactionInfo.Builder()
+                    .setTransactionIndex(lastRatisTransactionIndex)
+                    .setCurrentTerm(term)
+                    .build());
+            return null;
+          });
+
+      long startTime = Time.monotonicNow();
+      flushBatchWithTrace(lastTraceId, buffer.size(),
+          () -> {
+            omMetadataManager.getStore()
+                .commitBatchOperation(batchOperation);
+            return null;
+          });
+
+      ozoneManagerDoubleBufferMetrics.updateFlushTime(
+          Time.monotonicNow() - startTime);
+    }
 
-            readyFutureQueue.clear();
-          }
-
-          int flushedTransactionsSize = readyBuffer.size();
-          flushedTransactionCount.addAndGet(flushedTransactionsSize);
-          flushIterations.incrementAndGet();
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Sync Iteration {} flushed transactions in this " +
-                    "iteration {}", flushIterations.get(),
-                flushedTransactionsSize);
-          }
-
-          // When non-HA do the sort step here, as the sorted list is not
-          // required for flush to DB. As in non-HA we want to complete
-          // futures as quick as possible after flush to DB, to release rpc
-          // handler threads.
-          if (!isRatisEnabled) {
-            flushedEpochs =
-                readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
-                    .sorted().collect(Collectors.toList());
-          }
-
-
-          // Clean up committed transactions.
-
-          cleanupCache(cleanupEpochs);
-
-          readyBuffer.clear();
-
-          if (isRatisEnabled) {
-            releaseUnFlushedTransactions(flushedTransactionsSize);
-          }
-
-          // update the last updated index in OzoneManagerStateMachine.
-          ozoneManagerRatisSnapShot.updateLastAppliedIndex(
-              flushedEpochs);
-
-          // set metrics.
-          updateMetrics(flushedTransactionsSize);
-        }
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        if (isRunning.get()) {
-          final String message = "OMDoubleBuffer flush thread " +
-              Thread.currentThread().getName() + " encountered Interrupted " +
-              "exception while running";
-          ExitUtils.terminate(1, message, ex, LOG);
-        } else {
-          LOG.info("OMDoubleBuffer flush thread {} is interrupted and will "
-              + "exit.", Thread.currentThread().getName());
-        }
+    // Complete futures first and then do other things.
+    // So that handler threads will be released.
+    if (!isRatisEnabled) {
+      clearReadyFutureQueue(buffer.size());
+    }
+
+    int flushedTransactionsSize = buffer.size();
+    flushedTransactionCount.addAndGet(flushedTransactionsSize);
+    flushIterations.incrementAndGet();
+
+    LOG.debug("Sync iteration {} flushed transactions in this iteration {}",
+        flushIterations.get(),
+        flushedTransactionsSize);

Review Comment:
   I'd suggest putting the `if (LOG.isDebugEnabled())` check back in this case:
   
   
https://github.com/apache/ozone/blob/c55c2dc4745c45404f82f30a55dc52911da693d1/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java#L343-L347
   
   Those extra atomic get `flushIterations.get()` could be expensive. Not sure 
how much perf impact it could actually cause though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to