thomasmueller commented on a change in pull request #508:
URL: https://github.com/apache/jackrabbit-oak/pull/508#discussion_r825693644
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
##########
@@ -304,14 +364,18 @@ void createInitialTasks(NodeStateEntryTraverserFactory nodeStateEntryTraverserFa
void addTask(TraversingRange range, NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory, BlobStore blobStore, ConcurrentLinkedQueue<String> completedTasks) throws IOException {
taskQueue.add(new TraverseAndSortTask(range, comparator, blobStore, storeDir,
- compressionEnabled, completedTasks, taskQueue, phaser, nodeStateEntryTraverserFactory, memoryManager, dumpThreshold));
+ compressionEnabled, completedTasks, taskQueue, phaser, nodeStateEntryTraverserFactory, memoryManager, dumpThreshold, sortedFiles));
}
@Override
public File createSortedStoreFile() throws IOException, CompositeException {
String watcherThreadName = "watcher";
+ String mergerThreadName = "merger";
Thread watcher = new Thread(new TaskRunner(), watcherThreadName);
watcher.start();
+ File sortedFile = new File(storeDir, getSortedStoreFileName(compressionEnabled));
+ Thread merger = new Thread(new MergeRunner(sortedFile), mergerThreadName);
Review comment:
Could you call setDaemon(true), please? That way the JVM can exit if this is the only thread still running. I see you have already set the thread name; that's good.
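A minimal sketch of the requested change, not code from the PR: a plain Runnable stands in for MergeRunner, and DaemonMergerSketch and startMerger are hypothetical names. The key detail is that setDaemon(true) has to be called before start().

```java
import java.util.concurrent.CountDownLatch;

class DaemonMergerSketch {
    // Hypothetical helper: creates the "merger" thread as a daemon and starts it.
    static Thread startMerger(Runnable mergeRunner) {
        Thread merger = new Thread(mergeRunner, "merger");
        // Must be called before start(); on a live thread it throws IllegalThreadStateException.
        merger.setDaemon(true);
        merger.start();
        return merger;
    }

    public static void main(String[] args) {
        CountDownLatch neverReleased = new CountDownLatch(1);
        Thread merger = startMerger(() -> {
            try {
                // Simulates a merger blocked forever, e.g. waiting on an empty queue.
                neverReleased.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        System.out.println("merger is daemon: " + merger.isDaemon());
        // main returns here; because "merger" is a daemon thread, the JVM can still exit.
    }
}
```

One consequence to keep in mind: daemon threads are abandoned when the JVM exits, so code that needs the final merged file should still join() the merger thread rather than assume it finishes on its own.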
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
##########
@@ -59,12 +59,30 @@
/**
* Default value for {@link #PROP_THREAD_POOL_SIZE}
*/
- static final String DEFAULT_NUMBER_OF_DATA_DUMP_THREADS = "4";
+ static final int DEFAULT_NUMBER_OF_DATA_DUMP_THREADS = 4;
/**
* System property for specifying number of threads for parallel download when using {@link MultithreadedTraverseWithSortStrategy}
*/
static final String PROP_THREAD_POOL_SIZE = "oak.indexer.dataDumpThreadPoolSize";
+ /**
+ * Default value for {@link #PROP_MERGE_THREAD_POOL_SIZE}
+ */
+ static final int DEFAULT_NUMBER_OF_MERGE_TASK_THREADS = 4;
Review comment:
Sorry... I thought we only wanted _one_ thread that does the merging concurrently. Now we have 4, which means we can have 4 times 65 = 260 files open at the same time. I think 1 merge thread is more efficient.
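To make the arithmetic in this comment concrete, here is a minimal sketch, not code from the PR. It assumes each merge task keeps 65 files open, as the comment states, so the worst case is mergeThreads times 65; it also shows that a single-thread executor never runs more than one merge at a time. SingleMergeThreadSketch, FILES_OPEN_PER_MERGE_TASK, maxOpenFiles and peakConcurrentMerges are hypothetical names, and the sleep is a placeholder for real merge work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class SingleMergeThreadSketch {
    // Hypothetical: files one merge task keeps open, taken from the "65" in the review comment.
    static final int FILES_OPEN_PER_MERGE_TASK = 65;

    // Upper bound on simultaneously open files when every merge thread is busy.
    static int maxOpenFiles(int mergeThreads) {
        return mergeThreads * FILES_OPEN_PER_MERGE_TASK;
    }

    // Runs `tasks` dummy merge tasks on `mergeThreads` threads and returns
    // the largest number of tasks that were ever running at the same time.
    static int peakConcurrentMerges(int mergeThreads, int tasks) {
        ExecutorService pool = mergeThreads == 1
                ? Executors.newSingleThreadExecutor()
                : Executors.newFixedThreadPool(mergeThreads);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < tasks; i++) {
                futures.add(pool.submit(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(5); // placeholder for the actual merge work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }));
            }
            for (Future<?> f : futures) {
                f.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }
}
```

With one merge thread the bound drops from 260 to 65 open files, at the cost of merges no longer overlapping with each other.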
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MultithreadedTraverseWithSortStrategy.java
##########
@@ -408,6 +454,174 @@ public void run() {
}
}
+ private boolean merge(List<File> files, File outputFile) {
+ try (BufferedWriter writer = createWriter(outputFile, compressionEnabled)) {
+ Function<String, NodeStateHolder> func1 = (line) -> line == null ? null : new SimpleNodeStateHolder(line);
+ Function<NodeStateHolder, String> func2 = holder -> holder == null ? null : holder.getLine();
+ ExternalSort.mergeSortedFiles(files,
+ writer,
+ comparator,
+ charset,
+ true, //distinct
+ compressionEnabled, //useZip
+ func2,
+ func1
+ );
+ } catch (IOException e) {
+ log.error("Merge failed with IOException", e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Class responsible for -
+ * <ol>
+ * <li>Watching {@link #taskQueue} for new tasks</li>
+ * <li>Submitting those tasks to an {@link ExecutorService}</li>
+ * <li>Collecting the results (sorted files) created by those tasks into one place</li>
+ * </ol>
+ */
+ private class MergeTask implements Callable<File> {
+ private final Phaser mergeTaskPhaser;
+ private final List<File> mergeTarget;
+ private final File mergedFile;
+
+ MergeTask(List<File> mergeTarget, Phaser mergeTaskPhaser, File mergedFile) {
+ this.mergeTarget = mergeTarget;
+ this.mergeTaskPhaser = mergeTaskPhaser;
+ this.mergedFile = mergedFile;
+ mergeTaskPhaser.register();
+ }
+
+ @Override
+ public File call() {
+ log.info("performing merge for {} with size {}", mergedFile.getName(), mergeTarget.size());
+ try {
+ if (merge(mergeTarget, mergedFile)) {
+ log.info("merge complete for {}", mergedFile.getName());
+ return mergedFile;
+ }
+ log.error("merge failed for {}", mergedFile.getName());
+ } finally {
+ mergeTaskPhaser.arriveAndDeregister();
+ }
+
+ return mergedFile;
+ }
+ }
+
+ /**
+ * Class responsible for -
+ * <ol>
+ * <li>Watching {@link #sortedFiles} for new sorted files</li>
+ * <li>Submitting those files in batch to an {@link ExecutorService}</li>
+ * <li>Collecting the results (sorted files) created by those tasks</li>
+ * <li>Merge the result with any left over files to create a single sorted file</li>
+ * </ol>
+ * Strategy -
+ * <ol>
+ * <li>Wait for n files</li>
+ * <li>construct new list of files to be merged by checking if its already merged</li>
+ * and create intermediate merge file
+ * (if final merge) merge all intermediate merge files and create sorted file
+ * <li>add all merged files to merged list</li>
+ * </ol>
+ */
+ private class MergeRunner implements Runnable {
+ private final ArrayList<File> mergedFiles = new ArrayList<File>();
+ private final ArrayList<File> unmergedFiles = new ArrayList<File>();
+ private final File sortedFile;
+ private final ExecutorService executorService;
+ private final int threadPoolSize = Integer.getInteger(PROP_MERGE_THREAD_POOL_SIZE, DEFAULT_NUMBER_OF_MERGE_TASK_THREADS);
+ private final int batchMergeSize = Integer.getInteger(PROP_MERGE_TASK_BATCH_SIZE, DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK);
+ private final Comparator fileSizeComparator = new SizeFileComparator();
+
+ public MergeRunner(File sortedFile) {
+ this.sortedFile = sortedFile;
+ this.executorService = Executors.newFixedThreadPool(threadPoolSize);
+ }
+
+ private List<File> getSmallestUnmergedFiles(int size) {
+ ArrayList<File> result = new ArrayList<File>(unmergedFiles);
+ result.remove(MERGE_POISON_PILL);
+ result.sort(fileSizeComparator);
+ int endIdx = size > result.size() ? result.size() : size;
+ return result.subList(0, endIdx);
+ }
+
+ private void markAsMerged(List<File> target) {
+ mergedFiles.addAll(target);
+ unmergedFiles.removeAll(target);
+ }
+
+ @Override
+ public void run() {
+ Phaser mergeTaskPhaser = new Phaser(1);
+ List<Future<File>> results = new ArrayList<>();
+ List<File> mergeTarget = new ArrayList<>();
+ int count = 0;
+
+ while (true) {
+ try {
+ unmergedFiles.add(sortedFiles.take());
+ if (sortedFiles.contains(MERGE_POISON_PILL) || unmergedFiles.contains(MERGE_POISON_PILL)) {
+ break;
+ }
+ // add another batchMergeSize so that we choose the smallest of a larger range
+ if (unmergedFiles.size() >= 2*batchMergeSize) {
+ count++;
+ mergeTarget.clear();
+ mergeTarget = getSmallestUnmergedFiles(batchMergeSize);
+ Callable<File> mergeTask = new MergeTask(mergeTarget, mergeTaskPhaser,
+ new File(mergeDir, String.format("intermediate-%s", count)));
+ markAsMerged(mergeTarget);
+ results.add(executorService.submit(mergeTask));
+ }
+ } catch (InterruptedException e) {
+ log.error("Failed while draining from sortedFiles {}", e);
Review comment:
Actually, the right code here would be:
log.error("Failed while draining from sortedFiles", e);
... without the {} placeholder, this will log the stack trace.
##########
File path:
oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/TraverseAndSortTaskTest.java
##########
@@ -34,8 +34,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Phaser;
+import java.util.concurrent.*;
Review comment:
Please don't use star imports.
--
This is an automated message from the Apache Git Service.