prashantpogde commented on code in PR #3958:
URL: https://github.com/apache/ozone/pull/3958#discussion_r1024376457


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -182,10 +179,8 @@ private OzoneManagerDoubleBuffer(OMMetadataManager 
omMetadataManager,
     if (!isRatisEnabled) {
       this.currentFutureQueue = new ConcurrentLinkedQueue<>();
       this.readyFutureQueue = new ConcurrentLinkedQueue<>();
-    } else {
-      this.currentFutureQueue = null;
-      this.readyFutureQueue = null;
     }
+

Review Comment:
   extra line



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java:
##########
@@ -265,115 +259,41 @@ private Void addToBatchTransactionInfoWithTrace(String 
parentName,
   private void flushTransactions() {
     while (isRunning.get()) {
       try {
-        if (canFlush()) {
-          Map<String, List<Long>> cleanupEpochs = new HashMap<>();
-
-          setReadyBuffer();
-          List<Long> flushedEpochs = null;
-          try (BatchOperation batchOperation = omMetadataManager.getStore()
-              .initBatchOperation()) {
-
-            AtomicReference<String> lastTraceId = new AtomicReference<>();
-            readyBuffer.iterator().forEachRemaining((entry) -> {
-              try {
-                OMResponse omResponse = entry.getResponse().getOMResponse();
-                lastTraceId.set(omResponse.getTraceID());
-                addToBatchWithTrace(omResponse,
-                    (SupplierWithIOException<Void>) () -> {
-                      entry.getResponse().checkAndUpdateDB(omMetadataManager,
-                          batchOperation);
-                      return null;
-                    });
-
-                addCleanupEntry(entry, cleanupEpochs);
-
-              } catch (IOException ex) {
-                // During Adding to RocksDB batch entry got an exception.
-                // We should terminate the OM.
-                terminate(ex);
-              }
-            });
-
-            // Commit transaction info to DB.
-            flushedEpochs = readyBuffer.stream().map(
-                DoubleBufferEntry::getTrxLogIndex)
-                .sorted().collect(Collectors.toList());
-            long lastRatisTransactionIndex = flushedEpochs.get(
-                flushedEpochs.size() - 1);
-            long term = isRatisEnabled ?
-                indexToTerm.apply(lastRatisTransactionIndex) : -1;
-
-            addToBatchTransactionInfoWithTrace(lastTraceId.get(),
-                lastRatisTransactionIndex,
-                (SupplierWithIOException<Void>) () -> {
-                  omMetadataManager.getTransactionInfoTable().putWithBatch(
-                      batchOperation, TRANSACTION_INFO_KEY,
-                      new TransactionInfo.Builder()
-                          .setTransactionIndex(lastRatisTransactionIndex)
-                          .setCurrentTerm(term).build());
-                  return null;
-                });
-
-            long startTime = Time.monotonicNow();
-            flushBatchWithTrace(lastTraceId.get(), readyBuffer.size(),
-                () -> {
-                  omMetadataManager.getStore().commitBatchOperation(
-                      batchOperation);
-                  return null;
-                });
-            ozoneManagerDoubleBufferMetrics.updateFlushTime(
-                Time.monotonicNow() - startTime);
-          }
-
-          // Complete futures first and then do other things. So, that
-          // handler threads will be released.
-          if (!isRatisEnabled) {
-            // Once all entries are flushed, we can complete their future.
-            readyFutureQueue.iterator().forEachRemaining((entry) -> {
-              entry.complete(null);
-            });
-
-            readyFutureQueue.clear();
-          }
-
-          int flushedTransactionsSize = readyBuffer.size();
-          flushedTransactionCount.addAndGet(flushedTransactionsSize);
-          flushIterations.incrementAndGet();
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Sync Iteration {} flushed transactions in this " +
-                    "iteration {}", flushIterations.get(),
-                flushedTransactionsSize);
-          }
-
-          // When non-HA do the sort step here, as the sorted list is not
-          // required for flush to DB. As in non-HA we want to complete
-          // futures as quick as possible after flush to DB, to release rpc
-          // handler threads.
-          if (!isRatisEnabled) {
-            flushedEpochs =
-                readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
-                    .sorted().collect(Collectors.toList());
-          }
-
-
-          // Clean up committed transactions.
-
-          cleanupCache(cleanupEpochs);
-
-          readyBuffer.clear();
-
-          if (isRatisEnabled) {
-            releaseUnFlushedTransactions(flushedTransactionsSize);
-          }
-
-          // update the last updated index in OzoneManagerStateMachine.
-          ozoneManagerRatisSnapShot.updateLastAppliedIndex(
-              flushedEpochs);
+        // Wait till there is some transaction to flush.
+        canFlush();
+
+        setReadyBuffer();
+
+        // For snapshot, we want including all the keys that were committed
+        // before the snapshot `create` command was executed. To achieve
+        // the behaviour, we spilt the request buffer at snapshot create
+        // request and flush the buffer in batches split at snapshot create
+        // request.
+        // For example, if requestBuffer is [request1, request2,
+        // snapshotCreateRequest1, request3, snapshotCreateRequest2, request4].
+        //
+        // Split requestBuffer would be.
+        // bufferQueues = [[request1, request2], [snapshotRequest1, request3],

Review Comment:
   Let us use snapshotCreate() as a barrier operation. For example, 
bufferQueues can be broken as  [[request1, request2], [snapshotRequest1] 
,[request3], [snapshotRequest2], [request4,...]].



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to