thomasmueller commented on a change in pull request #508:
URL: https://github.com/apache/jackrabbit-oak/pull/508#discussion_r828212763



##########
File path: 
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
##########
@@ -408,6 +454,174 @@ public void run() {
         }
     }
 
+    /**
+     * Merges the given already-sorted files into {@code outputFile} using
+     * {@code ExternalSort.mergeSortedFiles} with {@code distinct=true}.
+     *
+     * @param files      sorted input files to combine
+     * @param outputFile destination for the merged output
+     * @return {@code true} if the merge (including closing the writer) completed,
+     *         {@code false} if an {@link IOException} was raised; the exception is logged
+     */
+    private boolean merge(List<File> files, File outputFile) {
+        // Converters between raw lines and holders; both pass null through unchanged.
+        Function<String, NodeStateHolder> lineToHolder =
+                line -> line != null ? new SimpleNodeStateHolder(line) : null;
+        Function<NodeStateHolder, String> holderToLine =
+                holder -> holder != null ? holder.getLine() : null;
+
+        try (BufferedWriter out = createWriter(outputFile, compressionEnabled)) {
+            ExternalSort.mergeSortedFiles(files, out, comparator, charset,
+                    true,               // distinct
+                    compressionEnabled, // useZip: same compression setting as the writer
+                    holderToLine, lineToHolder);
+            return true;
+        } catch (IOException e) {
+            log.error("Merge failed with IOException", e);
+            return false;
+        }
+    }
+
+    /**
+     * Merges one batch of sorted files ({@code mergeTarget}) into a single sorted
+     * file ({@code mergedFile}).
+     * <p>
+     * Each task registers itself with the supplied {@link Phaser} when constructed and
+     * deregisters when {@link #call()} finishes, successfully or not. The phaser's
+     * registered-party count therefore includes every merge task that has not yet finished.
+     */
+    private class MergeTask implements Callable<File> {
+        private final Phaser mergeTaskPhaser;
+        private final List<File> mergeTarget;
+        private final File mergedFile;
+
+        /**
+         * @param mergeTarget     sorted files to merge
+         * @param mergeTaskPhaser phaser this task registers on immediately
+         * @param mergedFile      destination file for the merged output
+         */
+        MergeTask(List<File> mergeTarget, Phaser mergeTaskPhaser, File mergedFile) {
+            this.mergeTarget = mergeTarget;
+            this.mergeTaskPhaser = mergeTaskPhaser;
+            this.mergedFile = mergedFile;
+            mergeTaskPhaser.register();
+        }
+
+        /**
+         * Merges {@code mergeTarget} into {@code mergedFile}.
+         *
+         * @return {@code mergedFile} once the merge has succeeded
+         * @throws IllegalStateException if the merge failed; the output file must not be
+         *         treated as a valid merge result in that case
+         */
+        @Override
+        public File call() {
+            log.info("performing merge for {} with size {}", mergedFile.getName(), mergeTarget.size());
+            try {
+                if (merge(mergeTarget, mergedFile)) {
+                    log.info("merge complete for {}", mergedFile.getName());
+                    return mergedFile;
+                }
+                log.error("merge failed for {}", mergedFile.getName());
+                // Fail loudly: returning the file here would let a failed or partial
+                // output be consumed as if it were a successful merge.
+                throw new IllegalStateException("merge failed for " + mergedFile.getName());
+            } finally {
+                // Always deregister, even on failure, so the phaser does not wait forever.
+                mergeTaskPhaser.arriveAndDeregister();
+            }
+        }
+    }
+
+    /**
+     * Class responsible for -
+     * <ol>
+     *     <li>Watching {@link #sortedFiles} for new sorted files</li>
+     *     <li>Submitting those files in batch to an {@link 
ExecutorService}</li>
+     *     <li>Collecting the results (sorted files) created by those 
tasks</li>
+     *     <li>Merge the result with any left over files to create a single 
sorted file</li>
+     * </ol>
+     * Strategy -
+     * <ol>
+     *      <li>Wait for n files</li>
+     *      <li>construct new list of files to be merged by checking if its 
already merged</li>
+     *    and create intermediate merge file
+     *    (if final merge) merge all intermediate merge files and create 
sorted file
+     *      <li>add all merged files to merged list</li>
+     * </ol>
+     */
+    private class MergeRunner implements Runnable {
+        private final ArrayList<File> mergedFiles = new ArrayList<File>();
+        private final ArrayList<File> unmergedFiles = new ArrayList<File>();
+        private final File sortedFile;
+        private final ExecutorService executorService;
+        private final int threadPoolSize = 
Integer.getInteger(PROP_MERGE_THREAD_POOL_SIZE, 
DEFAULT_NUMBER_OF_MERGE_TASK_THREADS);
+        private final int batchMergeSize = 
Integer.getInteger(PROP_MERGE_TASK_BATCH_SIZE, 
DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK);
+        private final Comparator fileSizeComparator = new SizeFileComparator();
+
+        public MergeRunner(File sortedFile) {
+            this.sortedFile = sortedFile;
+            this.executorService = 
Executors.newFixedThreadPool(threadPoolSize);
+        }
+
+        private List<File> getSmallestUnmergedFiles(int size) {
+            ArrayList<File> result = new ArrayList<File>(unmergedFiles);
+            result.remove(MERGE_POISON_PILL);
+            result.sort(fileSizeComparator);
+            int endIdx = size > result.size() ? result.size() : size;
+            return result.subList(0, endIdx);
+        }
+
+        private void markAsMerged(List<File> target) {
+            mergedFiles.addAll(target);
+            unmergedFiles.removeAll(target);
+        }
+
+        @Override
+        public void run() {
+            Phaser mergeTaskPhaser = new Phaser(1);
+            List<Future<File>> results = new ArrayList<>();
+            List<File> mergeTarget = new ArrayList<>();
+            int count = 0;
+
+            while (true) {
+                try {
+                    unmergedFiles.add(sortedFiles.take());
+                    if (sortedFiles.contains(MERGE_POISON_PILL) || 
unmergedFiles.contains(MERGE_POISON_PILL)) {

Review comment:
       > Otherwise if there are 2000 files left to be merged, it would create 
more tasks and wait for those to finish
   
   Actually, I think it would be better to merge at most 64 files at a time
(always the 64 smallest). That is better than merging 2000 files at the
same time, which would result in a lower cache hit rate.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to