hemantk-12 commented on code in PR #3958:
URL: https://github.com/apache/ozone/pull/3958#discussion_r1028859557


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -263,138 +255,204 @@ private Void addToBatchTransactionInfoWithTrace(String 
parentName,
    * and commit to DB.
    */
   private void flushTransactions() {
-    while (isRunning.get()) {
-      try {
-        if (canFlush()) {
-          Map<String, List<Long>> cleanupEpochs = new HashMap<>();
-
-          setReadyBuffer();
-          List<Long> flushedEpochs = null;
-          try (BatchOperation batchOperation = omMetadataManager.getStore()
-              .initBatchOperation()) {
-
-            AtomicReference<String> lastTraceId = new AtomicReference<>();
-            readyBuffer.iterator().forEachRemaining((entry) -> {
-              try {
-                OMResponse omResponse = entry.getResponse().getOMResponse();
-                lastTraceId.set(omResponse.getTraceID());
-                addToBatchWithTrace(omResponse,
-                    (SupplierWithIOException<Void>) () -> {
-                      entry.getResponse().checkAndUpdateDB(omMetadataManager,
-                          batchOperation);
-                      return null;
-                    });
-
-                addCleanupEntry(entry, cleanupEpochs);
-
-              } catch (IOException ex) {
-                // During Adding to RocksDB batch entry got an exception.
-                // We should terminate the OM.
-                terminate(ex);
-              }
-            });
+    while (isRunning.get() && canFlush()) {
+      flushCurrentBuffer();
+    }
+  }
 
-            // Commit transaction info to DB.
-            flushedEpochs = readyBuffer.stream().map(
-                DoubleBufferEntry::getTrxLogIndex)
-                .sorted().collect(Collectors.toList());
-            long lastRatisTransactionIndex = flushedEpochs.get(
-                flushedEpochs.size() - 1);
-            long term = isRatisEnabled ?
-                indexToTerm.apply(lastRatisTransactionIndex) : -1;
-
-            addToBatchTransactionInfoWithTrace(lastTraceId.get(),
-                lastRatisTransactionIndex,
-                (SupplierWithIOException<Void>) () -> {
-                  omMetadataManager.getTransactionInfoTable().putWithBatch(
-                      batchOperation, TRANSACTION_INFO_KEY,
-                      new TransactionInfo.Builder()
-                          .setTransactionIndex(lastRatisTransactionIndex)
-                          .setCurrentTerm(term).build());
-                  return null;
-                });
-
-            long startTime = Time.monotonicNow();
-            flushBatchWithTrace(lastTraceId.get(), readyBuffer.size(),
-                () -> {
-                  omMetadataManager.getStore().commitBatchOperation(
-                      batchOperation);
-                  return null;
-                });
-            ozoneManagerDoubleBufferMetrics.updateFlushTime(
-                Time.monotonicNow() - startTime);
-          }
-
-          // Complete futures first and then do other things. So, that
-          // handler threads will be released.
-          if (!isRatisEnabled) {
-            // Once all entries are flushed, we can complete their future.
-            readyFutureQueue.iterator().forEachRemaining((entry) -> {
-              entry.complete(null);
-            });
+  /**
+   * This is to extract out the flushing logic to make it testable.
+   * If we don't do that, there could be a race condition which could fail
+   * the unit test on different machines.
+   */
+  @VisibleForTesting
+  void flushCurrentBuffer() {
+    try {
+      swapCurrentAndReadyBuffer();
+
+      // For snapshot, we want to include all the keys that were committed
+      // before the snapshot `create` command was executed. To achieve
+      // the behaviour, we split the request buffer at snapshot create
+      // request and flush the buffer in batches split at snapshot create
+      // request.
+      // For example, if requestBuffer is [request1, request2,
+      // snapshotCreateRequest1, request3, snapshotCreateRequest2, request4].
+      //
+      // Split requestBuffer would be.
+      // bufferQueues = [[request1, request2], [snapshotRequest1], [request3],
+      //     [snapshotRequest2], [request4]].
+      // And bufferQueues will be flushed in following order:
+      // Flush #1: [request1, request2]
+      // Flush #2: [snapshotRequest1]
+      // Flush #3: [request3]
+      // Flush #4: [snapshotRequest2]
+      // Flush #5: [request4]
+      List<Queue<DoubleBufferEntry<OMClientResponse>>> bufferQueues =
+          splitReadyBufferAtCreateSnapshot();
+
+      for (Queue<DoubleBufferEntry<OMClientResponse>> buffer : bufferQueues) {
+        flushBatch(buffer);
+      }
+
+      clearReadyBuffer();
+    } catch (IOException ex) {
+      terminate(ex);
+    } catch (Throwable t) {
+      final String s = "OMDoubleBuffer flush thread " +
+          Thread.currentThread().getName() + " encountered Throwable error";
+      ExitUtils.terminate(2, s, t, LOG);
+    }
+  }
+  private void flushBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer)
+      throws IOException {
+
+    Map<String, List<Long>> cleanupEpochs = new HashMap<>();
+    List<Long> flushedEpochs;
+
+    try (BatchOperation batchOperation = omMetadataManager.getStore()
+        .initBatchOperation()) {
+
+      String lastTraceId = addToBatch(buffer, batchOperation);
+
+      buffer.iterator().forEachRemaining(
+          entry -> addCleanupEntry(entry, cleanupEpochs));
+
+      // Commit transaction info to DB.
+      flushedEpochs = buffer.stream()
+          .map(DoubleBufferEntry::getTrxLogIndex)
+          .sorted()
+          .collect(Collectors.toList());
+
+      long lastRatisTransactionIndex = flushedEpochs.get(
+          flushedEpochs.size() - 1);
+
+      long term = isRatisEnabled ?
+          indexToTerm.apply(lastRatisTransactionIndex) : -1;
+
+      addToBatchTransactionInfoWithTrace(lastTraceId,
+          lastRatisTransactionIndex,
+          () -> {
+            omMetadataManager.getTransactionInfoTable().putWithBatch(
+                batchOperation, TRANSACTION_INFO_KEY,
+                new TransactionInfo.Builder()
+                    .setTransactionIndex(lastRatisTransactionIndex)
+                    .setCurrentTerm(term)
+                    .build());
+            return null;
+          });
+
+      long startTime = Time.monotonicNow();
+      flushBatchWithTrace(lastTraceId, buffer.size(),
+          () -> {
+            omMetadataManager.getStore()
+                .commitBatchOperation(batchOperation);
+            return null;
+          });
+
+      ozoneManagerDoubleBufferMetrics.updateFlushTime(
+          Time.monotonicNow() - startTime);
+    }
 
-            readyFutureQueue.clear();
-          }
-
-          int flushedTransactionsSize = readyBuffer.size();
-          flushedTransactionCount.addAndGet(flushedTransactionsSize);
-          flushIterations.incrementAndGet();
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Sync Iteration {} flushed transactions in this " +
-                    "iteration {}", flushIterations.get(),
-                flushedTransactionsSize);
-          }
-
-          // When non-HA do the sort step here, as the sorted list is not
-          // required for flush to DB. As in non-HA we want to complete
-          // futures as quick as possible after flush to DB, to release rpc
-          // handler threads.
-          if (!isRatisEnabled) {
-            flushedEpochs =
-                readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
-                    .sorted().collect(Collectors.toList());
-          }
-
-
-          // Clean up committed transactions.
-
-          cleanupCache(cleanupEpochs);
-
-          readyBuffer.clear();
-
-          if (isRatisEnabled) {
-            releaseUnFlushedTransactions(flushedTransactionsSize);
-          }
-
-          // update the last updated index in OzoneManagerStateMachine.
-          ozoneManagerRatisSnapShot.updateLastAppliedIndex(
-              flushedEpochs);
-
-          // set metrics.
-          updateMetrics(flushedTransactionsSize);
-        }
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        if (isRunning.get()) {
-          final String message = "OMDoubleBuffer flush thread " +
-              Thread.currentThread().getName() + " encountered Interrupted " +
-              "exception while running";
-          ExitUtils.terminate(1, message, ex, LOG);
-        } else {
-          LOG.info("OMDoubleBuffer flush thread {} is interrupted and will "
-              + "exit.", Thread.currentThread().getName());
-        }
+    // Complete futures first and then do other things.
+    // So that handler threads will be released.
+    if (!isRatisEnabled) {
+      clearReadyFutureQueue(buffer.size());
+    }
+
+    int flushedTransactionsSize = buffer.size();
+    flushedTransactionCount.addAndGet(flushedTransactionsSize);
+    flushIterations.incrementAndGet();
+
+    LOG.debug("Sync iteration {} flushed transactions in this iteration {}",
+        flushIterations.get(),
+        flushedTransactionsSize);
+
+    // Clean up committed transactions.
+    cleanupCache(cleanupEpochs);
+
+    if (isRatisEnabled) {
+      releaseUnFlushedTransactions(flushedTransactionsSize);
+    }
+    // update the last updated index in OzoneManagerStateMachine.
+    ozoneManagerRatisSnapShot.updateLastAppliedIndex(flushedEpochs);
+
+    // set metrics.
+    updateMetrics(flushedTransactionsSize);
+  }
+
+  private String addToBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer,
+                            BatchOperation batchOperation) {
+    String lastTraceId = null;
+
+    Iterator<DoubleBufferEntry<OMClientResponse>> iterator = buffer.iterator();
+    while (iterator.hasNext()) {
+      DoubleBufferEntry<OMClientResponse> entry = iterator.next();

Review Comment:
   Thanks. Made the change.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -263,138 +255,204 @@ private Void addToBatchTransactionInfoWithTrace(String 
parentName,
    * and commit to DB.
    */
   private void flushTransactions() {
-    while (isRunning.get()) {
-      try {
-        if (canFlush()) {
-          Map<String, List<Long>> cleanupEpochs = new HashMap<>();
-
-          setReadyBuffer();
-          List<Long> flushedEpochs = null;
-          try (BatchOperation batchOperation = omMetadataManager.getStore()
-              .initBatchOperation()) {
-
-            AtomicReference<String> lastTraceId = new AtomicReference<>();
-            readyBuffer.iterator().forEachRemaining((entry) -> {
-              try {
-                OMResponse omResponse = entry.getResponse().getOMResponse();
-                lastTraceId.set(omResponse.getTraceID());
-                addToBatchWithTrace(omResponse,
-                    (SupplierWithIOException<Void>) () -> {
-                      entry.getResponse().checkAndUpdateDB(omMetadataManager,
-                          batchOperation);
-                      return null;
-                    });
-
-                addCleanupEntry(entry, cleanupEpochs);
-
-              } catch (IOException ex) {
-                // During Adding to RocksDB batch entry got an exception.
-                // We should terminate the OM.
-                terminate(ex);
-              }
-            });
+    while (isRunning.get() && canFlush()) {
+      flushCurrentBuffer();
+    }
+  }
 
-            // Commit transaction info to DB.
-            flushedEpochs = readyBuffer.stream().map(
-                DoubleBufferEntry::getTrxLogIndex)
-                .sorted().collect(Collectors.toList());
-            long lastRatisTransactionIndex = flushedEpochs.get(
-                flushedEpochs.size() - 1);
-            long term = isRatisEnabled ?
-                indexToTerm.apply(lastRatisTransactionIndex) : -1;
-
-            addToBatchTransactionInfoWithTrace(lastTraceId.get(),
-                lastRatisTransactionIndex,
-                (SupplierWithIOException<Void>) () -> {
-                  omMetadataManager.getTransactionInfoTable().putWithBatch(
-                      batchOperation, TRANSACTION_INFO_KEY,
-                      new TransactionInfo.Builder()
-                          .setTransactionIndex(lastRatisTransactionIndex)
-                          .setCurrentTerm(term).build());
-                  return null;
-                });
-
-            long startTime = Time.monotonicNow();
-            flushBatchWithTrace(lastTraceId.get(), readyBuffer.size(),
-                () -> {
-                  omMetadataManager.getStore().commitBatchOperation(
-                      batchOperation);
-                  return null;
-                });
-            ozoneManagerDoubleBufferMetrics.updateFlushTime(
-                Time.monotonicNow() - startTime);
-          }
-
-          // Complete futures first and then do other things. So, that
-          // handler threads will be released.
-          if (!isRatisEnabled) {
-            // Once all entries are flushed, we can complete their future.
-            readyFutureQueue.iterator().forEachRemaining((entry) -> {
-              entry.complete(null);
-            });
+  /**
+   * This is to extract out the flushing logic to make it testable.
+   * If we don't do that, there could be a race condition which could fail
+   * the unit test on different machines.
+   */
+  @VisibleForTesting
+  void flushCurrentBuffer() {
+    try {
+      swapCurrentAndReadyBuffer();
+
+      // For snapshot, we want to include all the keys that were committed
+      // before the snapshot `create` command was executed. To achieve
+      // the behaviour, we split the request buffer at snapshot create
+      // request and flush the buffer in batches split at snapshot create
+      // request.
+      // For example, if requestBuffer is [request1, request2,
+      // snapshotCreateRequest1, request3, snapshotCreateRequest2, request4].
+      //
+      // Split requestBuffer would be.
+      // bufferQueues = [[request1, request2], [snapshotRequest1], [request3],
+      //     [snapshotRequest2], [request4]].
+      // And bufferQueues will be flushed in following order:
+      // Flush #1: [request1, request2]
+      // Flush #2: [snapshotRequest1]
+      // Flush #3: [request3]
+      // Flush #4: [snapshotRequest2]
+      // Flush #5: [request4]
+      List<Queue<DoubleBufferEntry<OMClientResponse>>> bufferQueues =
+          splitReadyBufferAtCreateSnapshot();
+
+      for (Queue<DoubleBufferEntry<OMClientResponse>> buffer : bufferQueues) {
+        flushBatch(buffer);
+      }
+
+      clearReadyBuffer();
+    } catch (IOException ex) {
+      terminate(ex);
+    } catch (Throwable t) {
+      final String s = "OMDoubleBuffer flush thread " +
+          Thread.currentThread().getName() + " encountered Throwable error";
+      ExitUtils.terminate(2, s, t, LOG);
+    }
+  }
+  private void flushBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer)
+      throws IOException {
+
+    Map<String, List<Long>> cleanupEpochs = new HashMap<>();
+    List<Long> flushedEpochs;
+
+    try (BatchOperation batchOperation = omMetadataManager.getStore()
+        .initBatchOperation()) {
+
+      String lastTraceId = addToBatch(buffer, batchOperation);
+
+      buffer.iterator().forEachRemaining(
+          entry -> addCleanupEntry(entry, cleanupEpochs));
+
+      // Commit transaction info to DB.
+      flushedEpochs = buffer.stream()
+          .map(DoubleBufferEntry::getTrxLogIndex)
+          .sorted()
+          .collect(Collectors.toList());
+
+      long lastRatisTransactionIndex = flushedEpochs.get(
+          flushedEpochs.size() - 1);
+
+      long term = isRatisEnabled ?
+          indexToTerm.apply(lastRatisTransactionIndex) : -1;
+
+      addToBatchTransactionInfoWithTrace(lastTraceId,
+          lastRatisTransactionIndex,
+          () -> {
+            omMetadataManager.getTransactionInfoTable().putWithBatch(
+                batchOperation, TRANSACTION_INFO_KEY,
+                new TransactionInfo.Builder()
+                    .setTransactionIndex(lastRatisTransactionIndex)
+                    .setCurrentTerm(term)
+                    .build());
+            return null;
+          });
+
+      long startTime = Time.monotonicNow();
+      flushBatchWithTrace(lastTraceId, buffer.size(),
+          () -> {
+            omMetadataManager.getStore()
+                .commitBatchOperation(batchOperation);
+            return null;
+          });
+
+      ozoneManagerDoubleBufferMetrics.updateFlushTime(
+          Time.monotonicNow() - startTime);
+    }
 
-            readyFutureQueue.clear();
-          }
-
-          int flushedTransactionsSize = readyBuffer.size();
-          flushedTransactionCount.addAndGet(flushedTransactionsSize);
-          flushIterations.incrementAndGet();
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Sync Iteration {} flushed transactions in this " +
-                    "iteration {}", flushIterations.get(),
-                flushedTransactionsSize);
-          }
-
-          // When non-HA do the sort step here, as the sorted list is not
-          // required for flush to DB. As in non-HA we want to complete
-          // futures as quick as possible after flush to DB, to release rpc
-          // handler threads.
-          if (!isRatisEnabled) {
-            flushedEpochs =
-                readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
-                    .sorted().collect(Collectors.toList());
-          }
-
-
-          // Clean up committed transactions.
-
-          cleanupCache(cleanupEpochs);
-
-          readyBuffer.clear();
-
-          if (isRatisEnabled) {
-            releaseUnFlushedTransactions(flushedTransactionsSize);
-          }
-
-          // update the last updated index in OzoneManagerStateMachine.
-          ozoneManagerRatisSnapShot.updateLastAppliedIndex(
-              flushedEpochs);
-
-          // set metrics.
-          updateMetrics(flushedTransactionsSize);
-        }
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        if (isRunning.get()) {
-          final String message = "OMDoubleBuffer flush thread " +
-              Thread.currentThread().getName() + " encountered Interrupted " +
-              "exception while running";
-          ExitUtils.terminate(1, message, ex, LOG);
-        } else {
-          LOG.info("OMDoubleBuffer flush thread {} is interrupted and will "
-              + "exit.", Thread.currentThread().getName());
-        }
+    // Complete futures first and then do other things.
+    // So that handler threads will be released.
+    if (!isRatisEnabled) {
+      clearReadyFutureQueue(buffer.size());
+    }
+
+    int flushedTransactionsSize = buffer.size();
+    flushedTransactionCount.addAndGet(flushedTransactionsSize);
+    flushIterations.incrementAndGet();
+
+    LOG.debug("Sync iteration {} flushed transactions in this iteration {}",
+        flushIterations.get(),
+        flushedTransactionsSize);
+
+    // Clean up committed transactions.
+    cleanupCache(cleanupEpochs);
+
+    if (isRatisEnabled) {
+      releaseUnFlushedTransactions(flushedTransactionsSize);
+    }
+    // update the last updated index in OzoneManagerStateMachine.
+    ozoneManagerRatisSnapShot.updateLastAppliedIndex(flushedEpochs);
+
+    // set metrics.
+    updateMetrics(flushedTransactionsSize);
+  }
+
+  private String addToBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer,
+                            BatchOperation batchOperation) {
+    String lastTraceId = null;
+
+    Iterator<DoubleBufferEntry<OMClientResponse>> iterator = buffer.iterator();
+    while (iterator.hasNext()) {
+      DoubleBufferEntry<OMClientResponse> entry = iterator.next();
+      OMClientResponse response = entry.getResponse();
+      OMResponse omResponse = response.getOMResponse();
+      lastTraceId = omResponse.getTraceID();
+
+      try {
+        addToBatchWithTrace(omResponse,
+            () -> {
+              response.checkAndUpdateDB(omMetadataManager, batchOperation);
+              return null;
+            });
       } catch (IOException ex) {
+        // During Adding to RocksDB batch entry got an exception.
+        // We should terminate the OM.
         terminate(ex);
-      } catch (Throwable t) {
-        final String s = "OMDoubleBuffer flush thread " +
-            Thread.currentThread().getName() + " encountered Throwable error";
-        ExitUtils.terminate(2, s, t, LOG);
       }
     }
+
+    return lastTraceId;
   }
 
+  /**
+   * Splits the readyBuffer around the create snapshot request.
+   * Returns, the list of queue split by create snapshot requests.
+   *
+   * CreateSnapshot is used as barrier because the checkpoint creation happens
+   * in RocksDB callback flush. If multiple operations are flushed in one
+   * specific batch, we are not sure at the flush of which specific operation
+   * the callback is coming.
+   * There could be a possibility of race condition that is exposed to rocksDB
+   * behaviour for the batch.
+   * Hence, we treat createSnapshot as separate batch flush.
+   *
+   * e.g. requestBuffer = [request1, request2, snapshotRequest1,
+   * request3, snapshotRequest2, request4]
+   * response = [[request1, request2], [snapshotRequest1], [request3],
+   * [snapshotRequest2], [request4]]
+   */
+  private List<Queue<DoubleBufferEntry<OMClientResponse>>>
+      splitReadyBufferAtCreateSnapshot() {
+    List<Queue<DoubleBufferEntry<OMClientResponse>>> response =
+        new ArrayList<>();
+
+    Iterator<DoubleBufferEntry<OMClientResponse>> iterator =
+        readyBuffer.iterator();
+
+    OMResponse previousOmResponse = null;
+    while (iterator.hasNext()) {
+      DoubleBufferEntry<OMClientResponse> entry = iterator.next();
+      OMResponse omResponse = entry.getResponse().getOMResponse();
+      // New queue gets created in three conditions:
+      // 1. It is first element in the response,
+      // 2. Current request is createSnapshot request.
+      // 3. Previous request was createSnapshot request.
+      if (response.isEmpty() ||
+          omResponse.getCreateSnapshotResponse() != null ||
+          (previousOmResponse != null &&
+              previousOmResponse.getCreateSnapshotResponse() != null)) {

Review Comment:
   Yes, I think it is because `CreateSnapshotResponse` is an optional attribute. 



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -417,14 +475,28 @@ private void addCleanupEntry(DoubleBufferEntry entry, 
Map<String,
             .add(entry.getTrxLogIndex());
       }
     } else {
-      // This is to catch early errors, when an new response class missed to
+      // This is to catch early errors, when a new response class missed to
       // add CleanupTableInfo annotation.
       throw new RuntimeException("CleanupTableInfo Annotation is missing " +
           "for" + responseClass);
     }
   }
 
+  /**
+   * Completes futures for the first count elements from the readyFutureQueue
+   * so that handler threads can be released asap.
+   */
+  private void clearReadyFutureQueue(int count) {
+    if (isRatisEnabled) {
+      return;
+    }

Review Comment:
   No, I did it as a safety check. Removed it.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -417,14 +475,28 @@ private void addCleanupEntry(DoubleBufferEntry entry, 
Map<String,
             .add(entry.getTrxLogIndex());
       }
     } else {
-      // This is to catch early errors, when an new response class missed to
+      // This is to catch early errors, when a new response class missed to
       // add CleanupTableInfo annotation.
       throw new RuntimeException("CleanupTableInfo Annotation is missing " +
           "for" + responseClass);
     }
   }
 
+  /**
+   * Completes futures for the first count elements from the readyFutureQueue
+   * so that handler threads can be released asap.
+   */
+  private void clearReadyFutureQueue(int count) {
+    if (isRatisEnabled) {
+      return;
+    }
 
+    Iterator<CompletableFuture<Void>> iterator = readyFutureQueue.iterator();
+    while (iterator.hasNext() && count > 0) {
+      iterator.next().complete(null);
+      count--;
+    }
+  }

Review Comment:
   Thanks for catching this. I forgot to remove the item from the queue.
   Changed it to:
   
   ```
     private void clearReadyFutureQueue(int count) {
       while (!readyFutureQueue.isEmpty() && count > 0) {
         readyFutureQueue.remove().complete(null);
         count--;
       }
     }
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -263,138 +255,204 @@ private Void addToBatchTransactionInfoWithTrace(String 
parentName,
    * and commit to DB.
    */
   private void flushTransactions() {
-    while (isRunning.get()) {
-      try {
-        if (canFlush()) {
-          Map<String, List<Long>> cleanupEpochs = new HashMap<>();
-
-          setReadyBuffer();
-          List<Long> flushedEpochs = null;
-          try (BatchOperation batchOperation = omMetadataManager.getStore()
-              .initBatchOperation()) {
-
-            AtomicReference<String> lastTraceId = new AtomicReference<>();
-            readyBuffer.iterator().forEachRemaining((entry) -> {
-              try {
-                OMResponse omResponse = entry.getResponse().getOMResponse();
-                lastTraceId.set(omResponse.getTraceID());
-                addToBatchWithTrace(omResponse,
-                    (SupplierWithIOException<Void>) () -> {
-                      entry.getResponse().checkAndUpdateDB(omMetadataManager,
-                          batchOperation);
-                      return null;
-                    });
-
-                addCleanupEntry(entry, cleanupEpochs);
-
-              } catch (IOException ex) {
-                // During Adding to RocksDB batch entry got an exception.
-                // We should terminate the OM.
-                terminate(ex);
-              }
-            });
+    while (isRunning.get() && canFlush()) {
+      flushCurrentBuffer();
+    }
+  }
 
-            // Commit transaction info to DB.
-            flushedEpochs = readyBuffer.stream().map(
-                DoubleBufferEntry::getTrxLogIndex)
-                .sorted().collect(Collectors.toList());
-            long lastRatisTransactionIndex = flushedEpochs.get(
-                flushedEpochs.size() - 1);
-            long term = isRatisEnabled ?
-                indexToTerm.apply(lastRatisTransactionIndex) : -1;
-
-            addToBatchTransactionInfoWithTrace(lastTraceId.get(),
-                lastRatisTransactionIndex,
-                (SupplierWithIOException<Void>) () -> {
-                  omMetadataManager.getTransactionInfoTable().putWithBatch(
-                      batchOperation, TRANSACTION_INFO_KEY,
-                      new TransactionInfo.Builder()
-                          .setTransactionIndex(lastRatisTransactionIndex)
-                          .setCurrentTerm(term).build());
-                  return null;
-                });
-
-            long startTime = Time.monotonicNow();
-            flushBatchWithTrace(lastTraceId.get(), readyBuffer.size(),
-                () -> {
-                  omMetadataManager.getStore().commitBatchOperation(
-                      batchOperation);
-                  return null;
-                });
-            ozoneManagerDoubleBufferMetrics.updateFlushTime(
-                Time.monotonicNow() - startTime);
-          }
-
-          // Complete futures first and then do other things. So, that
-          // handler threads will be released.
-          if (!isRatisEnabled) {
-            // Once all entries are flushed, we can complete their future.
-            readyFutureQueue.iterator().forEachRemaining((entry) -> {
-              entry.complete(null);
-            });
+  /**
+   * This is to extract out the flushing logic to make it testable.
+   * If we don't do that, there could be a race condition which could fail
+   * the unit test on different machines.
+   */
+  @VisibleForTesting
+  void flushCurrentBuffer() {
+    try {
+      swapCurrentAndReadyBuffer();
+
+      // For snapshot, we want to include all the keys that were committed
+      // before the snapshot `create` command was executed. To achieve
+      // the behaviour, we split the request buffer at snapshot create
+      // request and flush the buffer in batches split at snapshot create
+      // request.
+      // For example, if requestBuffer is [request1, request2,
+      // snapshotCreateRequest1, request3, snapshotCreateRequest2, request4].
+      //
+      // Split requestBuffer would be.
+      // bufferQueues = [[request1, request2], [snapshotRequest1], [request3],
+      //     [snapshotRequest2], [request4]].
+      // And bufferQueues will be flushed in following order:
+      // Flush #1: [request1, request2]
+      // Flush #2: [snapshotRequest1]
+      // Flush #3: [request3]
+      // Flush #4: [snapshotRequest2]
+      // Flush #5: [request4]
+      List<Queue<DoubleBufferEntry<OMClientResponse>>> bufferQueues =
+          splitReadyBufferAtCreateSnapshot();
+
+      for (Queue<DoubleBufferEntry<OMClientResponse>> buffer : bufferQueues) {
+        flushBatch(buffer);
+      }
+
+      clearReadyBuffer();
+    } catch (IOException ex) {
+      terminate(ex);
+    } catch (Throwable t) {
+      final String s = "OMDoubleBuffer flush thread " +
+          Thread.currentThread().getName() + " encountered Throwable error";
+      ExitUtils.terminate(2, s, t, LOG);
+    }
+  }
+  private void flushBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer)
+      throws IOException {
+
+    Map<String, List<Long>> cleanupEpochs = new HashMap<>();
+    List<Long> flushedEpochs;
+
+    try (BatchOperation batchOperation = omMetadataManager.getStore()
+        .initBatchOperation()) {
+
+      String lastTraceId = addToBatch(buffer, batchOperation);
+
+      buffer.iterator().forEachRemaining(
+          entry -> addCleanupEntry(entry, cleanupEpochs));
+
+      // Commit transaction info to DB.
+      flushedEpochs = buffer.stream()
+          .map(DoubleBufferEntry::getTrxLogIndex)
+          .sorted()
+          .collect(Collectors.toList());
+
+      long lastRatisTransactionIndex = flushedEpochs.get(
+          flushedEpochs.size() - 1);
+
+      long term = isRatisEnabled ?
+          indexToTerm.apply(lastRatisTransactionIndex) : -1;
+
+      addToBatchTransactionInfoWithTrace(lastTraceId,
+          lastRatisTransactionIndex,
+          () -> {
+            omMetadataManager.getTransactionInfoTable().putWithBatch(
+                batchOperation, TRANSACTION_INFO_KEY,
+                new TransactionInfo.Builder()
+                    .setTransactionIndex(lastRatisTransactionIndex)
+                    .setCurrentTerm(term)
+                    .build());
+            return null;
+          });
+
+      long startTime = Time.monotonicNow();
+      flushBatchWithTrace(lastTraceId, buffer.size(),
+          () -> {
+            omMetadataManager.getStore()
+                .commitBatchOperation(batchOperation);
+            return null;
+          });
+
+      ozoneManagerDoubleBufferMetrics.updateFlushTime(
+          Time.monotonicNow() - startTime);
+    }
 
-            readyFutureQueue.clear();
-          }
-
-          int flushedTransactionsSize = readyBuffer.size();
-          flushedTransactionCount.addAndGet(flushedTransactionsSize);
-          flushIterations.incrementAndGet();
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Sync Iteration {} flushed transactions in this " +
-                    "iteration {}", flushIterations.get(),
-                flushedTransactionsSize);
-          }
-
-          // When non-HA do the sort step here, as the sorted list is not
-          // required for flush to DB. As in non-HA we want to complete
-          // futures as quick as possible after flush to DB, to release rpc
-          // handler threads.
-          if (!isRatisEnabled) {
-            flushedEpochs =
-                readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
-                    .sorted().collect(Collectors.toList());
-          }
-
-
-          // Clean up committed transactions.
-
-          cleanupCache(cleanupEpochs);
-
-          readyBuffer.clear();
-
-          if (isRatisEnabled) {
-            releaseUnFlushedTransactions(flushedTransactionsSize);
-          }
-
-          // update the last updated index in OzoneManagerStateMachine.
-          ozoneManagerRatisSnapShot.updateLastAppliedIndex(
-              flushedEpochs);
-
-          // set metrics.
-          updateMetrics(flushedTransactionsSize);
-        }
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-        if (isRunning.get()) {
-          final String message = "OMDoubleBuffer flush thread " +
-              Thread.currentThread().getName() + " encountered Interrupted " +
-              "exception while running";
-          ExitUtils.terminate(1, message, ex, LOG);
-        } else {
-          LOG.info("OMDoubleBuffer flush thread {} is interrupted and will "
-              + "exit.", Thread.currentThread().getName());
-        }
+    // Complete futures first and then do other things.
+    // So that handler threads will be released.
+    if (!isRatisEnabled) {
+      clearReadyFutureQueue(buffer.size());
+    }
+
+    int flushedTransactionsSize = buffer.size();
+    flushedTransactionCount.addAndGet(flushedTransactionsSize);
+    flushIterations.incrementAndGet();
+
+    LOG.debug("Sync iteration {} flushed transactions in this iteration {}",
+        flushIterations.get(),
+        flushedTransactionsSize);
+
+    // Clean up committed transactions.
+    cleanupCache(cleanupEpochs);
+
+    if (isRatisEnabled) {
+      releaseUnFlushedTransactions(flushedTransactionsSize);
+    }
+    // update the last updated index in OzoneManagerStateMachine.
+    ozoneManagerRatisSnapShot.updateLastAppliedIndex(flushedEpochs);
+
+    // set metrics.
+    updateMetrics(flushedTransactionsSize);
+  }
+
+  private String addToBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer,
+                            BatchOperation batchOperation) {
+    String lastTraceId = null;

Review Comment:
   It was because it was accessed in lambda. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to