steffenvan commented on code in PR #1435:
URL: https://github.com/apache/jackrabbit-oak/pull/1435#discussion_r1596540743


##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java:
##########
@@ -581,94 +498,295 @@ private MongoFilterPaths getPathsForRegexFiltering() {
         }
     }
 
-    private void download(FindIterable<NodeDocument> mongoIterable) throws 
InterruptedException, TimeoutException {
-        try (MongoCursor<NodeDocument> cursor = mongoIterable.iterator()) {
-            NodeDocument[] batch = new NodeDocument[maxBatchNumberOfDocuments];
-            int nextIndex = 0;
-            int batchSize = 0;
-            try {
-                while (cursor.hasNext()) {
-                    NodeDocument next = cursor.next();
-                    String id = next.getId();
-                    // If we are retrying on connection errors, we need to 
keep track of the last _modified value
-                    if (retryOnConnectionErrors) {
-                        this.nextLastModified = next.getModified();
+    private void logWithRateLimit(Runnable f) {
+        Instant now = Instant.now();
+        if (Duration.between(lastDelayedEnqueueWarningMessageLoggedTimestamp, 
now).toSeconds() > MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES) {
+            f.run();
+            lastDelayedEnqueueWarningMessageLoggedTimestamp = now;
+        }
+    }
+
+    enum DownloadOrder {
+        ASCENDING,
+        DESCENDING,
+        UNDEFINED;
+
+        public boolean downloadInAscendingOrder() {
+            return this == ASCENDING || this == UNDEFINED;
+        }
+    }
+
+    /**
+     * Downloads a given range from Mongo. Instances of this class should be 
used for downloading a single range.
+     * To download multiple ranges, create multiple instances of this class.
+     */
+    private class DownloadTask {
+        private final DownloadOrder downloadOrder;
+        private final DownloadStageStatistics downloadStatics;
+        private long documentsDownloadedTotalBytes;
+        private long documentsDownloadedTotal;
+        private long totalEnqueueWaitTimeMillis;
+        private long nextLastModified;
+        private String lastIdDownloaded;
+
+        DownloadTask(DownloadOrder downloadOrder, DownloadStageStatistics 
downloadStatics) {
+            this.downloadOrder = downloadOrder;
+            this.downloadStatics = downloadStatics;
+            this.documentsDownloadedTotalBytes = 0;
+            this.documentsDownloadedTotal = 0;
+            this.totalEnqueueWaitTimeMillis = 0;
+            this.nextLastModified = downloadOrder.downloadInAscendingOrder() ? 
0 : Long.MAX_VALUE;
+            this.lastIdDownloaded = null;
+        }
+
+        private Instant failuresStartTimestamp = null; // When the last series 
of failures started
+        private int numberOfFailures = 0;
+
+        public long getDocumentsDownloadedTotalBytes() {
+            return documentsDownloadedTotalBytes;
+        }
+
+        public long getDocumentsDownloadedTotal() {
+            return documentsDownloadedTotal;
+        }
+
+        private void download(Bson mongoQueryFilter) throws 
InterruptedException, TimeoutException {
+            failuresStartTimestamp = null; // When the last series of failures 
started
+            numberOfFailures = 0;
+            long retryIntervalMs = retryInitialIntervalMillis;
+            boolean downloadCompleted = false;
+            Map<String, Integer> exceptions = new HashMap<>();
+            while (!downloadCompleted) {
+                try {
+                    if (lastIdDownloaded == null) {
+                        // lastIdDownloaded is null only when starting the 
download or if there is a connection error
+                        // before anything is downloaded
+                        DownloadRange firstRange = new DownloadRange(0, 
Long.MAX_VALUE, null, downloadOrder.downloadInAscendingOrder());
+                        downloadRange(firstRange, mongoQueryFilter, 
downloadOrder);
+                    } else {
+                        LOG.info("Recovering from broken connection, finishing 
downloading documents with _modified={}", nextLastModified);
+                        DownloadRange partialLastModifiedRange = new 
DownloadRange(nextLastModified, nextLastModified, lastIdDownloaded, 
downloadOrder.downloadInAscendingOrder());
+                        downloadRange(partialLastModifiedRange, 
mongoQueryFilter, downloadOrder);
+                        // Downloaded everything from _nextLastModified. 
Continue with the next timestamp for _modified
+                        DownloadRange nextRange = 
downloadOrder.downloadInAscendingOrder() ?
+                                new DownloadRange(nextLastModified + 1, 
Long.MAX_VALUE, null, true) :
+                                new DownloadRange(0, nextLastModified - 1, 
null, false);
+                        downloadRange(nextRange, mongoQueryFilter, 
downloadOrder);
                     }
-                    this.lastIdDownloaded = id;
-                    this.documentsDownloadedTotal++;
-                    reportProgress(id);
-
-                    batch[nextIndex] = next;
-                    nextIndex++;
-                    int docSize = (int) 
next.remove(NodeDocumentCodec.SIZE_FIELD);
-                    batchSize += docSize;
-                    documentsDownloadedTotalBytes += docSize;
-                    if (batchSize >= maxBatchSizeBytes || nextIndex == 
batch.length) {
-                        LOG.trace("Enqueuing block with {} elements, estimated 
size: {} bytes", nextIndex, batchSize);
-                        tryEnqueueCopy(batch, nextIndex);
-                        nextIndex = 0;
-                        batchSize = 0;
+                    downloadCompleted = true;
+                } catch (MongoException e) {
+                    if (e instanceof MongoInterruptedException || e instanceof 
MongoIncompatibleDriverException) {
+                        // Non-recoverable exceptions
+                        throw e;
+                    }
+                    if (failuresStartTimestamp == null) {
+                        failuresStartTimestamp = 
Instant.now().truncatedTo(ChronoUnit.SECONDS);
+                    }
+                    LOG.warn("Connection error downloading from MongoDB.", e);
+                    long secondsSinceStartOfFailures = 
Duration.between(failuresStartTimestamp, Instant.now()).toSeconds();
+                    if (parallelDump && 
mongoServerSelector.atLeastOneConnectionActive()) {
+                        // Special case, the cluster is up because one of the 
connections is active. This happens when
+                        // there is a single secondary, maybe because of a 
scale-up/down. In this case, do not abort the
+                        // download, keep retrying to connect forever
+                        int retryTime = 1000;
+                        LOG.info("At least one connection is active. Retrying 
download in {} ms", retryTime);
+                        Thread.sleep(retryTime);
+                    } else if (secondsSinceStartOfFailures > 
retryDuringSeconds) {
+                        // Give up. Get a string of all exceptions that were 
thrown
+                        StringBuilder summary = new StringBuilder();
+                        for (Map.Entry<String, Integer> entry : 
exceptions.entrySet()) {
+                            
summary.append("\n\t").append(entry.getValue()).append("x: 
").append(entry.getKey());
+                        }
+                        throw new RetryException(retryDuringSeconds, 
summary.toString(), e);
+                    } else {
+                        numberOfFailures++;
+                        LOG.warn("Retrying download in {} ms; number of times 
failed: {}; current series of failures started at: {} ({} seconds ago)",
+                                retryIntervalMs, numberOfFailures, 
failuresStartTimestamp, secondsSinceStartOfFailures);
+                        exceptions.compute(e.getClass().getSimpleName() + " - 
" + e.getMessage(),
+                                (key, val) -> val == null ? 1 : val + 1
+                        );
+                        Thread.sleep(retryIntervalMs);
+                        // simple exponential backoff mechanism
+                        retryIntervalMs = Math.min(retryMaxIntervalMillis, 
retryIntervalMs * 2);
                     }
                 }
-                if (nextIndex > 0) {
-                    LOG.info("Enqueueing last block with {} elements, 
estimated size: {}",
-                            nextIndex, 
IOUtils.humanReadableByteCountBin(batchSize));
-                    tryEnqueueCopy(batch, nextIndex);
+            }
+        }
+
+        private void downloadRange(DownloadRange range, Bson filter, 
DownloadOrder downloadOrder) throws InterruptedException, TimeoutException {
+            Bson findQuery = range.getFindQuery();
+            if (filter != null) {
+                findQuery = Filters.and(findQuery, filter);
+            }
+            Bson sortOrder = downloadOrder.downloadInAscendingOrder() ?
+                    Sorts.ascending(NodeDocument.MODIFIED_IN_SECS, 
NodeDocument.ID) :
+                    Sorts.descending(NodeDocument.MODIFIED_IN_SECS, 
NodeDocument.ID);
+
+            FindIterable<NodeDocument> mongoIterable = dbCollection
+                    .find(findQuery)
+                    .sort(sortOrder);
+
+            LOG.info("Traversing: {}. Query: {}, Traverse order: {}", range, 
findQuery, sortOrder);
+            download(mongoIterable);
+        }
+
+        void download(FindIterable<NodeDocument> mongoIterable) throws 
InterruptedException, TimeoutException {
+            try (MongoCursor<NodeDocument> cursor = mongoIterable.iterator()) {
+                NodeDocument[] batch = new 
NodeDocument[maxBatchNumberOfDocuments];
+                int nextIndex = 0;
+                int batchSize = 0;
+                if (cursor.hasNext()) {
+                    // We have managed to reconnect, reset the failure 
timestamp
+                    failuresStartTimestamp = null;
+                    numberOfFailures = 0;
                 }
-            } catch (MongoException e) {
-                if (e instanceof MongoInterruptedException || e instanceof 
MongoIncompatibleDriverException) {
-                    // Non-recoverable exceptions
+                try {
+                    while (cursor.hasNext()) {
+                        NodeDocument next = cursor.next();
+                        String id = next.getId();
+                        this.nextLastModified = next.getModified();

Review Comment:
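   My editor warns that this line might throw a `NullPointerException`: `nextLastModified` is a primitive `long`, so if `getModified()` returns a boxed `Long` that can be `null` (presumably what the warning is about), auto-unboxing it here will fail. We use the same pattern in a few other places as well.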



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@jackrabbit.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to