Github user mmiklavc commented on a diff in the pull request: https://github.com/apache/metron/pull/1157#discussion_r209687780 --- Diff: metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java --- @@ -99,10 +104,55 @@ protected PcapResultsWriter getResultsWriter() { LOG.warn("Unable to cleanup files in HDFS", e); } } + LOG.info("Done finalizing results"); return new PcapPages(outFiles); } - protected abstract void write(PcapResultsWriter resultsWriter, Configuration hadoopConfig, List<byte[]> data, Path outputPath) throws IOException; + /** + * Figure out how many threads to use in the thread pool. If it's a string and ends with "C", + * then strip the C and treat it as an integral multiple of the number of cores. If it's a + * string and does not end with a C, then treat it as a number in string form. + */ + private static int getNumThreads(String numThreads) { + String numThreadsStr = ((String) numThreads).trim().toUpperCase(); + if (numThreadsStr.endsWith("C")) { + Integer factor = Integer.parseInt(numThreadsStr.replace("C", "")); + return factor * Runtime.getRuntime().availableProcessors(); + } else { + return Integer.parseInt(numThreadsStr); + } + } + + protected List<Path> writeParallel(Configuration hadoopConfig, Map<Path, List<byte[]>> toWrite, + int parallelism) throws IOException { + List<Path> outFiles = Collections.synchronizedList(new ArrayList<>()); + ForkJoinPool tp = new ForkJoinPool(parallelism); + try { + tp.submit(() -> { + toWrite.entrySet().parallelStream().forEach(e -> { --- End diff -- As I understand it, submit is effectively submitting the set of tasks for the parallel stream to execute within this threadpool, e.g. https://www.baeldung.com/java-8-parallel-streams-custom-threadpool. As a side note, the reason for a custom threadpool at all is so that this doesn't cause issues with other streams since the default in Java is to use a global context for this sort of thing. 
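Liveness issues may arise when using the shared global context (the common ForkJoinPool that parallel streams use by default), since long-running or blocking tasks there can starve unrelated parallel work in the same JVM.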
---