[ https://issues.apache.org/jira/browse/METRON-1614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546959#comment-16546959 ]
ASF GitHub Bot commented on METRON-1614:
----------------------------------------
Github user mmiklavc commented on a diff in the pull request:
https://github.com/apache/metron/pull/1108#discussion_r203123994
--- Diff: metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java ---
@@ -230,69 +274,77 @@ protected void reduce(LongWritable key, Iterable<BytesWritable> values, Context
         , fs
         , filterImpl
     );
-    if (sync) {
-      job.waitForCompletion(true);
-    } else {
-      job.submit();
-    }
+    mrJob.submit();
+    jobState = State.RUNNING;
+    startJobStatusTimerThread(statusInterval);
     return this;
   }
-  /**
-   * Returns a lazily-read Iterable over a set of sequence files
-   */
-  private SequenceFileIterable readResults(Path outputPath, Configuration config, FileSystem fs) throws IOException {
-    List<Path> files = new ArrayList<>();
-    for (RemoteIterator<LocatedFileStatus> it = fs.listFiles(outputPath, false); it.hasNext(); ) {
-      Path p = it.next().getPath();
-      if (p.getName().equals("_SUCCESS")) {
-        fs.delete(p, false);
-        continue;
+  private void startJobStatusTimerThread(long interval) {
+    timer = new Timer();
+    timer.scheduleAtFixedRate(new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          synchronized (jobState) {
+            if (jobState == State.RUNNING) {
+              if (mrJob.isComplete()) {
+                switch (mrJob.getStatus().getState()) {
+                  case SUCCEEDED:
+                    jobState = State.FINALIZING;
+                    if (setFinalResults(finalizer, configuration)) {
+                      jobState = State.SUCCEEDED;
+                    } else {
+                      jobState = State.FAILED;
+                    }
+                    break;
+                  case FAILED:
+                    jobState = State.FAILED;
+                    break;
+                  case KILLED:
+                    jobState = State.KILLED;
+                    break;
+                }
+              }
+              cancel(); // be gone, ye!
--- End diff --
Good catch. That's an obvious auto-bracket error. I'll see if I can add a
test for this as well.
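
For illustration only: one reading of the bracket error in the diff above is that `cancel()` sits outside the `if (mrJob.isComplete())` block, so the timer task would stop after its first tick even while the job is still running. The sketch below shows the polling pattern under that assumption, with the cancel call moved inside the completion check. `StatusPollSketch`, `pollUntilComplete`, and the `BooleanSupplier` standing in for `mrJob.isComplete()` are hypothetical names, not part of the Metron code base, and the actual fix in the pull request may differ.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical sketch; none of these names come from PcapJob.java.
final class StatusPollSketch {

  /**
   * Polls isComplete at a fixed rate and cancels the timer task only once
   * it reports completion. Returns the number of polls that were made.
   */
  static int pollUntilComplete(BooleanSupplier isComplete, long intervalMs) {
    AtomicInteger polls = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(1);
    Timer timer = new Timer(true); // daemon thread, so it cannot block JVM exit

    timer.scheduleAtFixedRate(new TimerTask() {
      @Override
      public void run() {
        polls.incrementAndGet();
        if (isComplete.getAsBoolean()) {
          // Cancel inside the completion check: the task keeps firing
          // for as long as the job is still running.
          cancel();
          done.countDown();
        }
      }
    }, 0L, intervalMs);

    try {
      // Bounded wait so the sketch cannot hang forever.
      if (!done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("job did not complete in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    } finally {
      timer.cancel();
    }
    return polls.get();
  }

  public static void main(String[] args) {
    // Stand-in job that reports completion on the third status check.
    AtomicInteger checks = new AtomicInteger();
    int polls = pollUntilComplete(() -> checks.incrementAndGet() >= 3, 10L);
    System.out.println("polls = " + polls);
  }
}
```

A test along these lines could assert that the task polls more than once when the job is not complete on the first tick, which the misplaced `cancel()` would prevent.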
> Create job status abstraction
> -----------------------------
>
> Key: METRON-1614
> URL: https://issues.apache.org/jira/browse/METRON-1614
> Project: Metron
> Issue Type: Sub-task
> Reporter: Ryan Merriman
> Assignee: Michael Miklavcic
> Priority: Major
>
> It is possible to use different job engines such as MR or Spark. There should
> be an abstraction that allows us to track status independent of the
> underlying job engine. Initially we will use YARN/MR to run pcap query jobs.
> We will also need a way to persist this information.
> Pcap job submission should be asynchronous. Some kind of id should be
> returned upon successful job submission rather than blocking and waiting on
> the job to complete.
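
As a rough illustration of the issue description above, the following sketch shows an engine-independent interface where submission returns an ID immediately and status is looked up by that ID. Everything here is an assumption made for illustration: `JobStatusSketch`, `JobEngine`, `InMemoryEngine`, and the reduced `State` enum are hypothetical and do not reflect the abstraction Metron actually adopted. An in-memory executor stands in for YARN/MR, and persistence is not covered.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not the Metron job API.
final class JobStatusSketch {

  enum State { NOT_RUNNING, RUNNING, SUCCEEDED, FAILED }

  /** Engine-independent view: submit returns an ID, status is queried by ID. */
  interface JobEngine {
    String submit(Runnable work);
    State status(String jobId);
  }

  /** Toy engine backed by a single worker thread instead of MR or Spark. */
  static final class InMemoryEngine implements JobEngine {
    private final ExecutorService pool = Executors.newSingleThreadExecutor();
    private final Map<String, State> states = new ConcurrentHashMap<>();

    @Override
    public String submit(Runnable work) {
      String id = UUID.randomUUID().toString();
      states.put(id, State.RUNNING);
      // Returns without waiting; the worker records the terminal state later.
      pool.submit(() -> {
        try {
          work.run();
          states.put(id, State.SUCCEEDED);
        } catch (RuntimeException e) {
          states.put(id, State.FAILED);
        }
      });
      return id;
    }

    @Override
    public State status(String jobId) {
      return states.getOrDefault(jobId, State.NOT_RUNNING);
    }

    /** Stops accepting work and waits briefly for submitted jobs to finish. */
    boolean shutdownAndWait() {
      pool.shutdown();
      try {
        return pool.awaitTermination(5, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  public static void main(String[] args) {
    InMemoryEngine engine = new InMemoryEngine();
    String id = engine.submit(() -> { /* pretend to run a pcap query */ });
    engine.shutdownAndWait();
    System.out.println(id + " -> " + engine.status(id));
  }
}
```

The caller only ever holds the ID, so swapping the toy engine for a different backend would not change how status is queried.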