GitHub user nickwallen commented on a diff in the pull request: https://github.com/apache/metron/pull/1157#discussion_r209671313 --- Diff: metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java --- @@ -99,10 +104,55 @@ protected PcapResultsWriter getResultsWriter() { LOG.warn("Unable to cleanup files in HDFS", e); } } + LOG.info("Done finalizing results"); return new PcapPages(outFiles); } - protected abstract void write(PcapResultsWriter resultsWriter, Configuration hadoopConfig, List<byte[]> data, Path outputPath) throws IOException; + /** + * Figure out how many threads to use in the thread pool. If it's a string and ends with "C", + * then strip the C and treat it as an integral multiple of the number of cores. If it's a + * string and does not end with a C, then treat it as a number in string form. + */ + private static int getNumThreads(String numThreads) { + String numThreadsStr = ((String) numThreads).trim().toUpperCase(); + if (numThreadsStr.endsWith("C")) { + Integer factor = Integer.parseInt(numThreadsStr.replace("C", "")); + return factor * Runtime.getRuntime().availableProcessors(); + } else { + return Integer.parseInt(numThreadsStr); + } + } + + protected List<Path> writeParallel(Configuration hadoopConfig, Map<Path, List<byte[]>> toWrite, + int parallelism) throws IOException { + List<Path> outFiles = Collections.synchronizedList(new ArrayList<>()); + ForkJoinPool tp = new ForkJoinPool(parallelism); + try { + tp.submit(() -> { + toWrite.entrySet().parallelStream().forEach(e -> { + try { + Path path = e.getKey(); + List<byte[]> data = e.getValue(); + if (data.size() > 0) { + write(getResultsWriter(), hadoopConfig, data, path); + outFiles.add(path); + } + } catch (IOException ioe) { + throw new RuntimeException("Failed to write results", ioe); --- End diff -- Can we add the path that failed to write to the exception message?
---