This is an automated email from the ASF dual-hosted git repository. siyao pushed a commit to branch HDDS-6517-Snapshot in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 98e3a5033d7f716ba6717ddfac26202b2fa492d6 Merge: d8765436c2 4531701c4d Author: Siyao Meng <[email protected]> AuthorDate: Tue Jan 10 15:29:31 2023 -0800 Merge remote-tracking branch 'asf/master' into HDDS-6517-Snapshot Conflicts: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java Change-Id: I9bcbbad70b80023545e3332ab450fee6ec4d01fd .../hadoop/hdds/protocol/DatanodeDetails.java | 13 ++++------- .../ECReconstructionCommandInfo.java | 11 ++------- .../commands/ReconstructECContainersCommand.java | 6 ++--- .../commands/ReplicateContainerCommand.java | 5 +--- .../hadoop/hdds/scm/node/SCMNodeManager.java | 14 +++++++---- .../ozone/om/protocolPB/OMAdminProtocolPB.java | 2 +- .../ozone/om/ratis/OzoneManagerDoubleBuffer.java | 27 ++++++++++++++-------- 7 files changed, 37 insertions(+), 41 deletions(-) diff --cc hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java index 2fedea5d54,4a255f263f..9e03e10f52 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java @@@ -255,204 -263,139 +255,204 @@@ public final class OzoneManagerDoubleBu * and commit to DB. 
*/ private void flushTransactions() { - while (isRunning.get()) { - try { - if (canFlush()) { - Map<String, List<Long>> cleanupEpochs = new HashMap<>(); - - setReadyBuffer(); - List<Long> flushedEpochs = null; - try (BatchOperation batchOperation = omMetadataManager.getStore() - .initBatchOperation()) { - - AtomicReference<String> lastTraceId = new AtomicReference<>(); - readyBuffer.iterator().forEachRemaining((entry) -> { - OMResponse omResponse = null; - try { - omResponse = entry.getResponse().getOMResponse(); - lastTraceId.set(omResponse.getTraceID()); - addToBatchWithTrace(omResponse, - (SupplierWithIOException<Void>) () -> { - entry.getResponse().checkAndUpdateDB(omMetadataManager, - batchOperation); - return null; - }); - - addCleanupEntry(entry, cleanupEpochs); - - } catch (IOException ex) { - // During Adding to RocksDB batch entry got an exception. - // We should terminate the OM. - terminate(ex, 1, omResponse); - } catch (Throwable t) { - terminate(t, 2, omResponse); - } - }); + while (isRunning.get() && canFlush()) { + flushCurrentBuffer(); + } + } - // Commit transaction info to DB. - flushedEpochs = readyBuffer.stream().map( - DoubleBufferEntry::getTrxLogIndex) - .sorted().collect(Collectors.toList()); - long lastRatisTransactionIndex = flushedEpochs.get( - flushedEpochs.size() - 1); - long term = isRatisEnabled ? 
- indexToTerm.apply(lastRatisTransactionIndex) : -1; - - addToBatchTransactionInfoWithTrace(lastTraceId.get(), - lastRatisTransactionIndex, - (SupplierWithIOException<Void>) () -> { - omMetadataManager.getTransactionInfoTable().putWithBatch( - batchOperation, TRANSACTION_INFO_KEY, - new TransactionInfo.Builder() - .setTransactionIndex(lastRatisTransactionIndex) - .setCurrentTerm(term).build()); - return null; - }); - - long startTime = Time.monotonicNow(); - flushBatchWithTrace(lastTraceId.get(), readyBuffer.size(), - () -> { - omMetadataManager.getStore().commitBatchOperation( - batchOperation); - return null; - }); - ozoneManagerDoubleBufferMetrics.updateFlushTime( - Time.monotonicNow() - startTime); - } - - // Complete futures first and then do other things. So, that - // handler threads will be released. - if (!isRatisEnabled) { - // Once all entries are flushed, we can complete their future. - readyFutureQueue.iterator().forEachRemaining((entry) -> { - entry.complete(null); - }); + /** + * This is to extract out the flushing logic to make it testable. + * If we don't do that, there could be a race condition which could fail + * the unit test on different machines. + */ + @VisibleForTesting + void flushCurrentBuffer() { + try { + swapCurrentAndReadyBuffer(); + + // For snapshot, we want to include all the keys that were committed + // before the snapshot `create` command was executed. To achieve + // this behaviour, we split the request buffer at snapshot create + // request and flush the buffer in batches split at snapshot create + // request. + // For example, if requestBuffer is [request1, request2, + // snapshotCreateRequest1, request3, snapshotCreateRequest2, request4]. + // + // The split requestBuffer would be: + // bufferQueues = [[request1, request2], [snapshotRequest1], [request3], + // [snapshotRequest2], [request4]].
+ // And bufferQueues will be flushed in following order: + // Flush #1: [request1, request2] + // Flush #2: [snapshotRequest1] + // Flush #3: [request3] + // Flush #4: [snapshotRequest2] + // Flush #5: [request4] + List<Queue<DoubleBufferEntry<OMClientResponse>>> bufferQueues = + splitReadyBufferAtCreateSnapshot(); + + for (Queue<DoubleBufferEntry<OMClientResponse>> buffer : bufferQueues) { + flushBatch(buffer); + } + + clearReadyBuffer(); + } catch (IOException ex) { - terminate(ex); ++ terminate(ex, 1); + } catch (Throwable t) { - final String s = "OMDoubleBuffer flush thread " + - Thread.currentThread().getName() + " encountered Throwable error"; - ExitUtils.terminate(2, s, t, LOG); ++ terminate(t, 2); + } + } + + private void flushBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer) + throws IOException { - readyFutureQueue.clear(); - } - - int flushedTransactionsSize = readyBuffer.size(); - flushedTransactionCount.addAndGet(flushedTransactionsSize); - flushIterations.incrementAndGet(); - - if (LOG.isDebugEnabled()) { - LOG.debug("Sync Iteration {} flushed transactions in this " + - "iteration {}", flushIterations.get(), - flushedTransactionsSize); - } - - // When non-HA do the sort step here, as the sorted list is not - // required for flush to DB. As in non-HA we want to complete - // futures as quick as possible after flush to DB, to release rpc - // handler threads. - if (!isRatisEnabled) { - flushedEpochs = - readyBuffer.stream().map(DoubleBufferEntry::getTrxLogIndex) - .sorted().collect(Collectors.toList()); - } - - - // Clean up committed transactions. - - cleanupCache(cleanupEpochs); - - readyBuffer.clear(); - - if (isRatisEnabled) { - releaseUnFlushedTransactions(flushedTransactionsSize); - } - - // update the last updated index in OzoneManagerStateMachine. - ozoneManagerRatisSnapShot.updateLastAppliedIndex( - flushedEpochs); - - // set metrics. 
- updateMetrics(flushedTransactionsSize); - } - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - if (isRunning.get()) { - final String message = "OMDoubleBuffer flush thread " + - Thread.currentThread().getName() + " encountered Interrupted " + - "exception while running"; - ExitUtils.terminate(1, message, ex, LOG); - } else { - LOG.info("OMDoubleBuffer flush thread {} is interrupted and will " - + "exit.", Thread.currentThread().getName()); - } + Map<String, List<Long>> cleanupEpochs = new HashMap<>(); + List<Long> flushedEpochs; + + try (BatchOperation batchOperation = omMetadataManager.getStore() + .initBatchOperation()) { + + String lastTraceId = addToBatch(buffer, batchOperation); + + buffer.iterator().forEachRemaining( + entry -> addCleanupEntry(entry, cleanupEpochs)); + + // Commit transaction info to DB. + flushedEpochs = buffer.stream() + .map(DoubleBufferEntry::getTrxLogIndex) + .sorted() + .collect(Collectors.toList()); + + long lastRatisTransactionIndex = flushedEpochs.get( + flushedEpochs.size() - 1); + + long term = isRatisEnabled ? + indexToTerm.apply(lastRatisTransactionIndex) : -1; + + addToBatchTransactionInfoWithTrace(lastTraceId, + lastRatisTransactionIndex, + () -> { + omMetadataManager.getTransactionInfoTable().putWithBatch( + batchOperation, TRANSACTION_INFO_KEY, + new TransactionInfo.Builder() + .setTransactionIndex(lastRatisTransactionIndex) + .setCurrentTerm(term) + .build()); + return null; + }); + + long startTime = Time.monotonicNow(); + flushBatchWithTrace(lastTraceId, buffer.size(), + () -> { + omMetadataManager.getStore() + .commitBatchOperation(batchOperation); + return null; + }); + + ozoneManagerDoubleBufferMetrics.updateFlushTime( + Time.monotonicNow() - startTime); + } + + // Complete futures first and then do other things. + // So that handler threads will be released. 
+ if (!isRatisEnabled) { + clearReadyFutureQueue(buffer.size()); + } + + int flushedTransactionsSize = buffer.size(); + flushedTransactionCount.addAndGet(flushedTransactionsSize); + flushIterations.incrementAndGet(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Sync iteration {} flushed transactions in this iteration {}", + flushIterations.get(), + flushedTransactionsSize); + } + + // Clean up committed transactions. + cleanupCache(cleanupEpochs); + + if (isRatisEnabled) { + releaseUnFlushedTransactions(flushedTransactionsSize); + } + // update the last updated index in OzoneManagerStateMachine. + ozoneManagerRatisSnapShot.updateLastAppliedIndex(flushedEpochs); + + // set metrics. + updateMetrics(flushedTransactionsSize); + } + + private String addToBatch(Queue<DoubleBufferEntry<OMClientResponse>> buffer, + BatchOperation batchOperation) { + String lastTraceId = null; + for (DoubleBufferEntry<OMClientResponse> entry: buffer) { + OMClientResponse response = entry.getResponse(); + OMResponse omResponse = response.getOMResponse(); + lastTraceId = omResponse.getTraceID(); + + try { + addToBatchWithTrace(omResponse, + () -> { + response.checkAndUpdateDB(omMetadataManager, batchOperation); + return null; + }); } catch (IOException ex) { - terminate(ex, 1); + // An exception occurred while adding the entry to the RocksDB batch. + // We should terminate the OM. - terminate(ex); ++ terminate(ex, 1, omResponse); + } catch (Throwable t) { - terminate(t, 2); ++ terminate(t, 2, omResponse); } } + + return lastTraceId; } + /** + * Splits the readyBuffer around the create snapshot request. + * Returns the list of queues split at create snapshot requests. + * + * CreateSnapshot is used as a barrier because the checkpoint creation happens + * in RocksDB callback flush. If multiple operations are flushed in one + * specific batch, we cannot tell which operation's flush triggered + * the callback.
+ * There could be a possibility of race condition that is exposed to rocksDB + * behaviour for the batch. + * Hence, we treat createSnapshot as separate batch flush. + * + * e.g. requestBuffer = [request1, request2, snapshotRequest1, + * request3, snapshotRequest2, request4] + * response = [[request1, request2], [snapshotRequest1], [request3], + * [snapshotRequest2], [request4]] + */ + private List<Queue<DoubleBufferEntry<OMClientResponse>>> + splitReadyBufferAtCreateSnapshot() { + List<Queue<DoubleBufferEntry<OMClientResponse>>> response = + new ArrayList<>(); + + Iterator<DoubleBufferEntry<OMClientResponse>> iterator = + readyBuffer.iterator(); + + OMResponse previousOmResponse = null; + while (iterator.hasNext()) { + DoubleBufferEntry<OMClientResponse> entry = iterator.next(); + OMResponse omResponse = entry.getResponse().getOMResponse(); + // New queue gets created in three conditions: + // 1. It is first element in the response, + // 2. Current request is createSnapshot request. + // 3. Previous request was createSnapshot request. 
+ if (response.isEmpty() || + omResponse.getCreateSnapshotResponse() != null || + (previousOmResponse != null && + previousOmResponse.getCreateSnapshotResponse() != null)) { + response.add(new LinkedList<>()); + } + + response.get(response.size() - 1).add(entry); + previousOmResponse = omResponse; + } + + return response; + } private void addCleanupEntry(DoubleBufferEntry entry, Map<String, List<Long>> cleanupEpochs) { @@@ -549,11 -475,24 +549,20 @@@ } else { LOG.info("OMDoubleBuffer flush thread is not running."); } - } - private void terminate(IOException ex) { - String message = "During flush to DB encountered error in " + - "OMDoubleBuffer flush thread " + Thread.currentThread().getName(); - ExitUtils.terminate(1, message, ex, LOG); + + private void terminate(Throwable t, int status) { + terminate(t, status, null); + } + + private void terminate(Throwable t, int status, OMResponse omResponse) { + StringBuilder message = new StringBuilder( + "During flush to DB encountered error in " + + "OMDoubleBuffer flush thread " + Thread.currentThread().getName()); + if (omResponse != null) { + message.append(" when handling OMRequest: ").append(omResponse); + } + ExitUtils.terminate(status, message.toString(), t, LOG); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
