Ewocker commented on a change in pull request #508:
URL: https://github.com/apache/jackrabbit-oak/pull/508#discussion_r823217840



##########
File path: 
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
##########
@@ -408,6 +402,172 @@ public void run() {
         }
     }
 
+    private boolean merge(ArrayList<File> files, File outputFile) {
+        try (BufferedWriter writer = createWriter(outputFile, 
compressionEnabled)) {
+            Function<String, NodeStateHolder> func1 = (line) -> line == null ? 
null : new SimpleNodeStateHolder(line);
+            Function<NodeStateHolder, String> func2 = holder -> holder == null 
? null : holder.getLine();
+            ExternalSort.mergeSortedFiles(files,
+                    writer,
+                    comparator,
+                    charset,
+                    true, //distinct
+                    compressionEnabled, //useZip
+                    func2,
+                    func1
+            );
+        } catch (IOException e) {
+            log.error("Merge failed with IOException", e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Task that merges the files in {@code mergeTarget} into
+     * {@code mergedFile} -
+     * <ol>
+     *     <li>Registers with the given {@link Phaser} on construction</li>
+     *     <li>Calls {@code merge}, retrying when an attempt fails</li>
+     *     <li>Deregisters from the phaser when done, whether or not the
+     *     merge succeeded</li>
+     * </ol>
+     */
+    private class MergeTask implements Callable<File> {
+        private final Phaser mergeTaskPhaser;
+        private final ArrayList<File> mergeTarget;
+        private final File mergedFile;
+        private final int failureThreshold = 5;
+
+        MergeTask(ArrayList<File> mergeTarget, Phaser mergeTaskPhaser, File 
mergedFile) {
+            this.mergeTarget = mergeTarget;
+            this.mergeTaskPhaser = mergeTaskPhaser;
+            this.mergedFile = mergedFile;
+            mergeTaskPhaser.register();
+        }
+
+        @Override
+        public File call() {
+            log.info("performing merge for {} with size {}", 
mergedFile.getName(), mergeTarget.size());
+            try {
+                for (int mergeFailureCount = 0; mergeFailureCount <= 
failureThreshold; mergeFailureCount++) {
+                    if (merge(mergeTarget, mergedFile)) {
+                        log.info("merge complete for {}", 
mergedFile.getName());
+                        return mergedFile;
+                    }
+                }
+                log.error("merge failed for {}", mergedFile.getName());
+            } finally {
+                mergeTaskPhaser.arriveAndDeregister();
+            }
+
+            return mergedFile;
+        }
+    }
+
+    /**
+     * Class responsible for -
+     * <ol>
+     *     <li>Watching {@link #sortedFiles} for new sorted files</li>
+     *     <li>Submitting those files in batch to an {@link 
ExecutorService}</li>
+     *     <li>Collecting the results (sorted files) created by those 
tasks</li>
+     *     <li>Merge the result with any left over files to create a single 
sorted file</li>
+     * </ol>
+     * Strategy -
+     * <ol>
+     *      <li>Wait for n files (compare with merged list)</li>
+     *      <li>construct new list of files to be merged by checking whether
+     *    each file is already merged, and create intermediate merge file;
+     *    (if final merge) merge all intermediate merge files and create
+     *    sorted file</li>
+     *      <li>add all merged files to merged list</li>
+     * </ol>
+     */
+    private class MergeRunner implements Runnable {
+        private final ArrayList<File> mergedFiles = new ArrayList<File>();
+        private final File sortedFile;
+        private int nextMergedLength = 0;
+        private final ExecutorService executorService;
+        private final int threadPoolSize = 
Integer.parseInt(System.getProperty(PROP_MERGE_THREAD_POOL_SIZE, 
DEFAULT_NUMBER_OF_MERGE_TASK_THREADS));
+        private final int batchMergeSize = 
Integer.parseInt(System.getProperty(PROP_MERGE_TASK_BATCH_SIZE, 
DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK));
+
+        public MergeRunner(File sortedFile) {
+            this.sortedFile = sortedFile;
+            this.executorService = 
Executors.newFixedThreadPool(threadPoolSize);
+        }
+
+        private ArrayList<File> getUnmergedFiles(int size) {
+            ArrayList<File> unmergedFiles = new ArrayList<File>();
+            for (File f : sortedFiles) {
+                if (!mergedFiles.contains(f) && f != MERGE_POISON_PILL) {

Review comment:
       added change




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to