GitHub user mmiklavc commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1157#discussion_r209687780
  
    --- Diff: metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/finalizer/PcapFinalizer.java ---
    @@ -99,10 +104,55 @@ protected PcapResultsWriter getResultsWriter() {
             LOG.warn("Unable to cleanup files in HDFS", e);
           }
         }
    +    LOG.info("Done finalizing results");
         return new PcapPages(outFiles);
       }
     
    -  protected abstract void write(PcapResultsWriter resultsWriter, Configuration hadoopConfig, List<byte[]> data, Path outputPath) throws IOException;
    +  /**
    +   * Figure out how many threads to use in the thread pool. If it's a string and ends with "C",
    +   * then strip the C and treat it as an integral multiple of the number of cores.  If it's a
    +   * string and does not end with a C, then treat it as a number in string form.
    +   */
    +  private static int getNumThreads(String numThreads) {
    +    String numThreadsStr = ((String) numThreads).trim().toUpperCase();
    +    if (numThreadsStr.endsWith("C")) {
    +      Integer factor = Integer.parseInt(numThreadsStr.replace("C", ""));
    +      return factor * Runtime.getRuntime().availableProcessors();
    +    } else {
    +      return Integer.parseInt(numThreadsStr);
    +    }
    +  }
    +
    +  protected List<Path> writeParallel(Configuration hadoopConfig, Map<Path, List<byte[]>> toWrite,
    +      int parallelism) throws IOException {
    +    List<Path> outFiles = Collections.synchronizedList(new ArrayList<>());
    +    ForkJoinPool tp = new ForkJoinPool(parallelism);
    +    try {
    +      tp.submit(() -> {
    +        toWrite.entrySet().parallelStream().forEach(e -> {
    --- End diff --
    
    As I understand it, submit effectively runs the parallel stream's tasks
    inside this thread pool rather than in the default one; see, e.g.,
    https://www.baeldung.com/java-8-parallel-streams-custom-threadpool. As a side
    note, the reason for a custom thread pool at all is so that this work doesn't
    interfere with other parallel streams: by default, Java runs parallel streams
    on the single JVM-wide ForkJoinPool.commonPool(). Sharing that pool can cause
    liveness issues; for example, slow, blocking HDFS writes here could tie up the
    common pool's workers and stall unrelated parallel streams elsewhere in the JVM.

