This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git

The following commit(s) were added to refs/heads/main by this push:
     new 9432a73f1 Make unit test for FileSystemStatusReporter multithreaded.
9432a73f1 is described below

commit 9432a73f1a49092304c5ece2b3e3249e38670be4
Author: tballison <talli...@apache.org>
AuthorDate: Tue Nov 15 15:37:44 2022 -0500

    Make unit test for FileSystemStatusReporter multithreaded.
---
 .../reporters/fs/TestFileSystemStatusReporter.java | 120 ++++++++++++++++-----
 1 file changed, 91 insertions(+), 29 deletions(-)

diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/test/java/org/apache/tika/pipes/reporters/fs/TestFileSystemStatusReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/test/java/org/apache/tika/pipes/reporters/fs/TestFileSystemStatusReporter.java
index 5a26be3f6..16296fa1c 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/test/java/org/apache/tika/pipes/reporters/fs/TestFileSystemStatusReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/test/java/org/apache/tika/pipes/reporters/fs/TestFileSystemStatusReporter.java
@@ -22,9 +22,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.json.JsonMapper;
@@ -32,6 +40,7 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import org.apache.tika.pipes.PipesReporter;
import org.apache.tika.pipes.PipesResult; import org.apache.tika.pipes.async.AsyncStatus; import org.apache.tika.pipes.pipesiterator.PipesIterator; @@ -67,40 +76,24 @@ public class TestFileSystemStatusReporter { try { Thread.sleep(50); } catch (InterruptedException e) { - //throw new RuntimeException(e); + return; } } } }); readerThread.start(); - PipesResult.STATUS[] statuses = PipesResult.STATUS.values(); - Map<PipesResult.STATUS, Long> written = new HashMap<>(); - Random random = new Random(); - for (int i = 0; i < 1000; i++) { - PipesResult.STATUS status = statuses[random.nextInt(statuses.length)]; - PipesResult pipesResult = new PipesResult(status); - reporter.report(PipesIterator.COMPLETED_SEMAPHORE, pipesResult, 100l); - Long cnt = written.get(status); - if (cnt == null) { - written.put(status, 1l); - } else { - cnt++; - written.put(status, cnt); - } - if (i % 100 == 0) { - Thread.sleep(94); - reporter.report(new TotalCountResult(Math.round((100 + (double) i / (double) 1000)), - TotalCountResult.STATUS.NOT_COMPLETED)); - } - } + + Map<PipesResult.STATUS, Long> total = runBatch(reporter, 10, 200); + + readerThread.interrupt(); readerThread.join(1000); reporter.report(new TotalCountResult(30000, TotalCountResult.STATUS.COMPLETED)); reporter.close(); AsyncStatus asyncStatus = objectMapper.readValue(path.toFile(), AsyncStatus.class); Map<PipesResult.STATUS, Long> map = asyncStatus.getStatusCounts(); - assertEquals(written.size(), map.size()); - for (Map.Entry<PipesResult.STATUS, Long> e : written.entrySet()) { + assertEquals(total.size(), map.size()); + for (Map.Entry<PipesResult.STATUS, Long> e : total.entrySet()) { assertTrue(map.containsKey(e.getKey()), e.getKey().toString()); assertEquals(e.getValue(), map.get(e.getKey()), e.getKey().toString()); } @@ -109,11 +102,80 @@ public class TestFileSystemStatusReporter { assertEquals(TotalCountResult.STATUS.COMPLETED, asyncStatus.getTotalCountResult().getStatus()); } - /*@Test - //need to turn this into an actual 
test - public void oneOff() throws Exception { - Path config = Paths.get(""); - AsyncProcessor.main(new String[]{ config.toAbsolutePath().toString()}); - }*/ + private Map<PipesResult.STATUS, Long> runBatch(FileSystemStatusReporter reporter, + int numThreads, + int numIterations) + throws ExecutionException, InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + ExecutorCompletionService executorCompletionService = + new ExecutorCompletionService(executorService); + List<ReportWorker> workerList = new ArrayList<>(); + for (int i = 0; i < numThreads; i++) { + ReportWorker reportWorker = new ReportWorker(reporter, numIterations); + workerList.add(reportWorker); + executorCompletionService.submit(reportWorker); + } + + Map<PipesResult.STATUS, Long> total = new HashMap<>(); + int finished = 0; + while (finished < numThreads) { + Future<Integer> future = executorCompletionService.poll(); + if (future != null) { + future.get(); + finished++; + } + } + for (ReportWorker r : workerList) { + Map<PipesResult.STATUS, Long> local = r.getWritten(); + for (Map.Entry<PipesResult.STATUS, Long> e : local.entrySet()) { + Long t = total.get(e.getKey()); + if (t == null) { + t = e.getValue(); + } else { + t += e.getValue(); + } + total.put(e.getKey(), t); + } + } + return total; + } + + private class ReportWorker implements Callable<Integer> { + Map<PipesResult.STATUS, Long> written = new HashMap<>(); + private final PipesReporter reporter; + private final int numIterations; + private ReportWorker(PipesReporter reporter, int numIterations) { + this.reporter = reporter; + this.numIterations = numIterations; + } + @Override + public Integer call() throws Exception { + PipesResult.STATUS[] statuses = PipesResult.STATUS.values(); + Random random = new Random(); + for (int i = 0; i < numIterations; i++) { + PipesResult.STATUS status = statuses[random.nextInt(statuses.length)]; + PipesResult pipesResult = new PipesResult(status); + + 
reporter.report(PipesIterator.COMPLETED_SEMAPHORE, pipesResult, 100l); + Long cnt = written.get(status); + if (cnt == null) { + written.put(status, 1l); + } else { + cnt++; + written.put(status, cnt); + } + if (i % 100 == 0) { + Thread.sleep(94); + reporter.report(new TotalCountResult(Math.round((100 + (double) i / (double) 1000)), + TotalCountResult.STATUS.NOT_COMPLETED)); + } + } + return 1; + } + + Map<PipesResult.STATUS, Long> getWritten() { + return written; + } + } }