Ewocker commented on a change in pull request #508:
URL: https://github.com/apache/jackrabbit-oak/pull/508#discussion_r830374789



##########
File path: 
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MergeRunner.java
##########
@@ -0,0 +1,295 @@
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.comparator.SizeFileComparator;
+import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.function.Function;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.DEFAULT_NUMBER_OF_MERGE_TASK_THREADS;
+import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.PROP_MERGE_THREAD_POOL_SIZE;
+import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK;
+import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.PROP_MERGE_TASK_BATCH_SIZE;
+import static 
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.createWriter;
+
+
+/**
+ * Class responsible for -
+ * <ol>
+ *     <li>Watching {@link #sortedFiles} for new sorted files</li>
+ *     <li>Submitting those files in batch to an {@link ExecutorService}</li>
+ *     <li>Collecting the results (sorted files) created by those tasks</li>
+ *     <li>Merge the result with any left over files to create a single sorted 
file</li>
+ * </ol>
+ * Strategy -
+ * <ol>
+ *      <li>Wait for n files</li>
+ *      <li>construct a new list of files to be merged by checking whether each file is 
already merged</li>
+ *    and create intermediate merge file
+ *    (if final merge) merge all intermediate merge files and create sorted 
file
+ *      <li>add all merged files to merged list</li>
+ * </ol>
+ *
+ * <h3>Merge task explanation -</h3>
+ *
+ * SORTED_FILE_QUEUE=MultithreadedTraverseWithSortStrategy#sortedFiles
+ * MERGED_FILE_LIST=MergeRunner#mergedFiles
+ * UNMERGED_FILE_LIST=MergeRunner#unmergedFiles
+ * <ol>
+ *     <li>Have a BlockingQueue of sorted files (SORTED_FILE_QUEUE) that need 
to be merged. Each task is assigned a list of 
files.</li>
+ *     <li>Task thread (TraverseAndSortTask) on completion adds sorted files 
to this queue</li>
+ *     <li>Another monitoring thread 
(MultithreadedTraverseWithSortStrategy#MergeRunner) is consuming from this 
SORTED_FILE_QUEUE and submitting those
+ *     part of the files in batch (batch file size is configurable via java 
system property {@link FlatFileNodeStoreBuilder#PROP_MERGE_TASK_BATCH_SIZE}
+ *     to executor service for merge
+ *          <ol>
+ *              <li>The monitoring thread pulls each sorted file from 
SORTED_FILE_QUEUE and adds it to the UNMERGED_FILE_LIST</li>
+ *              <li>When UNMERGED_FILE_LIST grows larger than two times the 
batch merge size, a merge task is submitted for merge
+ *              with the smaller half portion of the UNMERGED_FILE_LIST</li>
+ *              <li>Files submitted for merge will be removed from 
UNMERGED_FILE_LIST and added to MERGED_FILE_LIST</li>
+ *          </ol>
+ *     </li>
+ *     <li>A poison pill is added to SORTED_FILE_QUEUE upon download 
completion</li>
+ *     <li>Once the poison pill is received, the monitoring thread stops submitting new 
merge tasks and proceeds to final merging
+ *          <ol>
+ *              <li>Final merge waits for all existing tasks to finish</li>
+ *              <li>All files left in UNMERGED_FILE_LIST and all previous 
task results are collected to be merged</li>
+ *          </ol>
+ *     </li>
+ *     <li>
+ *         We use a phaser (Merge#mergePhaser) for coordination between main 
thread and the monitoring thread. This phaser has one phase -
+ *         <ol>
+ *             <li>Waiting for a single final merged file to be created</li>
+ *         </ol>
+ *     </li>
+ *     <li>
+ *         We use another phaser for coordination between monitoring thread 
(MergeRunner) and the merge task executor (MergeTask). This phaser has one 
phase -
+ *         <ol>
+ *             <li>Waiting for all merge tasks to complete</li>
+ *         </ol>
+ *     </li>
+ * </ol>
+ */
+public class MergeRunner implements Runnable {
+    private static final Logger log = 
LoggerFactory.getLogger(MergeRunner.class);
+    private final Charset charset = UTF_8;
+    private final boolean compressionEnabled;
+    private final ArrayList<File> mergedFiles = Lists.newArrayList();
+    private final ArrayList<File> unmergedFiles = Lists.newArrayList();
+    private final ExecutorService executorService;
+    private final int threadPoolSize = 
Integer.getInteger(PROP_MERGE_THREAD_POOL_SIZE, 
DEFAULT_NUMBER_OF_MERGE_TASK_THREADS);
+    private final int batchMergeSize = 
Integer.getInteger(PROP_MERGE_TASK_BATCH_SIZE, 
DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK);
+    private final Comparator fileSizeComparator = new SizeFileComparator();
+
+    /**
+     * The end result file after merging all sorted files.
+     */
+    private final File sortedFile;
+
+    /**
+     * Directory where intermediate merged files will be created.
+     */
+    private final File mergeDir;
+
+    /**
+     * Comparator used for comparing node states for creating sorted files.
+     */
+    private final Comparator<NodeStateHolder> comparator;
+    private final BlockingQueue<File> sortedFiles;
+    private final ConcurrentLinkedQueue<Throwable> throwables;
+
+    /**
+     * Phaser used for coordination with the traverse/download and sort tasks. 
Advance of this phaser indicates that a single
+     * merged and sorted file has been created.
+     */
+    private final Phaser phaser;
+
+    /**
+     * This poison pill is added to {@link #sortedFiles} to indicate that 
download phase has completed.
+     */
+    public static final File MERGE_POISON_PILL = new File("");
+
+    /**
+     * Constructor.
+     * @param sortedFiles thread safe list containing files to be merged.
+     * @param comparator comparator used to help with sorting of node state 
entries.
+     * @param mergeDir directory where sorted files will be created.
+     * @param compressionEnabled if true, the created files would be compressed
+     */
+    MergeRunner(File sortedFile, BlockingQueue<File> sortedFiles, File 
mergeDir, Comparator comparator,
+                Phaser phaser, boolean compressionEnabled) throws IOException {
+        this.mergeDir = mergeDir;
+        FileUtils.forceMkdir(mergeDir);
+        this.compressionEnabled = compressionEnabled;
+        this.sortedFiles = sortedFiles;
+        this.sortedFile = sortedFile;
+        this.throwables = new ConcurrentLinkedQueue<>();
+        this.comparator = comparator;
+        this.phaser = phaser;
+        this.executorService = Executors.newFixedThreadPool(threadPoolSize);

Review comment:
       I actually made it a parameter. I don't think we would ever change the 
value; at least, the thread pool size cannot be changed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to