This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 98e3a5033d7f716ba6717ddfac26202b2fa492d6
Merge: d8765436c2 4531701c4d
Author: Siyao Meng <[email protected]>
AuthorDate: Tue Jan 10 15:29:31 2023 -0800

    Merge remote-tracking branch 'asf/master' into HDDS-6517-Snapshot
    
    Conflicts:
    
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
    
    Change-Id: I9bcbbad70b80023545e3332ab450fee6ec4d01fd

 .../hadoop/hdds/protocol/DatanodeDetails.java      | 13 ++++-------
 .../ECReconstructionCommandInfo.java               | 11 ++-------
 .../commands/ReconstructECContainersCommand.java   |  6 ++---
 .../commands/ReplicateContainerCommand.java        |  5 +---
 .../hadoop/hdds/scm/node/SCMNodeManager.java       | 14 +++++++----
 .../ozone/om/protocolPB/OMAdminProtocolPB.java     |  2 +-
 .../ozone/om/ratis/OzoneManagerDoubleBuffer.java   | 27 ++++++++++++++--------
 7 files changed, 37 insertions(+), 41 deletions(-)

diff --cc 
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index 2fedea5d54,4a255f263f..9e03e10f52
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@@ -255,204 -263,139 +255,204 @@@ public final class OzoneManagerDoubleBu
     * and commit to DB.
     */
    private void flushTransactions() {
 -    while (isRunning.get()) {
 -      try {
 -        if (canFlush()) {
 -          Map<String, List<Long>> cleanupEpochs = new HashMap<>();
 -
 -          setReadyBuffer();
 -          List<Long> flushedEpochs = null;
 -          try (BatchOperation batchOperation = omMetadataManager.getStore()
 -              .initBatchOperation()) {
 -
 -            AtomicReference<String> lastTraceId = new AtomicReference<>();
 -            readyBuffer.iterator().forEachRemaining((entry) -> {
 -              OMResponse omResponse = null;
 -              try {
 -                omResponse = entry.getResponse().getOMResponse();
 -                lastTraceId.set(omResponse.getTraceID());
 -                addToBatchWithTrace(omResponse,
 -                    (SupplierWithIOException<Void>) () -> {
 -                      entry.getResponse().checkAndUpdateDB(omMetadataManager,
 -                          batchOperation);
 -                      return null;
 -                    });
 -
 -                addCleanupEntry(entry, cleanupEpochs);
 -
 -              } catch (IOException ex) {
 -                // During Adding to RocksDB batch entry got an exception.
 -                // We should terminate the OM.
 -                terminate(ex, 1, omResponse);
 -              } catch (Throwable t) {
 -                terminate(t, 2, omResponse);
 -              }
 -            });
 +    while (isRunning.get() && canFlush()) {
 +      flushCurrentBuffer();
 +    }
 +  }
  
 -            // Commit transaction info to DB.
 -            flushedEpochs = readyBuffer.stream().map(
 -                DoubleBufferEntry::getTrxLogIndex)
 -                .sorted().collect(Collectors.toList());
 -            long lastRatisTransactionIndex = flushedEpochs.get(
 -                flushedEpochs.size() - 1);
 -            long term = isRatisEnabled ?
 -                indexToTerm.apply(lastRatisTransactionIndex) : -1;
 -
 -            addToBatchTransactionInfoWithTrace(lastTraceId.get(),
 -                lastRatisTransactionIndex,
 -                (SupplierWithIOException<Void>) () -> {
 -                  omMetadataManager.getTransactionInfoTable().putWithBatch(
 -                      batchOperation, TRANSACTION_INFO_KEY,
 -                      new TransactionInfo.Builder()
 -                          .setTransactionIndex(lastRatisTransactionIndex)
 -                          .setCurrentTerm(term).build());
 -                  return null;
 -                });
 -
 -            long startTime = Time.monotonicNow();
 -            flushBatchWithTrace(lastTraceId.get(), readyBuffer.size(),
 -                () -> {
 -                  omMetadataManager.getStore().commitBatchOperation(
 -                      batchOperation);
 -                  return null;
 -                });
 -            ozoneManagerDoubleBufferMetrics.updateFlushTime(
 -                Time.monotonicNow() - startTime);
 -          }
 -
 -          // Complete futures first and then do other things. So, that
 -          // handler threads will be released.
 -          if (!isRatisEnabled) {
 -            // Once all entries are flushed, we can complete their future.
 -            readyFutureQueue.iterator().forEachRemaining((entry) -> {
 -              entry.complete(null);
 -            });
 +  /**
 +   * This is to extract out the flushing logic to make it testable.
 +   * If we don't do that, there could be a race condition which could fail
 +   * the unit test on different machines.
 +   */
 +  @VisibleForTesting
 +  void flushCurrentBuffer() {
 +    try {
 +      swapCurrentAndReadyBuffer();
 +
 +      // For snapshot, we want to include all the keys that were committed
 +      // before the snapshot `create` command was executed. To achieve
 +      // the behaviour, we split the request buffer at snapshot create
 +      // request and flush the buffer in batches split at snapshot create
 +      // request.
 +      // For example, if requestBuffer is [request1, request2,
 +      // snapshotCreateRequest1, request3, snapshotCreateRequest2, request4].
 +      //
 +      // Split requestBuffer would be.
 +      // bufferQueues = [[request1, request2], [snapshotRequest1], [request3],
 +      //     [snapshotRequest2], [request4]].
 +      // And bufferQueues will be flushed in following order:
 +      // Flush #1: [request1, request2]
 +      // Flush #2: [snapshotRequest1]
 +      // Flush #3: [request3]
 +      // Flush #4: [snapshotRequest2]
 +      // Flush #5: [request4]
 +      List<Queue<DoubleBufferEntry<OMClientResponse>>> bufferQueues =
 +          splitReadyBufferAtCreateSnapshot();
 +
 +      for (Queue<DoubleBufferEntry<OMClientResponse>> buffer : bufferQueues) {
 +        flushBatch(buffer);
 +      }
 +
 +      clearReadyBuffer();
 +    } catch (IOException ex) {
-       terminate(ex);
++      terminate(ex, 1);
 +    } catch (Throwable t) {
-       final String s = "OMDoubleBuffer flush thread " +
-           Thread.currentThread().getName() + " encountered Throwable error";
-       ExitUtils.terminate(2, s, t, LOG);
++      terminate(t, 2);
 +    }
 +  }
 +
 +  private void flushBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer)
 +      throws IOException {
  
 -            readyFutureQueue.clear();
 -          }
 -
 -          int flushedTransactionsSize = readyBuffer.size();
 -          flushedTransactionCount.addAndGet(flushedTransactionsSize);
 -          flushIterations.incrementAndGet();
 -
 -          if (LOG.isDebugEnabled()) {
 -            LOG.debug("Sync Iteration {} flushed transactions in this " +
 -                    "iteration {}", flushIterations.get(),
 -                flushedTransactionsSize);
 -          }
 -
 -          // When non-HA do the sort step here, as the sorted list is not
 -          // required for flush to DB. As in non-HA we want to complete
 -          // futures as quick as possible after flush to DB, to release rpc
 -          // handler threads.
 -          if (!isRatisEnabled) {
 -            flushedEpochs =
 -                readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex)
 -                    .sorted().collect(Collectors.toList());
 -          }
 -
 -
 -          // Clean up committed transactions.
 -
 -          cleanupCache(cleanupEpochs);
 -
 -          readyBuffer.clear();
 -
 -          if (isRatisEnabled) {
 -            releaseUnFlushedTransactions(flushedTransactionsSize);
 -          }
 -
 -          // update the last updated index in OzoneManagerStateMachine.
 -          ozoneManagerRatisSnapShot.updateLastAppliedIndex(
 -              flushedEpochs);
 -
 -          // set metrics.
 -          updateMetrics(flushedTransactionsSize);
 -        }
 -      } catch (InterruptedException ex) {
 -        Thread.currentThread().interrupt();
 -        if (isRunning.get()) {
 -          final String message = "OMDoubleBuffer flush thread " +
 -              Thread.currentThread().getName() + " encountered Interrupted " +
 -              "exception while running";
 -          ExitUtils.terminate(1, message, ex, LOG);
 -        } else {
 -          LOG.info("OMDoubleBuffer flush thread {} is interrupted and will "
 -              + "exit.", Thread.currentThread().getName());
 -        }
 +    Map<String, List<Long>> cleanupEpochs = new HashMap<>();
 +    List<Long> flushedEpochs;
 +
 +    try (BatchOperation batchOperation = omMetadataManager.getStore()
 +        .initBatchOperation()) {
 +
 +      String lastTraceId = addToBatch(buffer, batchOperation);
 +
 +      buffer.iterator().forEachRemaining(
 +          entry -> addCleanupEntry(entry, cleanupEpochs));
 +
 +      // Commit transaction info to DB.
 +      flushedEpochs = buffer.stream()
 +          .map(DoubleBufferEntry::getTrxLogIndex)
 +          .sorted()
 +          .collect(Collectors.toList());
 +
 +      long lastRatisTransactionIndex = flushedEpochs.get(
 +          flushedEpochs.size() - 1);
 +
 +      long term = isRatisEnabled ?
 +          indexToTerm.apply(lastRatisTransactionIndex) : -1;
 +
 +      addToBatchTransactionInfoWithTrace(lastTraceId,
 +          lastRatisTransactionIndex,
 +          () -> {
 +            omMetadataManager.getTransactionInfoTable().putWithBatch(
 +                batchOperation, TRANSACTION_INFO_KEY,
 +                new TransactionInfo.Builder()
 +                    .setTransactionIndex(lastRatisTransactionIndex)
 +                    .setCurrentTerm(term)
 +                    .build());
 +            return null;
 +          });
 +
 +      long startTime = Time.monotonicNow();
 +      flushBatchWithTrace(lastTraceId, buffer.size(),
 +          () -> {
 +            omMetadataManager.getStore()
 +                .commitBatchOperation(batchOperation);
 +            return null;
 +          });
 +
 +      ozoneManagerDoubleBufferMetrics.updateFlushTime(
 +          Time.monotonicNow() - startTime);
 +    }
 +
 +    // Complete futures first and then do other things.
 +    // So that handler threads will be released.
 +    if (!isRatisEnabled) {
 +      clearReadyFutureQueue(buffer.size());
 +    }
 +
 +    int flushedTransactionsSize = buffer.size();
 +    flushedTransactionCount.addAndGet(flushedTransactionsSize);
 +    flushIterations.incrementAndGet();
 +
 +    if (LOG.isDebugEnabled()) {
 +      LOG.debug("Sync iteration {} flushed transactions in this iteration {}",
 +          flushIterations.get(),
 +          flushedTransactionsSize);
 +    }
 +
 +    // Clean up committed transactions.
 +    cleanupCache(cleanupEpochs);
 +
 +    if (isRatisEnabled) {
 +      releaseUnFlushedTransactions(flushedTransactionsSize);
 +    }
 +    // update the last updated index in OzoneManagerStateMachine.
 +    ozoneManagerRatisSnapShot.updateLastAppliedIndex(flushedEpochs);
 +
 +    // set metrics.
 +    updateMetrics(flushedTransactionsSize);
 +  }
 +
 +  private String addToBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer,
 +                            BatchOperation batchOperation) {
 +    String lastTraceId = null;
 +    for (DoubleBufferEntry<OMClientResponse> entry: buffer) {
 +      OMClientResponse response = entry.getResponse();
 +      OMResponse omResponse = response.getOMResponse();
 +      lastTraceId = omResponse.getTraceID();
 +
 +      try {
 +        addToBatchWithTrace(omResponse,
 +            () -> {
 +              response.checkAndUpdateDB(omMetadataManager, batchOperation);
 +              return null;
 +            });
        } catch (IOException ex) {
 -        terminate(ex, 1);
 +        // During Adding to RocksDB batch entry got an exception.
 +        // We should terminate the OM.
-         terminate(ex);
++        terminate(ex, 1, omResponse);
+       } catch (Throwable t) {
 -        terminate(t, 2);
++        terminate(t, 2, omResponse);
        }
      }
 +
 +    return lastTraceId;
    }
  
 +  /**
 +   * Splits the readyBuffer around the create snapshot request.
 +   * Returns the list of queues split by create snapshot requests.
 +   *
 +   * CreateSnapshot is used as barrier because the checkpoint creation happens
 +   * in RocksDB callback flush. If multiple operations are flushed in one
 +   * specific batch, we are not sure at the flush of which specific operation
 +   * the callback is coming.
 +   * There is a possibility of a race condition exposed by RocksDB's
 +   * behaviour for the batch.
 +   * Hence, we treat createSnapshot as separate batch flush.
 +   *
 +   * e.g. requestBuffer = [request1, request2, snapshotRequest1,
 +   * request3, snapshotRequest2, request4]
 +   * response = [[request1, request2], [snapshotRequest1], [request3],
 +   * [snapshotRequest2], [request4]]
 +   */
 +  private List<Queue<DoubleBufferEntry<OMClientResponse>>>
 +      splitReadyBufferAtCreateSnapshot() {
 +    List<Queue<DoubleBufferEntry<OMClientResponse>>> response =
 +        new ArrayList<>();
 +
 +    Iterator<DoubleBufferEntry<OMClientResponse>> iterator =
 +        readyBuffer.iterator();
 +
 +    OMResponse previousOmResponse = null;
 +    while (iterator.hasNext()) {
 +      DoubleBufferEntry<OMClientResponse> entry = iterator.next();
 +      OMResponse omResponse = entry.getResponse().getOMResponse();
 +      // New queue gets created in three conditions:
 +      // 1. It is first element in the response,
 +      // 2. Current request is createSnapshot request.
 +      // 3. Previous request was createSnapshot request.
 +      if (response.isEmpty() ||
 +          omResponse.getCreateSnapshotResponse() != null ||
 +          (previousOmResponse != null &&
 +              previousOmResponse.getCreateSnapshotResponse() != null)) {
 +        response.add(new LinkedList<>());
 +      }
 +
 +      response.get(response.size() - 1).add(entry);
 +      previousOmResponse = omResponse;
 +    }
 +
 +    return response;
 +  }
  
    private void addCleanupEntry(DoubleBufferEntry entry, Map<String,
        List<Long>> cleanupEpochs) {
@@@ -549,11 -475,24 +549,20 @@@
      } else {
        LOG.info("OMDoubleBuffer flush thread is not running.");
      }
 -
    }
-   private void terminate(IOException ex) {
-     String message = "During flush to DB encountered error in " +
-         "OMDoubleBuffer flush thread " + Thread.currentThread().getName();
-     ExitUtils.terminate(1, message, ex, LOG);
+ 
+   private void terminate(Throwable t, int status) {
+     terminate(t, status, null);
+   }
+ 
+   private void terminate(Throwable t, int status, OMResponse omResponse) {
+     StringBuilder message = new StringBuilder(
+         "During flush to DB encountered error in " +
+         "OMDoubleBuffer flush thread " + Thread.currentThread().getName());
+     if (omResponse != null) {
+       message.append(" when handling OMRequest: ").append(omResponse);
+     }
+     ExitUtils.terminate(status, message.toString(), t, LOG);
    }
  
    /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to