steffenvan commented on code in PR #1435: URL: https://github.com/apache/jackrabbit-oak/pull/1435#discussion_r1596540743
########## oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java: ########## @@ -581,94 +498,295 @@ private MongoFilterPaths getPathsForRegexFiltering() { } } - private void download(FindIterable<NodeDocument> mongoIterable) throws InterruptedException, TimeoutException { - try (MongoCursor<NodeDocument> cursor = mongoIterable.iterator()) { - NodeDocument[] batch = new NodeDocument[maxBatchNumberOfDocuments]; - int nextIndex = 0; - int batchSize = 0; - try { - while (cursor.hasNext()) { - NodeDocument next = cursor.next(); - String id = next.getId(); - // If we are retrying on connection errors, we need to keep track of the last _modified value - if (retryOnConnectionErrors) { - this.nextLastModified = next.getModified(); + private void logWithRateLimit(Runnable f) { + Instant now = Instant.now(); + if (Duration.between(lastDelayedEnqueueWarningMessageLoggedTimestamp, now).toSeconds() > MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES) { + f.run(); + lastDelayedEnqueueWarningMessageLoggedTimestamp = now; + } + } + + enum DownloadOrder { + ASCENDING, + DESCENDING, + UNDEFINED; + + public boolean downloadInAscendingOrder() { + return this == ASCENDING || this == UNDEFINED; + } + } + + /** + * Downloads a given range from Mongo. Instances of this class should be used for downloading a single range. + * To download multiple ranges, create multiple instances of this class. 
+ */ + private class DownloadTask { + private final DownloadOrder downloadOrder; + private final DownloadStageStatistics downloadStatics; + private long documentsDownloadedTotalBytes; + private long documentsDownloadedTotal; + private long totalEnqueueWaitTimeMillis; + private long nextLastModified; + private String lastIdDownloaded; + + DownloadTask(DownloadOrder downloadOrder, DownloadStageStatistics downloadStatics) { + this.downloadOrder = downloadOrder; + this.downloadStatics = downloadStatics; + this.documentsDownloadedTotalBytes = 0; + this.documentsDownloadedTotal = 0; + this.totalEnqueueWaitTimeMillis = 0; + this.nextLastModified = downloadOrder.downloadInAscendingOrder() ? 0 : Long.MAX_VALUE; + this.lastIdDownloaded = null; + } + + private Instant failuresStartTimestamp = null; // When the last series of failures started + private int numberOfFailures = 0; + + public long getDocumentsDownloadedTotalBytes() { + return documentsDownloadedTotalBytes; + } + + public long getDocumentsDownloadedTotal() { + return documentsDownloadedTotal; + } + + private void download(Bson mongoQueryFilter) throws InterruptedException, TimeoutException { + failuresStartTimestamp = null; // When the last series of failures started + numberOfFailures = 0; + long retryIntervalMs = retryInitialIntervalMillis; + boolean downloadCompleted = false; + Map<String, Integer> exceptions = new HashMap<>(); + while (!downloadCompleted) { + try { + if (lastIdDownloaded == null) { + // lastIdDownloaded is null only when starting the download or if there is a connection error + // before anything is downloaded + DownloadRange firstRange = new DownloadRange(0, Long.MAX_VALUE, null, downloadOrder.downloadInAscendingOrder()); + downloadRange(firstRange, mongoQueryFilter, downloadOrder); + } else { + LOG.info("Recovering from broken connection, finishing downloading documents with _modified={}", nextLastModified); + DownloadRange partialLastModifiedRange = new DownloadRange(nextLastModified, 
nextLastModified, lastIdDownloaded, downloadOrder.downloadInAscendingOrder()); + downloadRange(partialLastModifiedRange, mongoQueryFilter, downloadOrder); + // Downloaded everything from _nextLastModified. Continue with the next timestamp for _modified + DownloadRange nextRange = downloadOrder.downloadInAscendingOrder() ? + new DownloadRange(nextLastModified + 1, Long.MAX_VALUE, null, true) : + new DownloadRange(0, nextLastModified - 1, null, false); + downloadRange(nextRange, mongoQueryFilter, downloadOrder); } - this.lastIdDownloaded = id; - this.documentsDownloadedTotal++; - reportProgress(id); - - batch[nextIndex] = next; - nextIndex++; - int docSize = (int) next.remove(NodeDocumentCodec.SIZE_FIELD); - batchSize += docSize; - documentsDownloadedTotalBytes += docSize; - if (batchSize >= maxBatchSizeBytes || nextIndex == batch.length) { - LOG.trace("Enqueuing block with {} elements, estimated size: {} bytes", nextIndex, batchSize); - tryEnqueueCopy(batch, nextIndex); - nextIndex = 0; - batchSize = 0; + downloadCompleted = true; + } catch (MongoException e) { + if (e instanceof MongoInterruptedException || e instanceof MongoIncompatibleDriverException) { + // Non-recoverable exceptions + throw e; + } + if (failuresStartTimestamp == null) { + failuresStartTimestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS); + } + LOG.warn("Connection error downloading from MongoDB.", e); + long secondsSinceStartOfFailures = Duration.between(failuresStartTimestamp, Instant.now()).toSeconds(); + if (parallelDump && mongoServerSelector.atLeastOneConnectionActive()) { + // Special case, the cluster is up because one of the connections is active. This happens when + // there is a single secondary, maybe because of a scale-up/down. In this case, do not abort the + // download, keep retrying to connect forever + int retryTime = 1000; + LOG.info("At least one connection is active. 
Retrying download in {} ms", retryTime); + Thread.sleep(retryTime); + } else if (secondsSinceStartOfFailures > retryDuringSeconds) { + // Give up. Get a string of all exceptions that were thrown + StringBuilder summary = new StringBuilder(); + for (Map.Entry<String, Integer> entry : exceptions.entrySet()) { + summary.append("\n\t").append(entry.getValue()).append("x: ").append(entry.getKey()); + } + throw new RetryException(retryDuringSeconds, summary.toString(), e); + } else { + numberOfFailures++; + LOG.warn("Retrying download in {} ms; number of times failed: {}; current series of failures started at: {} ({} seconds ago)", + retryIntervalMs, numberOfFailures, failuresStartTimestamp, secondsSinceStartOfFailures); + exceptions.compute(e.getClass().getSimpleName() + " - " + e.getMessage(), + (key, val) -> val == null ? 1 : val + 1 + ); + Thread.sleep(retryIntervalMs); + // simple exponential backoff mechanism + retryIntervalMs = Math.min(retryMaxIntervalMillis, retryIntervalMs * 2); } } - if (nextIndex > 0) { - LOG.info("Enqueueing last block with {} elements, estimated size: {}", - nextIndex, IOUtils.humanReadableByteCountBin(batchSize)); - tryEnqueueCopy(batch, nextIndex); + } + } + + private void downloadRange(DownloadRange range, Bson filter, DownloadOrder downloadOrder) throws InterruptedException, TimeoutException { + Bson findQuery = range.getFindQuery(); + if (filter != null) { + findQuery = Filters.and(findQuery, filter); + } + Bson sortOrder = downloadOrder.downloadInAscendingOrder() ? + Sorts.ascending(NodeDocument.MODIFIED_IN_SECS, NodeDocument.ID) : + Sorts.descending(NodeDocument.MODIFIED_IN_SECS, NodeDocument.ID); + + FindIterable<NodeDocument> mongoIterable = dbCollection + .find(findQuery) + .sort(sortOrder); + + LOG.info("Traversing: {}. 
Query: {}, Traverse order: {}", range, findQuery, sortOrder); + download(mongoIterable); + } + + void download(FindIterable<NodeDocument> mongoIterable) throws InterruptedException, TimeoutException { + try (MongoCursor<NodeDocument> cursor = mongoIterable.iterator()) { + NodeDocument[] batch = new NodeDocument[maxBatchNumberOfDocuments]; + int nextIndex = 0; + int batchSize = 0; + if (cursor.hasNext()) { + // We have managed to reconnect, reset the failure timestamp + failuresStartTimestamp = null; + numberOfFailures = 0; } - } catch (MongoException e) { - if (e instanceof MongoInterruptedException || e instanceof MongoIncompatibleDriverException) { - // Non-recoverable exceptions + try { + while (cursor.hasNext()) { + NodeDocument next = cursor.next(); + String id = next.getId(); + this.nextLastModified = next.getModified(); Review Comment: My editor warns that this line might throw a `NullPointerException`. `nextLastModified` is a primitive `long`, so if `next.getModified()` returns `null` (presumably it returns a nullable `Long`), the auto-unboxing fails. We use the same pattern in a few other places as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@jackrabbit.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org