thomasmueller commented on a change in pull request #508:
URL: https://github.com/apache/jackrabbit-oak/pull/508#discussion_r829754624
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MergeRunner.java
##########
@@ -0,0 +1,295 @@
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.comparator.SizeFileComparator;
+import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.function.Function;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.DEFAULT_NUMBER_OF_MERGE_TASK_THREADS;
+import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.PROP_MERGE_THREAD_POOL_SIZE;
+import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK;
+import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.PROP_MERGE_TASK_BATCH_SIZE;
+import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.createWriter;
+
+
+/**
+ * Class responsible for -
+ * <ol>
+ * <li>Watching {@link #sortedFiles} for new sorted files</li>
+ * <li>Submitting those files in batch to an {@link ExecutorService}</li>
+ * <li>Collecting the results (sorted files) created by those tasks</li>
+ * <li>Merge the result with any left over files to create a single sorted
file</li>
+ * </ol>
+ * Strategy -
+ * <ol>
+ * <li>Wait for n files</li>
+ * <li>construct new list of files to be merged by checking if its
already merged</li>
+ * and create intermediate merge file
+ * (if final merge) merge all intermediate merge files and create sorted
file
+ * <li>add all merged files to merged list</li>
+ * </ol>
+ *
+ * <h3>Merge task explanation -</h3>
+ *
+ * SORTED_FILE_QUEUE=MultithreadedTraverseWithSortStrategy#sortedFiles
+ * MERGED_FILE_LIST=MergeRunner#mergedFiles
+ * UNMERGED_FILE_LIST=MergeRunner#unmergedFiles
+ * <ol>
+ * <li>Have a BlockingQueue of sorted files (SORTED_FILE_QUEUE) that need
to be executed for merge. Each of the task has been assigned a list of
files.</li>
+ * <li>Task thread (TraverseAndSortTask) on completion adds sorted files
to this queue</li>
+ * <li>Another monitoring thread
(MultithreadedTraverseWithSortStrategy#MergeRunner) is consuming from this
SORTED_FILE_QUEUE and submitting those
+ * part of the files in batch (batch file size is configurable via java
system property {@link FlatFileNodeStoreBuilder#PROP_MERGE_TASK_BATCH_SIZE}
+ * to executor service for merge
+ * <ol>
+ * <li>The monitoring thread pulls any sorted file and add it in
SORTED_FILE_QUEUE to the UNMERGED_FILE_LIST</li>
+ * <li>When UNMERGED_FILE_LIST grows larger than two times the
batch merge size, a merge task is submitted for merge
+ * with the smaller half portion of the UNMERGED_FILE_LIST</li>
+ * <li>Files submitted for merge will be removed from
UNMERGED_FILE_LIST and added to MERGED_FILE_LIST</li>
+ * </ol>
+ * </li>
+ * <li>A poison pill is added to SORTED_FILE_QUEUE upon download
completion</li>
+ * <li>Once poison pill occurs, the monitoring thread stops submitting new
merge task and proceed to final merging
+ * <ol>
+ * <li>Final merge waits for all existing tasks finish</li>
+ * <li>All files left in UNMERGED_FILE_LIST and all previously
task results are collected to be merged</li>
+ * </ol>
+ * </li>
+ * <li>
+ * We use a phaser (Merge#mergePhaser) for coordination between main
thread and the monitoring thread. This phaser has one phase -
+ * <ol>
+ * <li>Waiting for a single final merged file to be created</li>
+ * </ol>
+ * </li>
+ * <li>
+ * We use another phaser for coordination between monitoring thread
(MergeRunner) and the merge task executor (MergeTask). This phaser has one
phase -
+ * <ol>
+ * <li>Waiting for all merge tasks complete</li>
+ * </ol>
+ * </li>
+ * </ol>
+ */
+public class MergeRunner implements Runnable {
+ private static final Logger log =
LoggerFactory.getLogger(MergeRunner.class);
+ private final Charset charset = UTF_8;
+ private final boolean compressionEnabled;
+ private final ArrayList<File> mergedFiles = Lists.newArrayList();
+ private final ArrayList<File> unmergedFiles = Lists.newArrayList();
+ private final ExecutorService executorService;
+ private final int threadPoolSize =
Integer.getInteger(PROP_MERGE_THREAD_POOL_SIZE,
DEFAULT_NUMBER_OF_MERGE_TASK_THREADS);
+ private final int batchMergeSize =
Integer.getInteger(PROP_MERGE_TASK_BATCH_SIZE,
DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK);
+ private final Comparator fileSizeComparator = new SizeFileComparator();
+
+ /**
+ * The end result file after merging all sorted files.
+ */
+ private final File sortedFile;
+
+ /**
+ * Directory where intermediate merged files will be created.
+ */
+ private final File mergeDir;
+
+ /**
+ * Comparator used for comparing node states for creating sorted files.
+ */
+ private final Comparator<NodeStateHolder> comparator;
+ private final BlockingQueue<File> sortedFiles;
+ private final ConcurrentLinkedQueue<Throwable> throwables;
+
+ /**
+ * Phaser used for coordination with the traverse/download and sort tasks.
Advance of this phaser indicates that a single
+ * merged and sorted file has been created.
+ */
+ private final Phaser phaser;
+
+ /**
+ * This poison pill is added to {@link #sortedFiles} to indicate that
download phase has completed.
+ */
+ public static final File MERGE_POISON_PILL = new File("");
+
+ /**
+ * Constructor.
+ * @param sortedFiles thread safe list containing files to be merged.
+ * @param comparator comparator used to help with sorting of node state
entries.
+ * @param mergeDir directory where sorted files will be created.
+ * @param compressionEnabled if true, the created files would be compressed
+ */
+ MergeRunner(File sortedFile, BlockingQueue<File> sortedFiles, File
mergeDir, Comparator comparator,
+ Phaser phaser, boolean compressionEnabled) throws IOException {
+ this.mergeDir = mergeDir;
+ FileUtils.forceMkdir(mergeDir);
+ this.compressionEnabled = compressionEnabled;
+ this.sortedFiles = sortedFiles;
+ this.sortedFile = sortedFile;
+ this.throwables = new ConcurrentLinkedQueue<>();
+ this.comparator = comparator;
+ this.phaser = phaser;
+ this.executorService = Executors.newFixedThreadPool(threadPoolSize);
Review comment:
Could you defer creating the executorService to the merge part as well?
Two reasons:
(A) If only the constructor is called and run() is never reached (e.g. because
of some exception), then we have a dangling executorService.
(B) That would allow having setters for threadPoolSize and batchMergeSize,
which can then simplify testing (no need to change the system property).
##########
File path:
oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MergeRunnerTest.java
##########
@@ -0,0 +1,176 @@
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.event.Level;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Phaser;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MergeRunnerTest {
+ private final LogCustomizer lc = LogCustomizer.forLogger(MergeRunner.class)
+ .filter(Level.INFO)
+ .enable(Level.INFO)
+ .create();
+ private final String newline = System.lineSeparator();
+ private final List<File> testFiles = Lists.newArrayList();
+ private String expectedSortedFileStr = "",
+ actualSortedFileStr = "";
+
+ @Before
+ public void setup(){
+ lc.starting();
+ }
+
+ @After
+ public void after() {
+ lc.finished();
+ }
+
+ @Test
+ public void test() throws IOException {
+ lc.starting();
+
System.setProperty(FlatFileNodeStoreBuilder.PROP_MERGE_TASK_BATCH_SIZE, "3");
Review comment:
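If you are setting the system properties in the test, then you would
need to restore them afterwards, for example:

    String old = System.getProperty(...);
    try {
        System.setProperty(...);
        ...
    } finally {
        // restore the old value; if old is null, use System.clearProperty(...)
        System.setProperty(..., old);
    }

But it's better to use a setter and not change the system properties at all
(see above).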
##########
File path:
oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MergeRunnerTest.java
##########
@@ -0,0 +1,176 @@
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.event.Level;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Phaser;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MergeRunnerTest {
+ private final LogCustomizer lc = LogCustomizer.forLogger(MergeRunner.class)
+ .filter(Level.INFO)
+ .enable(Level.INFO)
+ .create();
+ private final String newline = System.lineSeparator();
+ private final List<File> testFiles = Lists.newArrayList();
+ private String expectedSortedFileStr = "",
+ actualSortedFileStr = "";
+
+ @Before
+ public void setup(){
+ lc.starting();
+ }
+
+ @After
+ public void after() {
+ lc.finished();
+ }
+
+ @Test
+ public void test() throws IOException {
+ lc.starting();
+
System.setProperty(FlatFileNodeStoreBuilder.PROP_MERGE_TASK_BATCH_SIZE, "3");
+
System.setProperty(FlatFileNodeStoreBuilder.PROP_MERGE_THREAD_POOL_SIZE, "1");
+
+ File tmpDir = new File(FileUtils.getTempDirectory(),
Long.toString(System.nanoTime())),
+ mergeDir = new File(tmpDir, "merge"),
+ sortedFile = new File(tmpDir, "sorted-file.json");
+ List<String> expectedLogOutput = Lists.newArrayList(),
+ actualLogOutput = Lists.newArrayList();
+
+ generateTestFiles(tmpDir);
+ assertEquals("expected 13 generated test files", 13, testFiles.size());
+
+ PathElementComparator pathComparator = new PathElementComparator();
+ Comparator<NodeStateHolder> comparator = (e1, e2) ->
pathComparator.compare(e1.getPathElements(), e2.getPathElements());
+ BlockingQueue<File> sortedFiles = new LinkedBlockingQueue<>();
+ Phaser mergePhaser = new Phaser(1);
+ Runnable mergeRunner = new MergeRunner(sortedFile, sortedFiles,
mergeDir, comparator, mergePhaser, false);
+ Thread merger = new Thread(mergeRunner, "test-merger");
+ merger.setDaemon(true);
+
+ // Adding test files in predefined order
+ ArrayList<Integer> filenameOrder = new ArrayList<>(Arrays.asList(7, 1,
11, 9, 8, 6, 12, 4, 5, 13, 3, 2, 10));
+ for (int filename: filenameOrder.subList(0,4)) {
+ sortedFiles.add(testFiles.get(filename-1));
+ }
+ merger.start();
+ for (int filename: filenameOrder.subList(4,13)) {
+ sortedFiles.add(testFiles.get(filename-1));
+ }
+ sortedFiles.add(MergeRunner.MERGE_POISON_PILL);
+ mergePhaser.awaitAdvance(0);
+
+ actualSortedFileStr = FileUtils.readFileToString(sortedFile, UTF_8);
+ String text = String.join(newline, lc.getLogs());
+ System.out.println(text);
+
+ assertEquals("sorted-file content should be expected",
expectedSortedFileStr, actualSortedFileStr);
+
+ actualLogOutput = lc.getLogs();
+ expectedLogOutput.add("created merge task for intermediate-1 with " +
(new ArrayList<File>(){
+ {
+ add(testFiles.get(0));
+ add(testFiles.get(5));
+ add(testFiles.get(6));
+ }
+ }));
+ expectedLogOutput.add("created merge task for intermediate-2 with " +
(new ArrayList<File>(){
+ {
+ add(testFiles.get(3));
+ add(testFiles.get(4));
+ add(testFiles.get(7));
+ }
+ }));
+ expectedLogOutput.add("created merge task for intermediate-3 with " +
(new ArrayList<File>(){
+ {
+ add(testFiles.get(1));
+ add(testFiles.get(2));
+ add(testFiles.get(8));
+ }
+ }));
+ expectedLogOutput.add("Waiting for batch sorting tasks completion");
+ assertEquals("intermediate merge log output should be expected",
String.join(newline, expectedLogOutput), String.join(newline,
actualLogOutput.subList(0,4)));
+
+ assertTrue("merge complete output should exist",
actualLogOutput.containsAll((new ArrayList<String>(){
+ {
+ add("merge complete for intermediate-1");
+ add("merge complete for intermediate-2");
+ add("merge complete for intermediate-3");
+ }
+ })));
+
+ expectedLogOutput.clear();
+ expectedLogOutput.add("There are still 4 sorted files not merged yet");
+ expectedLogOutput.add("running final batch merge task for final-1 with
" + (new ArrayList<File>(){
+ {
+ add(testFiles.get(9));
+ add(testFiles.get(10));
+ add(testFiles.get(11));
+ }
+ }));
+ expectedLogOutput.add("running final batch merge task for final-2 with
" + (new ArrayList<File>(){
+ {
+ add(testFiles.get(12));
+ add(new File(mergeDir, "intermediate-1"));
+ add(new File(mergeDir, "intermediate-3"));
+ }
+ }));
+ expectedLogOutput.add("running final batch merge task for " +
sortedFile.getName() + " with " + (new ArrayList<File>(){
+ {
+ add(new File(mergeDir, "intermediate-2"));
+ add(new File(mergeDir, "final-1"));
+ add(new File(mergeDir, "final-2"));
+ }
+ }));
+ expectedLogOutput.add("Total batch sorted files length is 18");
+ assertEquals("final merge log output should be expected",
String.join(newline, expectedLogOutput), String.join(newline,
actualLogOutput.subList(7, 12)));
+ }
+
+ private void generateTestFiles(File tmpDir) throws IOException {
+ int nextFileSize = 1;
+ File testFile = new File(tmpDir, Integer.toString(nextFileSize));
+ List<String> lineBuffer = Lists.newArrayList();
+ List<String> resultList = Lists.newArrayList();
+ for (int i = 0; i <= 90; i++) {
+ String line = String.format("/%05d|{}", i);
+ lineBuffer.add(line);
+ resultList.add(line);
+ if (lineBuffer.size() == nextFileSize) {
+ String text = String.join(newline, lineBuffer);
+ FileUtils.writeStringToFile(testFile, text, UTF_8);
+ testFiles.add(testFile);
+ nextFileSize += 1;
+ testFile = new File(tmpDir, Integer.toString(nextFileSize));
+ lineBuffer.clear();
+ }
+ }
+ expectedSortedFileStr = String.join(newline, resultList) + newline;
+ }
+
+// private static List<File> getResourceFolderFiles (String folder) {
+// ClassLoader loader = Thread.currentThread().getContextClassLoader();
Review comment:
Please remove unused code
##########
File path:
oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MergeRunnerTest.java
##########
@@ -0,0 +1,176 @@
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.event.Level;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Phaser;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MergeRunnerTest {
+ private final LogCustomizer lc = LogCustomizer.forLogger(MergeRunner.class)
+ .filter(Level.INFO)
+ .enable(Level.INFO)
+ .create();
+ private final String newline = System.lineSeparator();
+ private final List<File> testFiles = Lists.newArrayList();
+ private String expectedSortedFileStr = "",
+ actualSortedFileStr = "";
+
+ @Before
+ public void setup(){
+ lc.starting();
+ }
+
+ @After
+ public void after() {
+ lc.finished();
+ }
+
+ @Test
+ public void test() throws IOException {
+ lc.starting();
+
System.setProperty(FlatFileNodeStoreBuilder.PROP_MERGE_TASK_BATCH_SIZE, "3");
+
System.setProperty(FlatFileNodeStoreBuilder.PROP_MERGE_THREAD_POOL_SIZE, "1");
+
+ File tmpDir = new File(FileUtils.getTempDirectory(),
Long.toString(System.nanoTime())),
+ mergeDir = new File(tmpDir, "merge"),
+ sortedFile = new File(tmpDir, "sorted-file.json");
+ List<String> expectedLogOutput = Lists.newArrayList(),
+ actualLogOutput = Lists.newArrayList();
+
+ generateTestFiles(tmpDir);
+ assertEquals("expected 13 generated test files", 13, testFiles.size());
+
+ PathElementComparator pathComparator = new PathElementComparator();
+ Comparator<NodeStateHolder> comparator = (e1, e2) ->
pathComparator.compare(e1.getPathElements(), e2.getPathElements());
+ BlockingQueue<File> sortedFiles = new LinkedBlockingQueue<>();
+ Phaser mergePhaser = new Phaser(1);
+ Runnable mergeRunner = new MergeRunner(sortedFile, sortedFiles,
mergeDir, comparator, mergePhaser, false);
+ Thread merger = new Thread(mergeRunner, "test-merger");
+ merger.setDaemon(true);
+
+ // Adding test files in predefined order
+ ArrayList<Integer> filenameOrder = new ArrayList<>(Arrays.asList(7, 1,
11, 9, 8, 6, 12, 4, 5, 13, 3, 2, 10));
+ for (int filename: filenameOrder.subList(0,4)) {
+ sortedFiles.add(testFiles.get(filename-1));
+ }
+ merger.start();
+ for (int filename: filenameOrder.subList(4,13)) {
+ sortedFiles.add(testFiles.get(filename-1));
+ }
+ sortedFiles.add(MergeRunner.MERGE_POISON_PILL);
+ mergePhaser.awaitAdvance(0);
+
+ actualSortedFileStr = FileUtils.readFileToString(sortedFile, UTF_8);
+ String text = String.join(newline, lc.getLogs());
+ System.out.println(text);
+
+ assertEquals("sorted-file content should be expected",
expectedSortedFileStr, actualSortedFileStr);
+
+ actualLogOutput = lc.getLogs();
+ expectedLogOutput.add("created merge task for intermediate-1 with " +
(new ArrayList<File>(){
+ {
+ add(testFiles.get(0));
+ add(testFiles.get(5));
+ add(testFiles.get(6));
+ }
+ }));
+ expectedLogOutput.add("created merge task for intermediate-2 with " +
(new ArrayList<File>(){
+ {
+ add(testFiles.get(3));
+ add(testFiles.get(4));
+ add(testFiles.get(7));
+ }
+ }));
+ expectedLogOutput.add("created merge task for intermediate-3 with " +
(new ArrayList<File>(){
+ {
+ add(testFiles.get(1));
+ add(testFiles.get(2));
+ add(testFiles.get(8));
+ }
+ }));
+ expectedLogOutput.add("Waiting for batch sorting tasks completion");
+ assertEquals("intermediate merge log output should be expected",
String.join(newline, expectedLogOutput), String.join(newline,
actualLogOutput.subList(0,4)));
+
+ assertTrue("merge complete output should exist",
actualLogOutput.containsAll((new ArrayList<String>(){
+ {
+ add("merge complete for intermediate-1");
+ add("merge complete for intermediate-2");
+ add("merge complete for intermediate-3");
+ }
+ })));
+
+ expectedLogOutput.clear();
+ expectedLogOutput.add("There are still 4 sorted files not merged yet");
+ expectedLogOutput.add("running final batch merge task for final-1 with
" + (new ArrayList<File>(){
+ {
+ add(testFiles.get(9));
+ add(testFiles.get(10));
+ add(testFiles.get(11));
+ }
+ }));
+ expectedLogOutput.add("running final batch merge task for final-2 with
" + (new ArrayList<File>(){
+ {
+ add(testFiles.get(12));
+ add(new File(mergeDir, "intermediate-1"));
+ add(new File(mergeDir, "intermediate-3"));
+ }
+ }));
+ expectedLogOutput.add("running final batch merge task for " +
sortedFile.getName() + " with " + (new ArrayList<File>(){
+ {
+ add(new File(mergeDir, "intermediate-2"));
+ add(new File(mergeDir, "final-1"));
+ add(new File(mergeDir, "final-2"));
+ }
+ }));
+ expectedLogOutput.add("Total batch sorted files length is 18");
+ assertEquals("final merge log output should be expected",
String.join(newline, expectedLogOutput), String.join(newline,
actualLogOutput.subList(7, 12)));
+ }
+
+ private void generateTestFiles(File tmpDir) throws IOException {
Review comment:
What about passing a parameter "fileCount"?
##########
File path:
oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MergeRunnerTest.java
##########
@@ -0,0 +1,176 @@
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.event.Level;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Phaser;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MergeRunnerTest {
+ private final LogCustomizer lc = LogCustomizer.forLogger(MergeRunner.class)
+ .filter(Level.INFO)
+ .enable(Level.INFO)
+ .create();
+ private final String newline = System.lineSeparator();
+ private final List<File> testFiles = Lists.newArrayList();
+ private String expectedSortedFileStr = "",
+ actualSortedFileStr = "";
+
+ @Before
+ public void setup(){
+ lc.starting();
+ }
+
+ @After
+ public void after() {
+ lc.finished();
+ }
+
+ @Test
+ public void test() throws IOException {
+ lc.starting();
+
System.setProperty(FlatFileNodeStoreBuilder.PROP_MERGE_TASK_BATCH_SIZE, "3");
+
System.setProperty(FlatFileNodeStoreBuilder.PROP_MERGE_THREAD_POOL_SIZE, "1");
+
+ File tmpDir = new File(FileUtils.getTempDirectory(),
Long.toString(System.nanoTime())),
+ mergeDir = new File(tmpDir, "merge"),
+ sortedFile = new File(tmpDir, "sorted-file.json");
+ List<String> expectedLogOutput = Lists.newArrayList(),
+ actualLogOutput = Lists.newArrayList();
+
+ generateTestFiles(tmpDir);
+ assertEquals("expected 13 generated test files", 13, testFiles.size());
+
+ PathElementComparator pathComparator = new PathElementComparator();
+ Comparator<NodeStateHolder> comparator = (e1, e2) ->
pathComparator.compare(e1.getPathElements(), e2.getPathElements());
+ BlockingQueue<File> sortedFiles = new LinkedBlockingQueue<>();
+ Phaser mergePhaser = new Phaser(1);
+ Runnable mergeRunner = new MergeRunner(sortedFile, sortedFiles,
mergeDir, comparator, mergePhaser, false);
+ Thread merger = new Thread(mergeRunner, "test-merger");
+ merger.setDaemon(true);
+
+ // Adding test files in predefined order
+ ArrayList<Integer> filenameOrder = new ArrayList<>(Arrays.asList(7, 1,
11, 9, 8, 6, 12, 4, 5, 13, 3, 2, 10));
+ for (int filename: filenameOrder.subList(0,4)) {
+ sortedFiles.add(testFiles.get(filename-1));
+ }
+ merger.start();
+ for (int filename: filenameOrder.subList(4,13)) {
+ sortedFiles.add(testFiles.get(filename-1));
+ }
+ sortedFiles.add(MergeRunner.MERGE_POISON_PILL);
+ mergePhaser.awaitAdvance(0);
+
+ actualSortedFileStr = FileUtils.readFileToString(sortedFile, UTF_8);
+ String text = String.join(newline, lc.getLogs());
+ System.out.println(text);
+
+ assertEquals("sorted-file content should be expected",
expectedSortedFileStr, actualSortedFileStr);
+
+ actualLogOutput = lc.getLogs();
+ expectedLogOutput.add("created merge task for intermediate-1 with " +
(new ArrayList<File>(){
+ {
+ add(testFiles.get(0));
+ add(testFiles.get(5));
+ add(testFiles.get(6));
+ }
+ }));
+ expectedLogOutput.add("created merge task for intermediate-2 with " +
(new ArrayList<File>(){
+ {
+ add(testFiles.get(3));
+ add(testFiles.get(4));
+ add(testFiles.get(7));
+ }
+ }));
+ expectedLogOutput.add("created merge task for intermediate-3 with " +
(new ArrayList<File>(){
+ {
+ add(testFiles.get(1));
+ add(testFiles.get(2));
+ add(testFiles.get(8));
+ }
+ }));
+ expectedLogOutput.add("Waiting for batch sorting tasks completion");
+ assertEquals("intermediate merge log output should be expected",
String.join(newline, expectedLogOutput), String.join(newline,
actualLogOutput.subList(0,4)));
+
+ assertTrue("merge complete output should exist",
actualLogOutput.containsAll((new ArrayList<String>(){
+ {
+ add("merge complete for intermediate-1");
+ add("merge complete for intermediate-2");
+ add("merge complete for intermediate-3");
+ }
+ })));
+
+ expectedLogOutput.clear();
+ expectedLogOutput.add("There are still 4 sorted files not merged yet");
+ expectedLogOutput.add("running final batch merge task for final-1 with
" + (new ArrayList<File>(){
+ {
+ add(testFiles.get(9));
+ add(testFiles.get(10));
+ add(testFiles.get(11));
+ }
+ }));
+ expectedLogOutput.add("running final batch merge task for final-2 with
" + (new ArrayList<File>(){
+ {
+ add(testFiles.get(12));
+ add(new File(mergeDir, "intermediate-1"));
+ add(new File(mergeDir, "intermediate-3"));
+ }
+ }));
+ expectedLogOutput.add("running final batch merge task for " +
sortedFile.getName() + " with " + (new ArrayList<File>(){
+ {
+ add(new File(mergeDir, "intermediate-2"));
+ add(new File(mergeDir, "final-1"));
+ add(new File(mergeDir, "final-2"));
+ }
+ }));
+ expectedLogOutput.add("Total batch sorted files length is 18");
+ assertEquals("final merge log output should be expected",
String.join(newline, expectedLogOutput), String.join(newline,
actualLogOutput.subList(7, 12)));
+ }
+
+ private void generateTestFiles(File tmpDir) throws IOException {
+ int nextFileSize = 1;
+ File testFile = new File(tmpDir, Integer.toString(nextFileSize));
+ List<String> lineBuffer = Lists.newArrayList();
+ List<String> resultList = Lists.newArrayList();
+ for (int i = 0; i <= 90; i++) {
+ String line = String.format("/%05d|{}", i);
+ lineBuffer.add(line);
+ resultList.add(line);
+ if (lineBuffer.size() == nextFileSize) {
Review comment:
You loop 91 times (i = 0 to 90), and whenever the buffer size matches the next
file size, you switch to a new file... but it's not easy (for me) to see how many
files are generated that way... Wouldn't it be easier to understand if you
looped over the list of files instead?
What about making the files smaller and smaller? That way, it would be easy
to verify they are merged in the right order (just an idea).
##########
File path:
oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MergeRunner.java
##########
@@ -0,0 +1,295 @@
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.comparator.SizeFileComparator;
+import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.function.Function;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.DEFAULT_NUMBER_OF_MERGE_TASK_THREADS;
+import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.PROP_MERGE_THREAD_POOL_SIZE;
+import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK;
+import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileNodeStoreBuilder.PROP_MERGE_TASK_BATCH_SIZE;
+import static
org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.createWriter;
+
+
+/**
+ * Class responsible for -
+ * <ol>
+ * <li>Watching {@link #sortedFiles} for new sorted files</li>
+ * <li>Submitting those files in batch to an {@link ExecutorService}</li>
+ * <li>Collecting the results (sorted files) created by those tasks</li>
+ * <li>Merge the result with any left over files to create a single sorted
file</li>
+ * </ol>
+ * Strategy -
+ * <ol>
+ * <li>Wait for n files</li>
+ * <li>construct new list of files to be merged by checking if its
already merged</li>
+ * and create intermediate merge file
+ * (if final merge) merge all intermediate merge files and create sorted
file
+ * <li>add all merged files to merged list</li>
+ * </ol>
+ *
+ * <h3>Merge task explanation -</h3>
+ *
+ * SORTED_FILE_QUEUE=MultithreadedTraverseWithSortStrategy#sortedFiles
+ * MERGED_FILE_LIST=MergeRunner#mergedFiles
+ * UNMERGED_FILE_LIST=MergeRunner#unmergedFiles
+ * <ol>
+ * <li>Have a BlockingQueue of sorted files (SORTED_FILE_QUEUE) that need
to be executed for merge. Each of the task has been assigned a list of
files.</li>
+ * <li>Task thread (TraverseAndSortTask) on completion adds sorted files
to this queue</li>
+ * <li>Another monitoring thread
(MultithreadedTraverseWithSortStrategy#MergeRunner) is consuming from this
SORTED_FILE_QUEUE and submitting those
+ * part of the files in batch (batch file size is configurable via java
system property {@link FlatFileNodeStoreBuilder#PROP_MERGE_TASK_BATCH_SIZE}
+ * to executor service for merge
+ * <ol>
+ * <li>The monitoring thread pulls any sorted file and add it in
SORTED_FILE_QUEUE to the UNMERGED_FILE_LIST</li>
+ * <li>When UNMERGED_FILE_LIST grows larger than two times the
batch merge size, a merge task is submitted for merge
+ * with the smaller half portion of the UNMERGED_FILE_LIST</li>
+ * <li>Files submitted for merge will be removed from
UNMERGED_FILE_LIST and added to MERGED_FILE_LIST</li>
+ * </ol>
+ * </li>
+ * <li>A poison pill is added to SORTED_FILE_QUEUE upon download
completion</li>
+ * <li>Once poison pill occurs, the monitoring thread stops submitting new
merge task and proceed to final merging
+ * <ol>
+ * <li>Final merge waits for all existing tasks finish</li>
+ * <li>All files left in UNMERGED_FILE_LIST and all previously
task results are collected to be merged</li>
+ * </ol>
+ * </li>
+ * <li>
+ * We use a phaser (Merge#mergePhaser) for coordination between main
thread and the monitoring thread. This phaser has one phase -
+ * <ol>
+ * <li>Waiting for a single final merged file to be created</li>
+ * </ol>
+ * </li>
+ * <li>
+ * We use another phaser for coordination between monitoring thread
(MergeRunner) and the merge task executor (MergeTask). This phaser has one
phase -
+ * <ol>
+ * <li>Waiting for all merge tasks complete</li>
+ * </ol>
+ * </li>
+ * </ol>
+ */
+public class MergeRunner implements Runnable {
+ private static final Logger log =
LoggerFactory.getLogger(MergeRunner.class);
+ private final Charset charset = UTF_8;
+ private final boolean compressionEnabled;
+ private final ArrayList<File> mergedFiles = Lists.newArrayList();
+ private final ArrayList<File> unmergedFiles = Lists.newArrayList();
+ private final ExecutorService executorService;
+ private final int threadPoolSize =
Integer.getInteger(PROP_MERGE_THREAD_POOL_SIZE,
DEFAULT_NUMBER_OF_MERGE_TASK_THREADS);
+ private final int batchMergeSize =
Integer.getInteger(PROP_MERGE_TASK_BATCH_SIZE,
DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK);
+ private final Comparator fileSizeComparator = new SizeFileComparator();
+
+ /**
+ * The end result file after merging all sorted files.
+ */
+ private final File sortedFile;
+
+ /**
+ * Directory where intermediate merged files will be created.
+ */
+ private final File mergeDir;
+
+ /**
+ * Comparator used for comparing node states for creating sorted files.
+ */
+ private final Comparator<NodeStateHolder> comparator;
+ private final BlockingQueue<File> sortedFiles;
+ private final ConcurrentLinkedQueue<Throwable> throwables;
+
+ /**
+ * Phaser used for coordination with the traverse/download and sort tasks.
Advance of this phaser indicates that a single
+ * merged and sorted file has been created.
+ */
+ private final Phaser phaser;
+
+ /**
+ * This poison pill is added to {@link #sortedFiles} to indicate that
download phase has completed.
+ */
+ public static final File MERGE_POISON_PILL = new File("");
+
+ /**
+ * Constructor.
+ * @param sortedFiles thread safe list containing files to be merged.
+ * @param comparator comparator used to help with sorting of node state
entries.
+ * @param mergeDir directory where sorted files will be created.
+ * @param compressionEnabled if true, the created files would be compressed
+ */
+ MergeRunner(File sortedFile, BlockingQueue<File> sortedFiles, File
mergeDir, Comparator comparator,
+ Phaser phaser, boolean compressionEnabled) throws IOException {
+ this.mergeDir = mergeDir;
+ FileUtils.forceMkdir(mergeDir);
Review comment:
Could you defer creating the directory to the merge part, please? I think
it's better not to do too much work (in particular, any I/O) in the constructor itself.
##########
File path:
oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MergeRunnerTest.java
##########
@@ -0,0 +1,176 @@
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.event.Level;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Phaser;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MergeRunnerTest {
+ private final LogCustomizer lc = LogCustomizer.forLogger(MergeRunner.class)
+ .filter(Level.INFO)
+ .enable(Level.INFO)
+ .create();
+ private final String newline = System.lineSeparator();
+ private final List<File> testFiles = Lists.newArrayList();
+ private String expectedSortedFileStr = "",
+ actualSortedFileStr = "";
+
+ @Before
+ public void setup(){
+ lc.starting();
+ }
+
+ @After
+ public void after() {
+ lc.finished();
+ }
+
+ @Test
+ public void test() throws IOException {
+ lc.starting();
+
System.setProperty(FlatFileNodeStoreBuilder.PROP_MERGE_TASK_BATCH_SIZE, "3");
+
System.setProperty(FlatFileNodeStoreBuilder.PROP_MERGE_THREAD_POOL_SIZE, "1");
+
+ File tmpDir = new File(FileUtils.getTempDirectory(),
Long.toString(System.nanoTime())),
+ mergeDir = new File(tmpDir, "merge"),
+ sortedFile = new File(tmpDir, "sorted-file.json");
+ List<String> expectedLogOutput = Lists.newArrayList(),
+ actualLogOutput = Lists.newArrayList();
+
+ generateTestFiles(tmpDir);
+ assertEquals("expected 13 generated test files", 13, testFiles.size());
+
+ PathElementComparator pathComparator = new PathElementComparator();
+ Comparator<NodeStateHolder> comparator = (e1, e2) ->
pathComparator.compare(e1.getPathElements(), e2.getPathElements());
+ BlockingQueue<File> sortedFiles = new LinkedBlockingQueue<>();
+ Phaser mergePhaser = new Phaser(1);
+ Runnable mergeRunner = new MergeRunner(sortedFile, sortedFiles,
mergeDir, comparator, mergePhaser, false);
+ Thread merger = new Thread(mergeRunner, "test-merger");
+ merger.setDaemon(true);
+
+ // Adding test files in predefined order
+ ArrayList<Integer> filenameOrder = new ArrayList<>(Arrays.asList(7, 1,
11, 9, 8, 6, 12, 4, 5, 13, 3, 2, 10));
+ for (int filename: filenameOrder.subList(0,4)) {
+ sortedFiles.add(testFiles.get(filename-1));
+ }
+ merger.start();
+ for (int filename: filenameOrder.subList(4,13)) {
+ sortedFiles.add(testFiles.get(filename-1));
+ }
+ sortedFiles.add(MergeRunner.MERGE_POISON_PILL);
+ mergePhaser.awaitAdvance(0);
+
+ actualSortedFileStr = FileUtils.readFileToString(sortedFile, UTF_8);
+ String text = String.join(newline, lc.getLogs());
+ System.out.println(text);
+
+ assertEquals("sorted-file content should be expected",
expectedSortedFileStr, actualSortedFileStr);
+
+ actualLogOutput = lc.getLogs();
+ expectedLogOutput.add("created merge task for intermediate-1 with " +
(new ArrayList<File>(){
+ {
+ add(testFiles.get(0));
+ add(testFiles.get(5));
+ add(testFiles.get(6));
+ }
+ }));
+ expectedLogOutput.add("created merge task for intermediate-2 with " +
(new ArrayList<File>(){
+ {
+ add(testFiles.get(3));
+ add(testFiles.get(4));
+ add(testFiles.get(7));
+ }
+ }));
+ expectedLogOutput.add("created merge task for intermediate-3 with " +
(new ArrayList<File>(){
+ {
+ add(testFiles.get(1));
+ add(testFiles.get(2));
+ add(testFiles.get(8));
+ }
+ }));
+ expectedLogOutput.add("Waiting for batch sorting tasks completion");
+ assertEquals("intermediate merge log output should be expected",
String.join(newline, expectedLogOutput), String.join(newline,
actualLogOutput.subList(0,4)));
+
+ assertTrue("merge complete output should exist",
actualLogOutput.containsAll((new ArrayList<String>(){
+ {
+ add("merge complete for intermediate-1");
+ add("merge complete for intermediate-2");
+ add("merge complete for intermediate-3");
+ }
+ })));
+
+ expectedLogOutput.clear();
+ expectedLogOutput.add("There are still 4 sorted files not merged yet");
+ expectedLogOutput.add("running final batch merge task for final-1 with
" + (new ArrayList<File>(){
+ {
+ add(testFiles.get(9));
+ add(testFiles.get(10));
+ add(testFiles.get(11));
+ }
+ }));
+ expectedLogOutput.add("running final batch merge task for final-2 with
" + (new ArrayList<File>(){
+ {
+ add(testFiles.get(12));
+ add(new File(mergeDir, "intermediate-1"));
+ add(new File(mergeDir, "intermediate-3"));
+ }
+ }));
+ expectedLogOutput.add("running final batch merge task for " +
sortedFile.getName() + " with " + (new ArrayList<File>(){
+ {
+ add(new File(mergeDir, "intermediate-2"));
+ add(new File(mergeDir, "final-1"));
+ add(new File(mergeDir, "final-2"));
+ }
+ }));
+ expectedLogOutput.add("Total batch sorted files length is 18");
+ assertEquals("final merge log output should be expected",
String.join(newline, expectedLogOutput), String.join(newline,
actualLogOutput.subList(7, 12)));
+ }
+
+ private void generateTestFiles(File tmpDir) throws IOException {
+ int nextFileSize = 1;
+ File testFile = new File(tmpDir, Integer.toString(nextFileSize));
+ List<String> lineBuffer = Lists.newArrayList();
+ List<String> resultList = Lists.newArrayList();
+ for (int i = 0; i <= 90; i++) {
+ String line = String.format("/%05d|{}", i);
+ lineBuffer.add(line);
+ resultList.add(line);
+ if (lineBuffer.size() == nextFileSize) {
+ String text = String.join(newline, lineBuffer);
+ FileUtils.writeStringToFile(testFile, text, UTF_8);
+ testFiles.add(testFile);
+ nextFileSize += 1;
Review comment:
nextFileSize++?
##########
File path:
oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MergeRunnerTest.java
##########
@@ -0,0 +1,176 @@
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.event.Level;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Phaser;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MergeRunnerTest {
+ private final LogCustomizer lc = LogCustomizer.forLogger(MergeRunner.class)
+ .filter(Level.INFO)
+ .enable(Level.INFO)
+ .create();
+ private final String newline = System.lineSeparator();
+ private final List<File> testFiles = Lists.newArrayList();
+ private String expectedSortedFileStr = "",
+ actualSortedFileStr = "";
+
+ @Before
+ public void setup(){
+ lc.starting();
+ }
+
+ @After
+ public void after() {
+ lc.finished();
+ }
+
+ @Test
+ public void test() throws IOException {
+ lc.starting();
+
System.setProperty(FlatFileNodeStoreBuilder.PROP_MERGE_TASK_BATCH_SIZE, "3");
+
System.setProperty(FlatFileNodeStoreBuilder.PROP_MERGE_THREAD_POOL_SIZE, "1");
+
+ File tmpDir = new File(FileUtils.getTempDirectory(),
Long.toString(System.nanoTime())),
+ mergeDir = new File(tmpDir, "merge"),
+ sortedFile = new File(tmpDir, "sorted-file.json");
+ List<String> expectedLogOutput = Lists.newArrayList(),
+ actualLogOutput = Lists.newArrayList();
+
+ generateTestFiles(tmpDir);
+ assertEquals("expected 13 generated test files", 13, testFiles.size());
+
+ PathElementComparator pathComparator = new PathElementComparator();
+ Comparator<NodeStateHolder> comparator = (e1, e2) ->
pathComparator.compare(e1.getPathElements(), e2.getPathElements());
+ BlockingQueue<File> sortedFiles = new LinkedBlockingQueue<>();
+ Phaser mergePhaser = new Phaser(1);
+ Runnable mergeRunner = new MergeRunner(sortedFile, sortedFiles,
mergeDir, comparator, mergePhaser, false);
+ Thread merger = new Thread(mergeRunner, "test-merger");
+ merger.setDaemon(true);
+
+ // Adding test files in predefined order
+ ArrayList<Integer> filenameOrder = new ArrayList<>(Arrays.asList(7, 1,
11, 9, 8, 6, 12, 4, 5, 13, 3, 2, 10));
+ for (int filename: filenameOrder.subList(0,4)) {
+ sortedFiles.add(testFiles.get(filename-1));
Review comment:
Why minus one?
##########
File path:
oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/MergeRunnerTest.java
##########
@@ -0,0 +1,176 @@
+package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.event.Level;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Phaser;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class MergeRunnerTest {
+ private final LogCustomizer lc = LogCustomizer.forLogger(MergeRunner.class)
+ .filter(Level.INFO)
+ .enable(Level.INFO)
+ .create();
+ private final String newline = System.lineSeparator();
+ private final List<File> testFiles = Lists.newArrayList();
+ private String expectedSortedFileStr = "",
+ actualSortedFileStr = "";
+
+ @Before
+ public void setup(){
+ lc.starting();
+ }
+
+ @After
+ public void after() {
+ lc.finished();
+ }
+
+ @Test
+ public void test() throws IOException {
+ lc.starting();
+
System.setProperty(FlatFileNodeStoreBuilder.PROP_MERGE_TASK_BATCH_SIZE, "3");
+
System.setProperty(FlatFileNodeStoreBuilder.PROP_MERGE_THREAD_POOL_SIZE, "1");
+
+ File tmpDir = new File(FileUtils.getTempDirectory(),
Long.toString(System.nanoTime())),
+ mergeDir = new File(tmpDir, "merge"),
+ sortedFile = new File(tmpDir, "sorted-file.json");
+ List<String> expectedLogOutput = Lists.newArrayList(),
+ actualLogOutput = Lists.newArrayList();
+
+ generateTestFiles(tmpDir);
+ assertEquals("expected 13 generated test files", 13, testFiles.size());
+
+ PathElementComparator pathComparator = new PathElementComparator();
+ Comparator<NodeStateHolder> comparator = (e1, e2) ->
pathComparator.compare(e1.getPathElements(), e2.getPathElements());
+ BlockingQueue<File> sortedFiles = new LinkedBlockingQueue<>();
+ Phaser mergePhaser = new Phaser(1);
+ Runnable mergeRunner = new MergeRunner(sortedFile, sortedFiles,
mergeDir, comparator, mergePhaser, false);
+ Thread merger = new Thread(mergeRunner, "test-merger");
+ merger.setDaemon(true);
+
+ // Adding test files in predefined order
+ ArrayList<Integer> filenameOrder = new ArrayList<>(Arrays.asList(7, 1,
11, 9, 8, 6, 12, 4, 5, 13, 3, 2, 10));
Review comment:
I'm afraid I don't understand this sequence... Are these random numbers? I
think it would make sense to use a formula, and not just constants, in the test
case... If you really want random numbers, then you could use Random.nextInt(n).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]