METRON-1690: Add more context to PcapJob JobStatus (mmiklavc via mmiklavc) closes apache/metron#1128
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/9d4842f3 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/9d4842f3 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/9d4842f3 Branch: refs/remotes/upstream/feature/METRON-1699-create-batch-profiler Commit: 9d4842f3d6f355d449d51f3c6ac02f2c904c295d Parents: d0e8757 Author: mmiklavc <[email protected]> Authored: Wed Jul 25 08:21:32 2018 -0600 Committer: Michael Miklavcic <[email protected]> Committed: Wed Jul 25 08:21:32 2018 -0600 ---------------------------------------------------------------------- .../metron-job_state_statechart_diagram.svg | 18 +- .../metron-job_state_statechart_diagram.xml | 18 +- .../java/org/apache/metron/job/JobStatus.java | 42 ++++- .../org/apache/metron/job/JobStatusTest.java | 55 ++++++ .../job/manager/InMemoryJobManagerTest.java | 16 ++ .../org/apache/metron/pcap/PcapJobTest.java | 80 +++++--- .../PcapTopologyIntegrationTest.java | 30 +-- .../java/org/apache/metron/pcap/mr/PcapJob.java | 186 +++++++++---------- site-book/bin/generate-md.sh | 2 + 9 files changed, 294 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/9d4842f3/metron-platform/metron-job/metron-job_state_statechart_diagram.svg ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/metron-job_state_statechart_diagram.svg b/metron-platform/metron-job/metron-job_state_statechart_diagram.svg index a99c5ad..c801eae 100644 --- a/metron-platform/metron-job/metron-job_state_statechart_diagram.svg +++ b/metron-platform/metron-job/metron-job_state_statechart_diagram.svg @@ -1,14 +1,14 @@ <!-- Licensed to the Apache Software - Foundation (ASF) under one or more contributor license agreements. See the - NOTICE file distributed with this work for additional information regarding - copyright ownership. The ASF licenses this file to You under the Apache License, - Version 2.0 (the "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software distributed - under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES - OR CONDITIONS OF ANY KIND, either express or implied. See the License for + Foundation (ASF) under one or more contributor license agreements. See the + NOTICE file distributed with this work for additional information regarding + copyright ownership. The ASF licenses this file to You under the Apache License, + Version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"> -<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="661px" height="291px" version="1.1" style="background-color: rgb(255, 255, 255);"><defs/><g transform="translate(0.5,0.5)"><ellipse cx="15" cy="30" rx="11" ry="11" fill="#000000" stroke="#ff0000" transform="rotate(90,15,30)" pointer-events="none"/><rect x="110" y="0" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(123.5,23.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="92" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 92px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-dec oration:inherit;">NOT_RUNNING</div></div></foreignObject><text x="46" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">NOT_RUNNING</text></switch></g><path d="M 440 30 L 537.76 30" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 529.88 34.5 L 538.88 30 L 529.88 25.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><rect x="320" y="0" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(350.5,23.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="58" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 60px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://ww w.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">RUNNING</div></div></foreignObject><text x="29" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">RUNNING</text></switch></g><rect x="540" y="0" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(563.5,23.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="72" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 74px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">FINALIZING</div></div></foreignObject><text x="36" y="12" fill="#0000 00" text-anchor="middle" font-size="12px" font-family="Verdana">FINALIZING</text></switch></g><path d="M 30 30 L 107.76 30" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 99.88 34.5 L 108.88 30 L 99.88 25.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 230 30 L 317.76 30" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 309.88 34.5 L 318.88 30 L 309.88 25.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><rect x="320" y="120" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(357.5,143.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="44" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 44px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">KILLED</div></div></foreignObject><text x="22" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">KILLED</text></switch></g><rect x="320" y="230" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(357.5,253.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="44" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 44px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">FAILED</div></div></foreignObject><text x="22" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">FAILED</text></switch></g><rect x="540" y="120" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(562.5,143.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="75" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 76px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">SUCCEEDED</div></div></foreignObject><text x="38" y="12" fill="#000000" text-anchor="mid dle" font-size="12px" font-family="Verdana">SUCCEEDED</text></switch></g><path d="M 380 60 L 380 117.76" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 375.5 109.88 L 380 118.88 L 384.5 109.88" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 600 60 L 600 117.76" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 595.5 109.88 L 600 118.88 L 604.5 109.88" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 540 45 L 500 45 Q 490 45 490 55 L 490 250 Q 490 260 480 260 L 442.24 260" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 450.12 255.5 L 441.12 260 L 450.12 264.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 320 45 L 290 45 Q 280 45 280 55 L 280 250 Q 280 260 290 260 L 317.76 260" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 309.88 264.5 L 318.88 260 L 309.88 255.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 540 45 L 500 45 Q 490 45 490 55 L 490 140 Q 490 150 480 150 L 442.24 150" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 450.12 145.5 L 441.12 150 L 450.12 154.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/></g></svg> +<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="656px" height="291px" version="1.1"><defs/><g transform="translate(0.5,0.5)"><ellipse cx="15" cy="150" rx="11" ry="11" fill="#000000" stroke="#ff0000" transform="rotate(90,15,150)" pointer-events="none"/><rect x="102" y="120" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(115.5,143.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="92" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 92px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">NOT_RUNNING</div></div> </foreignObject><text x="46" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">NOT_RUNNING</text></switch></g><path d="M 435 30 L 532.76 30" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 524.88 34.5 L 533.88 30 L 524.88 25.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><rect x="314.5" y="0" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(345.5,23.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="58" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 60px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inl ine-block;text-align:inherit;text-decoration:inherit;">RUNNING</div></div></foreignObject><text x="29" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">RUNNING</text></switch></g><rect x="534.5" y="0" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(558.5,23.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="72" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 74px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">FINALIZING</div></div></foreignObject><text x="36" y="12" fill="#000000" text-anchor="middle" font-size="1 2px" font-family="Verdana">FINALIZING</text></switch></g><path d="M 30 150 L 99.76 150" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 91.88 154.5 L 100.88 150 L 91.88 145.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><rect x="314.5" y="120" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(352.5,143.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="44" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 44px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">KILLED</div></div></foreig nObject><text x="22" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">KILLED</text></switch></g><rect x="314.5" y="230" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(352.5,253.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="44" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 44px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">FAILED</div></div></foreignObject><text x="22" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">FAILED</text></switch></g><rect x="534.5" y="120" width=" 120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(556.5,143.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="75" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 76px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">SUCCEEDED</div></div></foreignObject><text x="38" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">SUCCEEDED</text></switch></g><path d="M 375 60 L 375 117.76" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 370.5 109.88 L 375 118.88 L 379.5 109.88" fill="none" stroke="#ff00 00" stroke-miterlimit="10" pointer-events="none"/><path d="M 595 60 L 595 117.76" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 590.5 109.88 L 595 118.88 L 599.5 109.88" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 535 45 L 495 45 Q 485 45 485 55 L 485 250 Q 485 260 475 260 L 437.24 260" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 445.12 255.5 L 436.12 260 L 445.12 264.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 315 45 L 285 45 Q 275 45 275 55 L 275 250 Q 275 260 285 260 L 312.76 260" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 304.88 264.5 L 313.88 260 L 304.88 255.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 535 45 L 495 45 Q 485 45 485 55 L 485 140 Q 485 150 475 150 L 437.24 150" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 445.12 145.5 L 436.12 150 L 445.12 154.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><rect x="102" y="0" width="120" height="60" rx="14.4" ry="14.4" fill="#ffffc0" stroke="#ff0000" pointer-events="none"/><g transform="translate(125.5,23.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="72" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: Verdana; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 73px; white-space: nowrap; word-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">SUBMITTED</div></div></foreignObject><text x="36" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Verdana">SUBMITTED</text></switch></g><path d ="M 162 120 L 162 62.24" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 166.5 70.12 L 162 61.12 L 157.5 70.12" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 222 30 L 312.76 30" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/><path d="M 304.88 34.5 L 313.88 30 L 304.88 25.5" fill="none" stroke="#ff0000" stroke-miterlimit="10" pointer-events="none"/></g></svg> http://git-wip-us.apache.org/repos/asf/metron/blob/9d4842f3/metron-platform/metron-job/metron-job_state_statechart_diagram.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/metron-job_state_statechart_diagram.xml b/metron-platform/metron-job/metron-job_state_statechart_diagram.xml index b9ee8aa..9c33323 100644 --- a/metron-platform/metron-job/metron-job_state_statechart_diagram.xml +++ b/metron-platform/metron-job/metron-job_state_statechart_diagram.xml @@ -1,14 +1,14 @@ <!-- Licensed to the Apache Software - Foundation (ASF) under one or more contributor license agreements. See the - NOTICE file distributed with this work for additional information regarding - copyright ownership. The ASF licenses this file to You under the Apache License, - Version 2.0 (the "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software distributed - under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES - OR CONDITIONS OF ANY KIND, either express or implied. See the License for + Foundation (ASF) under one or more contributor license agreements. See the + NOTICE file distributed with this work for additional information regarding + copyright ownership. The ASF licenses this file to You under the Apache License, + Version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <!-- This is a draw.io diagram. You can load it from http://www.draw.io --> -<mxfile userAgent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36" version="8.9.5" editor="www.draw.io" type="google"><diagram id="58cdce13-f638-feb5-8d6f-7d28b1aa9fa0" name="Page-1">7VrbctowEP0aHtvxTdg8BkJSpintlKZN+tJRbGFrIixGiAL9+kq2fAfiOhjIFF6wdqWVtOfsauVxxxzM1rcMzoNP1EOkY2jeumNedwzD0GxN/EnJJpb0eiAW+Ax7sUjPBBP8BymhGucvsYcWhY6cUsLxvCh0aRgilxdkkDG6KnabUlKcdQ59VBFMXEiq0h/Y40EsdYCWyT8g7AfJzLqmNE/QffYZXYZqvo5hTqNfrJ7BxJbqvwigR1c5kTnsmANGKY+fZusBItK3idvicTc7tOm6GQp5rQGOY1sAOK6NtCdNs98pC78hWaJkC9FC+SZxjrAgcBCNfsBnRMh08Sg2Mpf6BYeMTzjkUj/FhAwooSwaaGrRT3bmjD6jnGY6VZrIdcjLjMbekSqXzrCrngl8QqSf+jqxFNIwmpaG/AbOMJHs+46YB0OoxIpquqHa21YHCfZDIXOFD5FQ9j3MBMkwlcIFXUoI+8pNiHG03ul7PUVURAqiM8TZRnRRAxzFARUjaXuVMc5UoiBHtkQGFcf91HCGs3hQUNeE3ajAPv787dfX+/F4NL6tMKCI0irAHE3m0JXalUgIRWZA5iqvG1aVEzI23P2cOBMWHABxvVeCvAsqkOvGFsy7bWBuViPbE1lQNSnjAfVpCMkwk+ZxRWvMH3LPjxKV90C2QrG0BwVS1Mh0+zFDoXcl07ecf47CWKLQcvaSpCbaCaoETXmKqdx2AVER5MxFO9xmqfMI Mh/xHX1AbWYwRCDHv4vzHxRmqxLal7A+bFhb2jmFNajgfTMaX92Nfl4gPxzkXeOcIO++LpO//Zyc3BH25WTjjHKyfTl6m8Bs1IDZOjzMaugXioXFNAWYpWLOLBfm8VbUqPwd7AVDhlYyFO+3YihiXbqfZkR0KofFx9Hd3fD6clC0VBuYzikPil61NrgaXeBuD26rd0q4k9dpObwn94PBcHh9gby1UvC0Ea7rFWAblRaqnFDFhV4oLbJK47EWaG+htKhzq3eOVVoAs0QpCzQrLbovGWqxtNC3vDn+Jya+Gb7tjpkmTAQ1mJgk9vapaNslBoGmVLR729PkMaj42ledCRX1zpnfqlIqFm6GNmiRi71jUbFbuXA1pWLZUO+IVLQOlRX/MyrWOaCPR8XyuWo0vPuXT3pQ5vQOKgok4SbXbS47LHYv2OyWFqxre9dV7g8MUIqEeAWHiwtwSdGtpeijFa4nj4uXqSia2ecVcffsGxZz+Bc=</diagram></mxfile> +<mxfile userAgent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36" version="8.9.8" editor="www.draw.io" type="google"><diagram>7Zpdd5owGMc/jbc9hIDiZbW286x1O7Pd2t3sZBCB00g8MVbdp1+Q8GagZSjU7tibJk9eCPn/nicv2IHD+eaGoYV3Rx1MOrrmbDrwqqPrutbTxL/Qso0s/b4ZGVzmO5EJpIap/wdLo2znrnwHL3MVOaWE+4u80aZBgG2esyHG6DpfbUZJ/qkL5GLFMLURUa0/fId7kdUytdT+CfuuFz8ZaLJkjuLK0rD0kEPXGRMcdeCQUcqj1HwzxCScvHheonbXJaXJwBgOeJUGetTgBZGVfDc5Lr6NX1Y0EPMqMgOPz4mwAZEU416E5UuOGJ9yxMPymU/IkBLKdg2htvsLK3NGn3GmZDaTJYyuAgc7aafRZIRFNp37tkwT9BuTAbKf3V2DuKeABrvH0oBfo7lPQpq+Y+agAEmzRAfoMl80OkR8NxA2W0wZFoUDx2cCGp+GxiVdhYoN5DRhxvGmdKpBIqAgH9M55mwrqsgGfStqIZnXgWR+nRIEJRVeBp7YhiSzbtJxKqtISGWLVYaKypMv97++PUwm48mNInhelLXnczxdIDssXQt/zoOAmC0nWTdUBGbiz34dgRMR/QgC67GXb/fyGYWBXiBx9wgSG6rfOiJmySxl3KMuDRAZpdasjHjj88dM+ikU4cIMc4EYyaPUZJdJy16XCAfOZRhsw+cvcBBZpDjWq0xUFDcWkeAZTyQMXzsnoHBhZkuTdDkRtFwcT35lmRkmiPsv+d4P0cxU3PLsksd1SQPoF2bOKa0WfbKr6Hs9nlzejn+eJT6exF34nhL3Dgu7Hy6A 6moAhe8XQC3FwT6Pb29HV2fnaix+trqp6asB9HJ81rdJfSFoUd/4WRmBpw/D4Wh0dda4wQWyVR8GQFGy1slEnkbk2QTkTibpQeWpkkonuLAWnEys4y+ssulX6oseE0JMY58QsK98NFTZLnuftNdV9+2uoldUutpxlLxTNbQKLqv+Ca0PA1C5E1RAq6uiBbS22OpZChC9umz1QVkga4AteCS2QOfEr00StnJXPz2zNlz91tjSFLbMmmypXSU+0gBbh17WfZgrucPYKlgTW2OrYCGDZj221OUV7mNawpbQCm0z1RZhhWX5oGFPGbScxLKxqS2gkftiJBLRKGrDbp4Dac1A2toG8CRgr8OWesM6fRjcje/vz+fHxj5rtXm9Cg68Xy3eN5/g/r7kJFJxhw/V2AEa+K5V7PFii5/jw6i5BQNGvh/Q4AbMOg5W//nXUlCwJpltYQXjE1iMw/5ZrypXxlsd1eZKZNNf6kTV0987wdFf</diagram></mxfile> http://git-wip-us.apache.org/repos/asf/metron/blob/9d4842f3/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java index 5a2f485..9af549c 100644 --- a/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java +++ b/metron-platform/metron-job/src/main/java/org/apache/metron/job/JobStatus.java @@ -23,8 +23,11 @@ package org.apache.metron.job; */ public class JobStatus { + private Throwable failureReason; + public enum State { NOT_RUNNING, + SUBMITTED, RUNNING, SUCCEEDED, FINALIZING, @@ -33,11 +36,32 @@ public class JobStatus { } private String jobId; - private State state = State.NOT_RUNNING; - private double percentComplete = 0.0; + private State state; + private double percentComplete; private String description; private long completionTime; + public JobStatus() { + jobId = ""; + state = State.NOT_RUNNING; + percentComplete = 0.0; + description = "Not started"; + completionTime = 0L; + } + + /** + * Copy constructor instead of clone. Effective for thread safety, per Goetz JCIP. + * + * @param jobStatus Existing JobStatus object to copy state from. + */ + public JobStatus(JobStatus jobStatus) { + this.jobId = jobStatus.jobId; + this.state = jobStatus.state; + this.percentComplete = jobStatus.percentComplete; + this.description = jobStatus.description; + this.completionTime = jobStatus.completionTime; + } + public JobStatus withJobId(String jobId) { this.jobId = jobId; return this; @@ -63,6 +87,11 @@ public class JobStatus { return this; } + public JobStatus withFailureException(Throwable failureReason) { + this.failureReason = failureReason; + return this; + } + public String getJobId() { return jobId; } @@ -83,4 +112,13 @@ public class JobStatus { return completionTime; } + /** + * Null if no failure reason available. + * + * @return Throwable indicating failure. + */ + public Throwable getFailureReason() { + return failureReason; + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/9d4842f3/metron-platform/metron-job/src/test/java/org/apache/metron/job/JobStatusTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/test/java/org/apache/metron/job/JobStatusTest.java b/metron-platform/metron-job/src/test/java/org/apache/metron/job/JobStatusTest.java new file mode 100644 index 0000000..67c5c52 --- /dev/null +++ b/metron-platform/metron-job/src/test/java/org/apache/metron/job/JobStatusTest.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.job; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.apache.metron.job.JobStatus.State; +import org.junit.Test; + +public class JobStatusTest { + + @Test + public void constructor_copies_from_existing_instance() { + JobStatus original = new JobStatus() + .withState(State.SUCCEEDED) + .withCompletionTime(5000) + .withJobId("abc123") + .withDescription("All done") + .withPercentComplete(100.0); + JobStatus copied = new JobStatus(original); + assertThat(copied.getState(), equalTo(State.SUCCEEDED)); + assertThat(copied.getCompletionTime(), equalTo(5000L)); + assertThat(copied.getJobId(), equalTo("abc123")); + assertThat(copied.getDescription(), equalTo("All done")); + assertThat(copied.getPercentComplete(), equalTo(100.0)); + } + + @Test + public void failure_info_provided() { + JobException e = new JobException("The job blew up."); + JobStatus original = new JobStatus() + .withState(State.FAILED) + .withDescription("Failed") + .withFailureException(e); + assertThat(original.getFailureReason(), equalTo(e)); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/9d4842f3/metron-platform/metron-job/src/test/java/org/apache/metron/job/manager/InMemoryJobManagerTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-job/src/test/java/org/apache/metron/job/manager/InMemoryJobManagerTest.java b/metron-platform/metron-job/src/test/java/org/apache/metron/job/manager/InMemoryJobManagerTest.java index f3a3978..b1fab4e 100644 --- a/metron-platform/metron-job/src/test/java/org/apache/metron/job/manager/InMemoryJobManagerTest.java +++ b/metron-platform/metron-job/src/test/java/org/apache/metron/job/manager/InMemoryJobManagerTest.java @@ -65,6 +65,7 @@ public class InMemoryJobManagerTest { private String jobId1; private String jobId2; private String jobId3; + private String emptyJobId; private String basePath; @Before @@ -77,6 +78,7 @@ public class InMemoryJobManagerTest { jobId1 = "job_abc_123"; jobId2 = "job_def_456"; jobId3 = "job_ghi_789"; + emptyJobId = ""; basePath = tempDir.getRoot().getAbsolutePath(); when(job1.getJobType()).thenReturn(JobType.MAP_REDUCE); when(job2.getJobType()).thenReturn(JobType.MAP_REDUCE); @@ -135,6 +137,20 @@ public class InMemoryJobManagerTest { } @Test + public void empty_result_set_with_empty_jobId_shows_status() throws JobException { + when(job1.getStatus()).thenReturn(new JobStatus().withState(State.SUCCEEDED).withJobId(emptyJobId)); + + // user submits 1 job with empty results + jm.submit(newSupplier(job1), username1); + assertThat(jm.getJob(username1, emptyJobId), equalTo(job1)); + + // user submits another job with empty results + when(job2.getStatus()).thenReturn(new JobStatus().withState(State.SUCCEEDED).withJobId(emptyJobId)); + jm.submit(newSupplier(job2), username1); + assertThat(jm.getJob(username1, emptyJobId), equalTo(job2)); + } + + @Test public void returns_job_status() throws JobException { JobStatus expected = new JobStatus().withState(State.SUCCEEDED).withJobId(jobId1); when(job1.getStatus()).thenReturn(expected); http://git-wip-us.apache.org/repos/asf/metron/blob/9d4842f3/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java index b28c428..14963fd 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/PcapJobTest.java @@ -28,6 +28,8 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.Timer; +import java.util.TimerTask; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -62,6 +64,7 @@ public class PcapJobTest { private JobID jobId; @Mock private Finalizer<Path> finalizer; + private TestTimer timer; private Pageable<Path> pageableResult; private FixedPcapConfig config; private Configuration hadoopConfig; @@ -77,7 +80,6 @@ public class PcapJobTest { private Map<String, String> fixedFields; private PcapJob<Map<String, String>> testJob; - @Before public void setup() throws IOException { MockitoAnnotations.initMocks(this); @@ -94,7 +96,9 @@ public class PcapJobTest { finalOutputPath = new Path("finaloutpath"); when(jobId.toString()).thenReturn(jobIdVal); when(mrStatus.getJobID()).thenReturn(jobId); + when(mrJob.getJobID()).thenReturn(jobId); pageableResult = new PcapPages(); + timer = new TestTimer(); // handles setting the file name prefix under the hood config = new FixedPcapConfig(clock -> "clockprefix"); PcapOptions.HADOOP_CONF.put(config, hadoopConfig); @@ -108,28 +112,20 @@ public class PcapJobTest { PcapOptions.FILTER_IMPL.put(config, new FixedPcapFilter.Configurator()); PcapOptions.NUM_RECORDS_PER_FILE.put(config, numRecordsPerFile); PcapOptions.FINAL_OUTPUT_PATH.put(config, finalOutputPath); - testJob = new TestJob<>(); - testJob.setStatusInterval(10); - testJob.setCompleteCheckInterval(10); - } - - @Test - public void partition_gives_value_in_range() throws Exception { - long start = 1473897600000000000L; - long end = TimestampConverters.MILLISECONDS.toNanoseconds(1473995927455L); - Configuration conf = new Configuration(); - conf.set(PcapJob.START_TS_CONF, toUnsignedString(start)); - conf.set(PcapJob.END_TS_CONF, toUnsignedString(end)); - conf.set(PcapJob.WIDTH_CONF, "" + PcapJob.findWidth(start, end, 10)); - PcapJob.PcapPartitioner partitioner = new PcapJob.PcapPartitioner(); - partitioner.setConf(conf); - Assert.assertThat("Partition not in range", - partitioner.getPartition(new LongWritable(1473978789181189000L), new BytesWritable(), 10), - equalTo(8)); + testJob = new TestJob<>(mrJob); + testJob.setStatusInterval(1); + testJob.setCompleteCheckInterval(1); + testJob.setTimer(timer); } private class TestJob<T> extends PcapJob<T> { + private final Job mrJob; + + public TestJob(Job mrJob) { + this.mrJob = mrJob; + } + @Override public Job createJob(Optional<String> jobName, Path basePath, @@ -145,6 +141,36 @@ public class PcapJobTest { } } + private class TestTimer extends Timer { + + private TimerTask task; + + @Override + public void scheduleAtFixedRate(TimerTask task, long delay, long period) { + this.task = task; + } + + public void updateJobStatus() { + task.run(); + } + + } + + @Test + public void partition_gives_value_in_range() throws Exception { + long start = 1473897600000000000L; + long end = TimestampConverters.MILLISECONDS.toNanoseconds(1473995927455L); + Configuration conf = new Configuration(); + conf.set(PcapJob.START_TS_CONF, toUnsignedString(start)); + conf.set(PcapJob.END_TS_CONF, toUnsignedString(end)); + conf.set(PcapJob.WIDTH_CONF, "" + PcapJob.findWidth(start, end, 10)); + PcapJob.PcapPartitioner partitioner = new PcapJob.PcapPartitioner(); + partitioner.setConf(conf); + Assert.assertThat("Partition not in range", + partitioner.getPartition(new LongWritable(1473978789181189000L), new BytesWritable(), 10), + equalTo(8)); + } + @Test public void job_succeeds_synchronously() throws Exception { pageableResult = new PcapPages( @@ -154,6 +180,7 @@ public class PcapJobTest { when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); when(mrJob.getStatus()).thenReturn(mrStatus); Statusable<Path> statusable = testJob.submit(finalizer, config); + timer.updateJobStatus(); Pageable<Path> results = statusable.get(); Assert.assertThat(results.getSize(), equalTo(3)); JobStatus status = statusable.getStatus(); @@ -168,6 +195,7 @@ public class PcapJobTest { when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.FAILED); when(mrJob.getStatus()).thenReturn(mrStatus); Statusable<Path> statusable = testJob.submit(finalizer, config); + timer.updateJobStatus(); Pageable<Path> results = statusable.get(); JobStatus status = statusable.getStatus(); Assert.assertThat(status.getState(), equalTo(State.FAILED)); @@ -181,6 +209,7 @@ public class PcapJobTest { when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED); when(mrJob.getStatus()).thenReturn(mrStatus); Statusable<Path> statusable = testJob.submit(finalizer, config); + timer.updateJobStatus(); Pageable<Path> results = statusable.get(); JobStatus status = statusable.getStatus(); Assert.assertThat(status.getState(), equalTo(State.KILLED)); @@ -190,13 +219,11 @@ public class PcapJobTest { @Test public void job_succeeds_asynchronously() throws Exception { - // not complete a few times to make sure cancel works as expected - when(mrJob.isComplete()).thenReturn(false, false, false, true); + when(mrJob.isComplete()).thenReturn(true); when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED); when(mrJob.getStatus()).thenReturn(mrStatus); Statusable<Path> statusable = testJob.submit(finalizer, config); - while (!statusable.isDone()) { - } + timer.updateJobStatus(); JobStatus status = statusable.getStatus(); Assert.assertThat(status.getState(), equalTo(State.SUCCEEDED)); Assert.assertThat(status.getPercentComplete(), equalTo(100.0)); @@ -207,18 +234,20 @@ public class PcapJobTest { when(mrJob.isComplete()).thenReturn(false); when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.RUNNING); when(mrJob.getStatus()).thenReturn(mrStatus); - Statusable<Path> statusable = testJob.submit(finalizer, config); when(mrJob.mapProgress()).thenReturn(0.5f); when(mrJob.reduceProgress()).thenReturn(0f); + Statusable<Path> statusable = testJob.submit(finalizer, config); + timer.updateJobStatus(); JobStatus status = statusable.getStatus(); Assert.assertThat(status.getState(), equalTo(State.RUNNING)); Assert.assertThat(status.getDescription(), equalTo("map: 50.0%, reduce: 0.0%")); Assert.assertThat(status.getPercentComplete(), equalTo(25.0)); when(mrJob.mapProgress()).thenReturn(1.0f); when(mrJob.reduceProgress()).thenReturn(0.5f); + timer.updateJobStatus(); status = statusable.getStatus(); - Assert.assertThat(status.getPercentComplete(), equalTo(75.0)); Assert.assertThat(status.getDescription(), equalTo("map: 100.0%, reduce: 50.0%")); + Assert.assertThat(status.getPercentComplete(), equalTo(75.0)); } @Test @@ -230,6 +259,7 @@ public class PcapJobTest { statusable.kill(); when(mrJob.isComplete()).thenReturn(true); when(mrStatus.getState()).thenReturn(org.apache.hadoop.mapreduce.JobStatus.State.KILLED); + timer.updateJobStatus(); JobStatus status = statusable.getStatus(); Assert.assertThat(status.getState(), equalTo(State.KILLED)); } http://git-wip-us.apache.org/repos/asf/metron/blob/9d4842f3/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java index 0be33d6..108fd2b 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java @@ -270,7 +270,6 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { PcapJob<Map<String, String>> job = new PcapJob<>(); Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState()); waitForJob(results); Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); @@ -292,7 +291,6 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { PcapJob<String> job = new PcapJob<>(); Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState()); waitForJob(results); Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); @@ -307,6 +305,23 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { Assert.assertEquals(results.get().getSize(), 1); } { + //ensure that none get returned since date range has no results + PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); + PcapOptions.FIELDS.put(configuration, new HashMap<>()); + PcapOptions.START_TIME_NS.put(configuration, 0); + PcapOptions.END_TIME_NS.put(configuration, 1); + PcapJob<Map<String, String>> job = new PcapJob<>(); + Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); + Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); + waitForJob(results); + + Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); + Assert.assertEquals(100.0, results.getStatus().getPercentComplete(), 0.0); + Assert.assertEquals("No results in specified date range.", + results.getStatus().getDescription()); + Assert.assertEquals(results.get().getSize(), 0); + } + { //ensure that none get returned since that destination IP address isn't in the dataset PcapOptions.FILTER_IMPL.put(configuration, new FixedPcapFilter.Configurator()); PcapOptions.FIELDS.put(configuration, new HashMap<String, String>() {{ @@ -315,7 +330,6 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { PcapJob<Map<String, String>> job = new PcapJob<>(); Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState()); waitForJob(results); Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); @@ -337,7 +351,6 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { PcapJob<String> job = new PcapJob<>(); Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState()); waitForJob(results); Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); @@ -360,7 +373,6 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { PcapJob<Map<String, String>> job = new PcapJob<>(); Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState()); waitForJob(results); Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); @@ -382,7 +394,6 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { PcapJob<String> job = new PcapJob<>(); Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState()); waitForJob(results); Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); @@ -405,7 +416,6 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { PcapJob<Map<String, String>> job = new PcapJob<>(); Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState()); waitForJob(results); Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); @@ -429,7 +439,6 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { PcapJob<String> job = new PcapJob<>(); Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState()); waitForJob(results); Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); @@ -452,7 +461,6 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { PcapJob<Map<String, String>> job = new PcapJob<>(); Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState()); waitForJob(results); Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); @@ -487,7 +495,6 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { PcapJob<String> job = new PcapJob<>(); Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState()); waitForJob(results); Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); @@ -520,7 +527,6 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { PcapJob<String> job = new PcapJob<>(); Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState()); waitForJob(results); Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); @@ -553,7 +559,6 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { PcapJob<String> job = new PcapJob<>(); Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState()); waitForJob(results); Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); @@ -588,7 +593,6 @@ public class PcapTopologyIntegrationTest extends BaseIntegrationTest { PcapJob<String> job = new PcapJob<>(); Statusable<Path> results = job.submit(PcapFinalizerStrategies.CLI, configuration); Assert.assertEquals(Statusable.JobType.MAP_REDUCE, results.getJobType()); - Assert.assertEquals(JobStatus.State.RUNNING, results.getStatus().getState()); waitForJob(results); Assert.assertEquals(JobStatus.State.SUCCEEDED, results.getStatus().getState()); http://git-wip-us.apache.org/repos/asf/metron/blob/9d4842f3/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java index 1dd670d..a26e5ff 100644 --- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java +++ b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/mr/PcapJob.java @@ -79,8 +79,8 @@ public class PcapJob<T> implements Statusable<Path> { public static final String WIDTH_CONF = "width"; private static final long THREE_SECONDS = 3000; private static final long ONE_SECOND = 1000; - private Job mrJob; // store a running MR job reference for async status check - private State jobState; // overall job state, including finalization step + private volatile Job mrJob; // store a running MR job reference for async status check + private volatile JobStatus jobStatus; // overall job status, including finalization step private Finalizer<Path> finalizer; private Map<String, Object> configuration; private Pageable<Path> finalResults; @@ -181,8 +181,9 @@ public class PcapJob<T> implements Statusable<Path> { } public PcapJob() { - jobState = State.NOT_RUNNING; + jobStatus = new JobStatus(); finalResults = new PcapPages(); + timer = new Timer(); statusInterval = THREE_SECONDS; completeCheckInterval = ONE_SECOND; } @@ -286,73 +287,112 @@ public class PcapJob<T> implements Statusable<Path> { , fs , filterImpl ); + if (mrJob == null) { + LOG.info("No files to process with specified date range."); + try { + setFinalResults(input -> new PcapPages(), configuration); + jobStatus.withState(State.SUCCEEDED).withDescription("No results in specified date range.") + .withPercentComplete(100.0); + } catch (JobException e) { + // This should not cause an error as we simply set results to an empty result set. + jobStatus.withState(State.FAILED).withDescription("Unable to finalize empty job.") + .withFailureException(e); + } + return this; + } mrJob.submit(); - jobState = State.RUNNING; + jobStatus.withState(State.SUBMITTED).withDescription("Job submitted").withJobId(mrJob.getJobID().toString()); startJobStatusTimerThread(statusInterval); return this; } private void startJobStatusTimerThread(long interval) { - timer = new Timer(); - timer.scheduleAtFixedRate(new TimerTask() { + getTimer().scheduleAtFixedRate(new TimerTask() { @Override public void run() { - try { - synchronized (this) { - if (jobState == State.RUNNING) { - if (mrJob.isComplete()) { - switch (mrJob.getStatus().getState()) { - case SUCCEEDED: - jobState = State.FINALIZING; - if (setFinalResults(finalizer, configuration)) { - jobState = State.SUCCEEDED; - } else { - jobState = State.FAILED; - } - break; - case FAILED: - jobState = State.FAILED; - break; - case KILLED: - jobState = State.KILLED; - break; - } - cancel(); // be gone, ye! - } - } - } - } catch (InterruptedException | IOException e) { - jobState = State.FAILED; - cancel(); + if (!updateStatus()) { + cancel(); // be gone, ye! } } }, interval, interval); } + public void setTimer(Timer timer) { + this.timer = timer; + } + + private Timer getTimer() { + return timer; + } + + /** + * Update job status info. Will finalize job when underlying MR job completes. + * + * @return true if should continue updating status, false otherwise. + */ + private synchronized boolean updateStatus() { + try { + org.apache.hadoop.mapreduce.JobStatus mrJobStatus = mrJob.getStatus(); + org.apache.hadoop.mapreduce.JobStatus.State mrJobState = mrJob.getStatus().getState(); + if (mrJob.isComplete()) { + jobStatus.withPercentComplete(100.0); + switch (mrJobState) { + case SUCCEEDED: + jobStatus.withState(State.FINALIZING).withDescription("Finalizing job."); + try { + setFinalResults(finalizer, configuration); + jobStatus.withState(State.SUCCEEDED).withDescription("Job completed."); + } catch (JobException je) { + jobStatus.withState(State.FAILED).withDescription("Job finalize failed.") + .withFailureException(je); + } + break; + case FAILED: + jobStatus.withState(State.FAILED).withDescription(mrJob.getStatus().getFailureInfo()); + break; + case KILLED: + jobStatus.withState(State.KILLED).withDescription(mrJob.getStatus().getFailureInfo()); + break; + } + return false; + } else { + float mapProg = mrJob.mapProgress(); + float reduceProg = mrJob.reduceProgress(); + float totalProgress = ((mapProg / 2) + (reduceProg / 2)) * 100; + String description = String + .format("map: %s%%, reduce: %s%%", mapProg * 100, reduceProg * 100); + jobStatus.withPercentComplete(totalProgress).withState(State.RUNNING) + .withDescription(description); + } + } catch (InterruptedException | IOException e) { + jobStatus.withPercentComplete(100.0).withState(State.FAILED).withFailureException(e); + return false; + } + return true; + } + /** - * Writes results using finalizer. Returns true on success, false otherwise. + * Writes results using finalizer. Returns true on success, false otherwise. If no results + * to finalize, returns empty Pageable. * * @param finalizer Writes results. * @param configuration Configure the finalizer. * @return Returns true on success, false otherwise. */ - private boolean setFinalResults(Finalizer<Path> finalizer, Map<String, Object> configuration) { - boolean success = true; - Pageable<Path> results = new PcapPages(); - try { - results = finalizer.finalizeJob(configuration); - } catch (JobException e) { - LOG.error("Failed to finalize job.", e); - success = false; + private void setFinalResults(Finalizer<Path> finalizer, Map<String, Object> configuration) + throws JobException { + Pageable<Path> results = finalizer.finalizeJob(configuration); + if (results == null) { + results = new PcapPages(); } synchronized (this) { finalResults = results; } - return success; } /** - * Creates, but does not submit the job. This is the core MapReduce mrJob. + * Creates, but does not submit the job. This is the core MapReduce mrJob. Empty input path + * results in a null to be returned instead of creating the job. */ public Job createJob(Optional<String> jobName ,Path basePath @@ -366,6 +406,11 @@ public class PcapJob<T> implements Statusable<Path> { , PcapFilterConfigurator<T> filterImpl ) throws IOException { + Iterable<String> filteredPaths = FileFilterUtil.getPathsInTimeRange(beginNS, endNS, listFiles(fs, basePath)); + String inputPaths = Joiner.on(',').join(filteredPaths); + if (StringUtils.isEmpty(inputPaths)) { + return null; + } conf.set(START_TS_CONF, Long.toUnsignedString(beginNS)); conf.set(END_TS_CONF, Long.toUnsignedString(endNS)); conf.set(WIDTH_CONF, "" + findWidth(beginNS, endNS, numReducers)); @@ -381,11 +426,6 @@ public class PcapJob<T> implements Statusable<Path> { job.setPartitionerClass(PcapPartitioner.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(BytesWritable.class); - Iterable<String> filteredPaths = FileFilterUtil.getPathsInTimeRange(beginNS, endNS, listFiles(fs, basePath)); - String inputPaths = Joiner.on(',').join(filteredPaths); - if (StringUtils.isEmpty(inputPaths)) { - return null; - } SequenceFileInputFormat.addInputPaths(job, inputPaths); job.setInputFormatClass(SequenceFileInputFormat.class); job.setOutputFormatClass(SequenceFileOutputFormat.class); @@ -411,54 +451,9 @@ public class PcapJob<T> implements Statusable<Path> { return JobType.MAP_REDUCE; } - /** - * Synchronized for mrJob and jobState - */ @Override public synchronized JobStatus getStatus() throws JobException { - JobStatus status = new JobStatus(); - if (mrJob == null) { - status.withPercentComplete(100).withState(State.SUCCEEDED); - } else { - try { - org.apache.hadoop.mapreduce.JobStatus mrJobStatus = mrJob.getStatus(); - status.withJobId(mrJobStatus.getJobID().toString()); - if (jobState == State.SUCCEEDED) { - status.withPercentComplete(100).withState(State.SUCCEEDED) - .withDescription("Job complete"); - } else { - if (mrJob.isComplete()) { - status.withPercentComplete(100); - switch (mrJobStatus.getState()) { - case SUCCEEDED: - status.withState(State.FINALIZING).withDescription(State.FINALIZING.toString()); - break; - case FAILED: - status.withState(State.FAILED).withDescription(State.FAILED.toString()); - break; - case KILLED: - status.withState(State.KILLED).withDescription(State.KILLED.toString()); - break; - default: - throw new IllegalStateException( - "Unknown job state reported as 'complete' by mapreduce framework: " - + mrJobStatus.getState()); - } - } else { - float mapProg = mrJob.mapProgress(); - float reduceProg = mrJob.reduceProgress(); - float totalProgress = ((mapProg / 2) + (reduceProg / 2)) * 100; - String description = String - .format("map: %s%%, reduce: %s%%", mapProg * 100, reduceProg * 100); - status.withPercentComplete(totalProgress).withState(State.RUNNING) - .withDescription(description); - } - } - } catch (Exception e) { - throw new JobException("Error occurred while attempting to retrieve job status.", e); - } - } - return status; + return new JobStatus(jobStatus); } /** @@ -483,6 +478,7 @@ public class PcapJob<T> implements Statusable<Path> { @Override public synchronized boolean isDone() { + State jobState = jobStatus.getState(); return (jobState == State.SUCCEEDED || jobState == State.KILLED || jobState == State.FAILED); http://git-wip-us.apache.org/repos/asf/metron/blob/9d4842f3/site-book/bin/generate-md.sh ---------------------------------------------------------------------- diff --git a/site-book/bin/generate-md.sh b/site-book/bin/generate-md.sh index 464cb69..83f41cc 100755 --- a/site-book/bin/generate-md.sh +++ b/site-book/bin/generate-md.sh @@ -63,6 +63,7 @@ RESOURCE_LIST=( metron-deployment/readme-images/enable-kerberos-configure-kerberos.png metron-deployment/readme-images/enable-kerberos-started.png metron-deployment/readme-images/enable-kerberos.png + metron-platform/metron-job/metron-job_state_statechart_diagram.svg metron-platform/metron-parsers/parser_arch.png metron-platform/metron-indexing/indexing_arch.png metron-platform/metron-enrichment/enrichment_arch.png @@ -86,6 +87,7 @@ HREF_REWRITE_LIST=( metron-deployment/Kerberos-ambari-setup.md 's#(readme-images/enable-kerberos.png)#(../images/enable-kerberos.png)#g' metron-platform/metron-enrichment/README.md 's#(enrichment_arch.png)#(../../images/enrichment_arch.png)#g' metron-platform/metron-indexing/README.md 's#(indexing_arch.png)#(../../images/indexing_arch.png)#g' + metron-platform/metron-job/README.md 's#(metron-job_state_statechart_diagram.svg)#(../../images/metron-job_state_statechart_diagram.svg)#g' metron-platform/metron-parsers/README.md 's#(parser_arch.png)#(../../images/parser_arch.png)#g' metron-analytics/metron-maas-service/README.md 's#(maas_arch.png)#(../../images/maas_arch.png)#g' )
