Github user merrimanr commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1108#discussion_r203110747
  
    --- Diff: 
metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java
 ---
    @@ -230,69 +274,77 @@ protected void reduce(LongWritable key, 
Iterable<BytesWritable> values, Context
             , fs
             , filterImpl
         );
    -    if (sync) {
    -      job.waitForCompletion(true);
    -    } else {
    -      job.submit();
    -    }
    +    mrJob.submit();
    +    jobState = State.RUNNING;
    +    startJobStatusTimerThread(statusInterval);
         return this;
       }
     
    -  /**
    -   * Returns a lazily-read Iterable over a set of sequence files
    -   */
    -  private SequenceFileIterable readResults(Path outputPath, Configuration 
config, FileSystem fs) throws IOException {
    -    List<Path> files = new ArrayList<>();
    -    for (RemoteIterator<LocatedFileStatus> it = fs.listFiles(outputPath, 
false); it.hasNext(); ) {
    -      Path p = it.next().getPath();
    -      if (p.getName().equals("_SUCCESS")) {
    -        fs.delete(p, false);
    -        continue;
    +  private void startJobStatusTimerThread(long interval) {
    +    timer = new Timer();
    +    timer.scheduleAtFixedRate(new TimerTask() {
    +      @Override
    +      public void run() {
    +        try {
    +          synchronized (jobState) {
    +            if (jobState == State.RUNNING) {
    +              if (mrJob.isComplete()) {
    +                switch (mrJob.getStatus().getState()) {
    +                  case SUCCEEDED:
    +                    jobState = State.FINALIZING;
    +                    if (setFinalResults(finalizer, configuration)) {
    +                      jobState = State.SUCCEEDED;
    +                    } else {
    +                      jobState = State.FAILED;
    +                    }
    +                    break;
    +                  case FAILED:
    +                    jobState = State.FAILED;
    +                    break;
    +                  case KILLED:
    +                    jobState = State.KILLED;
    +                    break;
    +                }
    +              }
    +              cancel(); // be gone, ye!
    --- End diff --
    
    This is incorrect.  Putting cancel() here will cancel the timer after the 
first run.  This call should be inside the previous if statement, after the 
switch statement.


---

Reply via email to