Ewocker commented on a change in pull request #508:
URL: https://github.com/apache/jackrabbit-oak/pull/508#discussion_r826386099



##########
File path: 
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
##########
@@ -304,14 +364,18 @@ void createInitialTasks(NodeStateEntryTraverserFactory 
nodeStateEntryTraverserFa
     void addTask(TraversingRange range, NodeStateEntryTraverserFactory 
nodeStateEntryTraverserFactory, BlobStore blobStore,
                          ConcurrentLinkedQueue<String> completedTasks) throws 
IOException {
         taskQueue.add(new TraverseAndSortTask(range, comparator, blobStore, 
storeDir,
-                compressionEnabled, completedTasks, taskQueue, phaser, 
nodeStateEntryTraverserFactory, memoryManager, dumpThreshold));
+                compressionEnabled, completedTasks, taskQueue, phaser, 
nodeStateEntryTraverserFactory, memoryManager, dumpThreshold, sortedFiles));
     }
 
     @Override
     public File createSortedStoreFile() throws IOException, CompositeException 
{
         String watcherThreadName = "watcher";
+        String mergerThreadName = "merger";
         Thread watcher = new Thread(new TaskRunner(), watcherThreadName);
         watcher.start();
+        File sortedFile = new File(storeDir, 
getSortedStoreFileName(compressionEnabled));
+        Thread merger = new Thread(new MergeRunner(sortedFile), 
mergerThreadName);

Review comment:
       Makes sense.

##########
File path: 
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
##########
@@ -408,6 +454,174 @@ public void run() {
         }
     }
 
+    private boolean merge(List<File> files, File outputFile) {
+        try (BufferedWriter writer = createWriter(outputFile, 
compressionEnabled)) {
+            Function<String, NodeStateHolder> func1 = (line) -> line == null ? 
null : new SimpleNodeStateHolder(line);
+            Function<NodeStateHolder, String> func2 = holder -> holder == null 
? null : holder.getLine();
+            ExternalSort.mergeSortedFiles(files,
+                    writer,
+                    comparator,
+                    charset,
+                    true, //distinct
+                    compressionEnabled, //useZip
+                    func2,
+                    func1
+            );
+        } catch (IOException e) {
+            log.error("Merge failed with IOException", e);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Class responsible for -
+     * <ol>
+     *     <li>Watching {@link #taskQueue} for new tasks</li>
+     *     <li>Submitting those tasks to an {@link ExecutorService}</li>
+     *     <li>Collecting the results (sorted files) created by those tasks 
into one place</li>
+     * </ol>
+     */
+    private class MergeTask implements Callable<File> {
+        private final Phaser mergeTaskPhaser;
+        private final List<File> mergeTarget;
+        private final File mergedFile;
+
+        MergeTask(List<File> mergeTarget, Phaser mergeTaskPhaser, File 
mergedFile) {
+            this.mergeTarget = mergeTarget;
+            this.mergeTaskPhaser = mergeTaskPhaser;
+            this.mergedFile = mergedFile;
+            mergeTaskPhaser.register();
+        }
+
+        @Override
+        public File call() {
+            log.info("performing merge for {} with size {}", 
mergedFile.getName(), mergeTarget.size());
+            try {
+                if (merge(mergeTarget, mergedFile)) {
+                    log.info("merge complete for {}", mergedFile.getName());
+                    return mergedFile;
+                }
+                log.error("merge failed for {}", mergedFile.getName());
+            } finally {
+                mergeTaskPhaser.arriveAndDeregister();
+            }
+
+            return mergedFile;
+        }
+    }
+
+    /**
+     * Class responsible for -
+     * <ol>
+     *     <li>Watching {@link #sortedFiles} for new sorted files</li>
+     *     <li>Submitting those files in batch to an {@link 
ExecutorService}</li>
+     *     <li>Collecting the results (sorted files) created by those 
tasks</li>
+     *     <li>Merge the result with any left over files to create a single 
sorted file</li>
+     * </ol>
+     * Strategy -
+     * <ol>
+     *      <li>Wait for n files</li>
+     *      <li>construct new list of files to be merged by checking if its 
already merged</li>
+     *    and create intermediate merge file
+     *    (if final merge) merge all intermediate merge files and create 
sorted file
+     *      <li>add all merged files to merged list</li>
+     * </ol>
+     */
+    private class MergeRunner implements Runnable {
+        private final ArrayList<File> mergedFiles = new ArrayList<File>();
+        private final ArrayList<File> unmergedFiles = new ArrayList<File>();
+        private final File sortedFile;
+        private final ExecutorService executorService;
+        private final int threadPoolSize = 
Integer.getInteger(PROP_MERGE_THREAD_POOL_SIZE, 
DEFAULT_NUMBER_OF_MERGE_TASK_THREADS);
+        private final int batchMergeSize = 
Integer.getInteger(PROP_MERGE_TASK_BATCH_SIZE, 
DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK);
+        private final Comparator fileSizeComparator = new SizeFileComparator();
+
+        public MergeRunner(File sortedFile) {
+            this.sortedFile = sortedFile;
+            this.executorService = 
Executors.newFixedThreadPool(threadPoolSize);
+        }
+
+        private List<File> getSmallestUnmergedFiles(int size) {
+            ArrayList<File> result = new ArrayList<File>(unmergedFiles);
+            result.remove(MERGE_POISON_PILL);
+            result.sort(fileSizeComparator);
+            int endIdx = size > result.size() ? result.size() : size;
+            return result.subList(0, endIdx);
+        }
+
+        private void markAsMerged(List<File> target) {
+            mergedFiles.addAll(target);
+            unmergedFiles.removeAll(target);
+        }
+
+        @Override
+        public void run() {
+            Phaser mergeTaskPhaser = new Phaser(1);
+            List<Future<File>> results = new ArrayList<>();
+            List<File> mergeTarget = new ArrayList<>();
+            int count = 0;
+
+            while (true) {
+                try {
+                    unmergedFiles.add(sortedFiles.take());
+                    if (sortedFiles.contains(MERGE_POISON_PILL) || 
unmergedFiles.contains(MERGE_POISON_PILL)) {
+                        break;
+                    }
+                    // add another batchMergeSize so that we choose the 
smallest of a larger range
+                    if (unmergedFiles.size() >= 2*batchMergeSize) {
+                        count++;
+                        mergeTarget.clear();
+                        mergeTarget = getSmallestUnmergedFiles(batchMergeSize);
+                        Callable<File> mergeTask = new MergeTask(mergeTarget, 
mergeTaskPhaser,
+                                new File(mergeDir, 
String.format("intermediate-%s", count)));
+                        markAsMerged(mergeTarget);
+                        results.add(executorService.submit(mergeTask));
+                    }
+                } catch (InterruptedException e) {
+                    log.error("Failed while draining from sortedFiles {}", e);

Review comment:
       Makes sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to