GitHub user nickwallen commented on a diff in the pull request:
https://github.com/apache/metron/pull/1157#discussion_r209674410
--- Diff:
metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java
---
@@ -99,10 +104,55 @@ protected PcapResultsWriter getResultsWriter() {
LOG.warn("Unable to cleanup files in HDFS", e);
}
}
+ LOG.info("Done finalizing results");
return new PcapPages(outFiles);
}
- protected abstract void write(PcapResultsWriter resultsWriter, Configuration hadoopConfig, List<byte[]> data, Path outputPath) throws IOException;
+ /**
+ * Figure out how many threads to use in the thread pool. If it's a string and ends with "C",
+ * then strip the C and treat it as an integral multiple of the number of cores. If it's a
+ * string and does not end with a C, then treat it as a number in string form.
+ */
+ private static int getNumThreads(String numThreads) {
+ String numThreadsStr = ((String) numThreads).trim().toUpperCase();
+ if (numThreadsStr.endsWith("C")) {
+ Integer factor = Integer.parseInt(numThreadsStr.replace("C", ""));
+ return factor * Runtime.getRuntime().availableProcessors();
+ } else {
+ return Integer.parseInt(numThreadsStr);
+ }
+ }
+
+ protected List<Path> writeParallel(Configuration hadoopConfig, Map<Path, List<byte[]>> toWrite,
+ int parallelism) throws IOException {
+ List<Path> outFiles = Collections.synchronizedList(new ArrayList<>());
+ ForkJoinPool tp = new ForkJoinPool(parallelism);
+ try {
+ tp.submit(() -> {
+ toWrite.entrySet().parallelStream().forEach(e -> {
--- End diff --
Shouldn't we be calling `tp.submit` for each (path, data)?
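To make the question concrete, here is a minimal sketch of what I mean by one submitted task per (path, data) entry. This is not Metron code and rests on several assumptions: `PerEntrySubmitSketch`, `writeEach`, and `fakeWrite` are hypothetical names, paths are plain strings rather than Hadoop `Path` objects, the write is faked, and a fixed thread pool stands in for the `ForkJoinPool` used in the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PerEntrySubmitSketch {

  // Hypothetical stand-in for the real write to HDFS; it only sums the payload sizes.
  public static int fakeWrite(String path, List<byte[]> data) {
    int total = 0;
    for (byte[] chunk : data) {
      total += chunk.length;
    }
    return total;
  }

  // Submits one task per (path, data) entry and waits for all of them to finish.
  public static List<String> writeEach(Map<String, List<byte[]>> toWrite, int parallelism)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      List<Future<String>> pending = new ArrayList<>();
      for (Map.Entry<String, List<byte[]>> e : toWrite.entrySet()) {
        // The lambda returns a value, so it is submitted as a Callable<String>.
        pending.add(pool.submit(() -> {
          fakeWrite(e.getKey(), e.getValue());
          return e.getKey();
        }));
      }
      // Collect results in submission order; get() rethrows any task failure
      // wrapped in an ExecutionException.
      List<String> outFiles = new ArrayList<>();
      for (Future<String> f : pending) {
        outFiles.add(f.get());
      }
      return outFiles;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    Map<String, List<byte[]>> toWrite = new LinkedHashMap<>();
    toWrite.put("/tmp/page-1.pcap", Arrays.asList(new byte[3], new byte[5]));
    toWrite.put("/tmp/page-2.pcap", Collections.singletonList(new byte[7]));
    System.out.println(writeEach(toWrite, 2));
  }
}
```

In this shape the results are gathered from the futures on the calling thread, so the output list does not need to be synchronized, and a failed write surfaces at `f.get()` rather than inside a stream lambda.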
---