steffenvan commented on code in PR #1435:
URL: https://github.com/apache/jackrabbit-oak/pull/1435#discussion_r1596541644


##########
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/MongoParallelDownloadCoordinator.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined;
+
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Coordinates the two parallel download streams used to download from Mongo 
when parallelDump is enabled. One stream
+ * downloads in ascending order the other in descending order. This class 
keeps track of the top limit of the ascending
+ * stream and of the bottom limit of the descending stream, and determines if 
the streams have crossed. This indicates
+ * that the download completed and the two threads should stop.
+ */
+class MongoParallelDownloadCoordinator {
+
+    static class DownloadPosition implements Comparable<DownloadPosition> {
+        final long lastModified;
+        final String lastId;
+
+        public DownloadPosition(long lastModified, String lastId) {
+            this.lastModified = lastModified;
+            this.lastId = lastId;
+        }
+
+        @Override
+        public int compareTo(@NotNull DownloadPosition o) {
+            int lastModifiedComparison = Long.compare(lastModified, 
o.lastModified);
+            if (lastModifiedComparison != 0) {
+                return lastModifiedComparison;
+            } else {
+                return lastId.compareTo(o.lastId);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "DownloadPosition{" +
+                    "lastModified=" + lastModified +
+                    ", lastId='" + lastId + '\'' +
+                    '}';
+        }
+    }
+
+    private DownloadPosition lowerRangeTop = new DownloadPosition(0, null);
+    private DownloadPosition upperRangeBottom = new 
DownloadPosition(Long.MAX_VALUE, null);
+
+    public DownloadPosition getUpperRangeBottom() {
+        return upperRangeBottom;
+    }
+
+    public DownloadPosition getLowerRangeTop() {
+        return lowerRangeTop;
+    }
+
+    /**
+     * Extends the lower range of downloaded documents with the documents in 
the given batch and returns the index of
+     * the first/lowest document in this batch that was already downloaded by 
the descending download thread.
+     * <p>
+     * That is, if this method returns i, then the documents in the range 
batch[0:i) were not yet downloaded by the
+     * descending downloader, but batch[i] and above were already downloaded. 
The following are degenerate cases:
+     * <p>
+     * If i==0 then all documents of this batch were already downloaded. That 
is, b[0] >= upperRangeBottom.
+     * If i==sizeOfBatch then none of the documents were downloaded. That is, 
b[sizeOfBatch-i] < upperRangeBottom.
+     * <p>
+     * <p>
+     * The batch must be in ascending order of (_modified, _id).
+     * <p>
+     * Updates the lower range top to b[i].
+     *
+     * @param batch The batch of documents to be added to the lower range, 
must be in ascending order
+     */
+    public synchronized int extendLowerRange(NodeDocument[] batch, int 
sizeOfBatch) {
+        // batch must be in ascending order
+        int i = sizeOfBatch - 1;
+        // Start by the highest value in the range and compare it with the 
bottom of the upper range.
+        while (i >= 0) {
+            var bi = new DownloadPosition(batch[i].getModified(), 
batch[i].getId());
+            if (bi.compareTo(upperRangeBottom) < 0) {
+                // batch[i] < upperRangeLowerLimit. Can add this element
+                this.lowerRangeTop = bi;
+                return i + 1;
+            }
+            // batch[i] >= lowerRangeTop, so it was already downloaded. Keep 
going down on this batch, trying to find
+            // an element that was not yet downloaded
+            i--;
+        }
+
+        // The whole batch block was already downloaded as part of the upper 
range
+        return 0;
+    }
+
+    /**
+     * Extends the upper range of downloaded documents with the documents in 
the given batch and returns the index of
+     * the first/highest document in this batch that was already downloaded by 
the ascending download thread.
+     *
+     * @param batch The batch of documents to be added to the upper range, 
must be in descending order
+     */
+    public synchronized int extendUpperRange(NodeDocument[] batch, int 
sizeOfBatch) {
+        // batch must be in descending order
+        int i = sizeOfBatch - 1;
+        // Find the highest value in the batch that is not yet on the upper 
range of values downloaded
+        while (i >= 0) {
+            var bi = new DownloadPosition(batch[i].getModified(), 
batch[i].getId());

Review Comment:
   Same here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@jackrabbit.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to