Ewocker commented on a change in pull request #508:
URL: https://github.com/apache/jackrabbit-oak/pull/508#discussion_r823178119
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
##########
@@ -408,6 +402,172 @@ public void run() {
}
}
+ private boolean merge(ArrayList<File> files, File outputFile) {
+ try (BufferedWriter writer = createWriter(outputFile,
compressionEnabled)) {
+ Function<String, NodeStateHolder> func1 = (line) -> line == null ?
null : new SimpleNodeStateHolder(line);
+ Function<NodeStateHolder, String> func2 = holder -> holder == null
? null : holder.getLine();
+ ExternalSort.mergeSortedFiles(files,
+ writer,
+ comparator,
+ charset,
+ true, //distinct
+ compressionEnabled, //useZip
+ func2,
+ func1
+ );
+ } catch (IOException e) {
+ log.error("Merge failed with IOException", e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Class responsible for -
+ * <ol>
+ * <li>Watching {@link #taskQueue} for new tasks</li>
+ * <li>Submitting those tasks to an {@link ExecutorService}</li>
+ * <li>Collecting the results (sorted files) created by those tasks
into one place</li>
+ * </ol>
+ */
+ private class MergeTask implements Callable<File> {
+ private final Phaser mergeTaskPhaser;
+ private final ArrayList<File> mergeTarget;
+ private final File mergedFile;
+ private final int failureThreshold = 5;
+
+ MergeTask(ArrayList<File> mergeTarget, Phaser mergeTaskPhaser, File
mergedFile) {
+ this.mergeTarget = mergeTarget;
+ this.mergeTaskPhaser = mergeTaskPhaser;
+ this.mergedFile = mergedFile;
+ mergeTaskPhaser.register();
+ }
+
+ @Override
+ public File call() {
+ log.info("performing merge for {} with size {}",
mergedFile.getName(), mergeTarget.size());
+ try {
+ for (int mergeFailureCount = 0; mergeFailureCount <=
failureThreshold; mergeFailureCount++) {
+ if (merge(mergeTarget, mergedFile)) {
+ log.info("merge complete for {}",
mergedFile.getName());
+ return mergedFile;
+ }
+ }
+ log.error("merge failed for {}", mergedFile.getName());
+ } finally {
+ mergeTaskPhaser.arriveAndDeregister();
+ }
+
+ return mergedFile;
+ }
+ }
+
+ /**
+ * Class responsible for -
+ * <ol>
+ * <li>Watching {@link #sortedFiles} for new sorted files</li>
+ * <li>Submitting those files in batches to an {@link ExecutorService}</li>
+ * <li>Collecting the results (sorted files) created by those tasks</li>
+ * <li>Merging the results with any leftover files to create a single sorted file</li>
+ * </ol>
+ * Strategy -
+ * <ol>
+ * <li>Wait for n files (compared against the merged list)</li>
+ * <li>Construct a new list of files to be merged by checking whether each file has
+ * already been merged, and create an intermediate merge file from them
+ * (for the final merge, merge all intermediate merge files to create the sorted file)</li>
+ * <li>Add all merged files to the merged list</li>
+ * </ol>
+ */
+ private class MergeRunner implements Runnable {
+ private final ArrayList<File> mergedFiles = new ArrayList<File>();
+ private final File sortedFile;
+ private int nextMergedLength = 0;
+ private final ExecutorService executorService;
+ private final int threadPoolSize =
Integer.parseInt(System.getProperty(PROP_MERGE_THREAD_POOL_SIZE,
DEFAULT_NUMBER_OF_MERGE_TASK_THREADS));
+ private final int batchMergeSize =
Integer.parseInt(System.getProperty(PROP_MERGE_TASK_BATCH_SIZE,
DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK));
+
+ public MergeRunner(File sortedFile) {
+ this.sortedFile = sortedFile;
+ this.executorService =
Executors.newFixedThreadPool(threadPoolSize);
+ }
+
+ private ArrayList<File> getUnmergedFiles(int size) {
+ ArrayList<File> unmergedFiles = new ArrayList<File>();
+ for (File f : sortedFiles) {
+ if (!mergedFiles.contains(f) && f != MERGE_POISON_PILL) {
+ unmergedFiles.add(f);
+ }
+ if (unmergedFiles.size() == size) {
+ break;
+ }
+ }
+ return unmergedFiles;
+ }
+
+ @Override
+ public void run() {
+ nextMergedLength += batchMergeSize;
+ Phaser mergeTaskPhaser = new Phaser(1);
+ List<Future<File>> results = new ArrayList<>();
+
+ while (true) {
+ while (!sortedFiles.contains(MERGE_POISON_PILL) &&
sortedFiles.size() <= nextMergedLength) {
+ // waiting for n files to be merged in a batch
+ }
+ if (sortedFiles.contains(MERGE_POISON_PILL)) {
+ break;
+ }
+
+ ArrayList<File> mergeTarget = getUnmergedFiles(batchMergeSize);
+ System.out.println(mergeTarget);
Review comment:
Removed; this was a debug log message I forgot to remove.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]