[FLINK-4932] [distributed coordination] Failing in state RESTARTING only fails 
terminally if no more restarts are possible

If a failure occurs while the job is in state RESTARTING, a new restart
attempt is started. Only if the restart strategy no longer allows further
restarts, or if the thrown exception is of type SuppressRestartsException,
can a job go from RESTARTING into FAILED.

This closes #2710


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/18507de3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/18507de3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/18507de3

Branch: refs/heads/master
Commit: 18507de3c068795c93c5c689388a22857f2f817c
Parents: 59a5551
Author: Till Rohrmann <[email protected]>
Authored: Thu Oct 27 18:32:08 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Mon Oct 31 19:16:41 2016 +0100

----------------------------------------------------------------------
 docs/fig/job_status.svg                         | 263 ++++++++++++-------
 .../runtime/executiongraph/ExecutionGraph.java  |  60 ++++-
 .../ExecutionGraphMetricsTest.java              |   4 +-
 .../ExecutionGraphRestartTest.java              |   7 +-
 .../ExecutionGraphSignalsTest.java              | 106 ++++++--
 .../restart/InfiniteDelayRestartStrategy.java   |  22 +-
 6 files changed, 338 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/18507de3/docs/fig/job_status.svg
----------------------------------------------------------------------
diff --git a/docs/fig/job_status.svg b/docs/fig/job_status.svg
index c259db4..488f883 100644
--- a/docs/fig/job_status.svg
+++ b/docs/fig/job_status.svg
@@ -38,6 +38,50 @@ under the License.
   <defs
      id="defs4">
     <marker
+       inkscape:isstock="true"
+       style="overflow:visible"
+       id="marker4737"
+       refX="0"
+       refY="0"
+       orient="auto"
+       inkscape:stockid="Arrow2Mend">
+      <path
+         inkscape:connector-curvature="0"
+         transform="scale(-0.6,-0.6)"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c 
-1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         
style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         id="path4739" />
+    </marker>
+    <marker
+       inkscape:stockid="Arrow1Lend"
+       orient="auto"
+       refY="0.0"
+       refX="0.0"
+       id="marker4552"
+       style="overflow:visible;"
+       inkscape:isstock="true">
+      <path
+         id="path4298"
+         d="M 0.0,0.0 L 5.0,-5.0 L -12.5,0.0 L 5.0,5.0 L 0.0,0.0 z "
+         
style="fill-rule:evenodd;stroke:#000000;stroke-width:1pt;stroke-opacity:1;fill:#000000;fill-opacity:1"
+         transform="scale(0.8) rotate(180) translate(12.5,0)" />
+    </marker>
+    <marker
+       inkscape:isstock="true"
+       style="overflow:visible"
+       id="marker4551"
+       refX="0"
+       refY="0"
+       orient="auto"
+       inkscape:stockid="Arrow2Mend">
+      <path
+         inkscape:connector-curvature="0"
+         transform="scale(-0.6,-0.6)"
+         d="M 8.7185878,4.0337352 -2.2072895,0.01601326 8.7185884,-4.0017078 c 
-1.7454984,2.3720609 -1.7354408,5.6174519 -6e-7,8.035443 z"
+         
style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
+         id="path4553" />
+    </marker>
+    <marker
        inkscape:stockid="Arrow2Mstart"
        orient="auto"
        refY="0.0"
@@ -343,7 +387,8 @@ under the License.
        refX="0"
        id="Arrow2Mend"
        style="overflow:visible"
-       inkscape:isstock="true">
+       inkscape:isstock="true"
+       inkscape:collect="always">
       <path
          id="path4486"
          
style="fill:#000000;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.625;stroke-linejoin:round;stroke-opacity:1"
@@ -404,17 +449,17 @@ under the License.
      borderopacity="1.0"
      inkscape:pageopacity="0.0"
      inkscape:pageshadow="2"
-     inkscape:zoom="1.4"
-     inkscape:cx="366.44711"
-     inkscape:cy="435.59833"
+     inkscape:zoom="0.98994949"
+     inkscape:cx="333.41527"
+     inkscape:cy="460.79478"
      inkscape:document-units="px"
      inkscape:current-layer="layer1"
      showgrid="true"
-     inkscape:window-width="1402"
-     inkscape:window-height="855"
-     inkscape:window-x="38"
-     inkscape:window-y="1"
-     inkscape:window-maximized="1">
+     inkscape:window-width="1916"
+     inkscape:window-height="1300"
+     inkscape:window-x="1855"
+     inkscape:window-y="21"
+     inkscape:window-maximized="0">
     <inkscape:grid
        type="xygrid"
        id="grid4136" />
@@ -438,13 +483,13 @@ under the License.
      transform="translate(0,-272.83465)">
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-start:url(#Arrow2Mstart);marker-end:url(#marker4407)"
-       d="M 369.28571,490.93361 C 340,572.36218 330,712.36218 
340.71429,802.36218"
+       d="M 409.28571,490.93361 C 380,572.36218 370,712.36218 
380.71429,802.36218"
        id="path3473"
        inkscape:connector-curvature="0"
        sodipodi:nodetypes="cc" />
     <g
        id="g4324"
-       transform="translate(-30.285714,162.34191)">
+       transform="translate(9.714286,162.34191)">
       <rect
          ry="22.587013"
          rx="21.337021"
@@ -468,7 +513,7 @@ under the License.
     </g>
     <g
        id="g4286"
-       transform="translate(-39.560883,231.66354)">
+       transform="translate(0.439117,231.66354)">
       <rect
          
style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:1.5;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:none;stroke-dashoffset:0;stroke-opacity:1"
          id="rect4254"
@@ -492,7 +537,7 @@ under the License.
     </g>
     <g
        id="g4426"
-       transform="translate(38,166)">
+       transform="translate(78,166)">
       <rect
          
style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:4;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:24,
 4;stroke-dashoffset:0;stroke-opacity:1"
          id="rect4260"
@@ -516,7 +561,7 @@ under the License.
     </g>
     <g
        id="g4276"
-       transform="translate(-8.802002,175.91335)">
+       transform="translate(31.198,175.91335)">
       <rect
          
style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:1.5;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:none;stroke-dashoffset:0;stroke-opacity:1"
          id="rect4256"
@@ -540,7 +585,7 @@ under the License.
     </g>
     <g
        id="g4421"
-       transform="translate(40,166)">
+       transform="translate(80,166)">
       <rect
          ry="22.587013"
          rx="21.337021"
@@ -564,7 +609,7 @@ under the License.
     </g>
     <g
        id="g4416"
-       transform="translate(14,166)">
+       transform="translate(54,166)">
       <rect
          ry="22.500114"
          rx="26.670492"
@@ -588,7 +633,7 @@ under the License.
     </g>
     <g
        id="g4431"
-       transform="translate(38,166)">
+       transform="translate(78,166)">
       <rect
          ry="22.551325"
          rx="23.453072"
@@ -612,7 +657,7 @@ under the License.
     </g>
     <g
        id="g4411"
-       transform="translate(14,166)">
+       transform="translate(54,166)">
       <rect
          
style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:1.5;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:none;stroke-dashoffset:0;stroke-opacity:1"
          id="rect4250"
@@ -636,7 +681,7 @@ under the License.
     </g>
     <g
        id="g4436"
-       transform="translate(11.142857,169.57143)">
+       transform="translate(51.14286,169.57143)">
       <rect
          
style="opacity:1;fill:#5599ff;fill-opacity:1;stroke:#000000;stroke-width:4;stroke-linejoin:round;stroke-miterlimit:4;stroke-dasharray:24,
 4;stroke-dashoffset:0;stroke-opacity:1"
          id="rect4264"
@@ -660,7 +705,7 @@ under the License.
     </g>
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#Arrow2Mend)"
-       d="m 175.31595,646.72195 122.27415,0.17603"
+       d="m 215.33702,647.73984 122.07008,0"
        id="path4441"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -669,7 +714,7 @@ under the License.
        sodipodi:nodetypes="cc" />
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker4831)"
-       d="m 423.67902,643.73984 146.73144,0"
+       d="m 464.00971,647.73984 146.07006,0"
        id="path4443"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -677,7 +722,7 @@ under the License.
        inkscape:connection-end="#g4426" />
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5157)"
-       d="M 143.45346,625.15282 C 240.11678,592.2528 289.05237,539.16028 
337.31308,485.78223"
+       d="M 184.13444,625.15282 382.19596,485.78223"
        id="path4445"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -686,7 +731,7 @@ under the License.
        sodipodi:nodetypes="cc" />
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5075)"
-       d="m 437.2653,459.19522 132.84994,10e-6"
+       d="m 477.59599,463.19522 132.18856,10e-6"
        id="path4447"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -694,7 +739,7 @@ under the License.
        inkscape:connection-end="#g4421" />
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker4947)"
-       d="M 140.03639,670.32685 308.21925,805.99316"
+       d="M 179.41675,670.32685 343.87778,805.99316"
        id="path4449"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -702,7 +747,7 @@ under the License.
        inkscape:connection-end="#g4416" />
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5005)"
-       d="m 409.8612,824.4933 148.36356,3e-5"
+       d="m 450.27779,828.4933 147.58232,3e-5"
        id="path4451"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -710,7 +755,7 @@ under the License.
        inkscape:connection-end="#g4431" />
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5237)"
-       d="M 119.50359,668.89828 C 120,902.36221 317.44733,915.62541 
504.35792,974.20919"
+       d="M 121.48606,668.30654 C 138.78064,811.61536 345.26224,888.30903 
548.95582,965.9844"
        id="path4453"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -720,107 +765,107 @@ under the License.
     <text
        xml:space="preserve"
        
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="182.57143"
+       x="222.57141"
        y="643.07654"
        id="text4913"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan4915"
-         x="182.57143"
+         x="222.57141"
          y="643.07654"
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Schedule 
job</tspan></text>
     <text
        xml:space="preserve"
        
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="432"
+       x="472"
        y="638.36218"
        id="text4929"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan4931"
-         x="432"
+         x="472"
          y="638.36218"
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">All job 
vertices </tspan><tspan
          sodipodi:role="line"
-         x="432"
+         x="472"
          y="657.11218"
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          id="tspan4933">in final state</tspan></text>
     <text
        xml:space="preserve"
        
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="424"
+       x="464"
        y="820.36218"
        id="text5063"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan5065"
-         x="424"
+         x="464"
          y="820.36218"
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">All job 
vertices </tspan><tspan
          sodipodi:role="line"
-         x="424"
+         x="464"
          y="839.11218"
          id="tspan5067"
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">in final 
state</tspan></text>
     <text
        xml:space="preserve"
        
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="442"
+       x="482"
        y="456.36221"
        id="text5139"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan5141"
-         x="442"
+         x="482"
          y="456.36221"
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">All job 
vertices </tspan><tspan
          sodipodi:role="line"
-         x="442"
+         x="482"
          y="475.11221"
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          id="tspan5143">in final state &amp; </tspan><tspan
          sodipodi:role="line"
-         x="442"
+         x="482"
          y="493.86221"
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          id="tspan5145">not restartable</tspan></text>
     <text
        xml:space="preserve"
        
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="219"
+       x="259"
        y="606.93359"
        id="text5227"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan5229"
-         x="219"
+         x="259"
          y="606.93359"
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Fail 
job</tspan></text>
     <text
        xml:space="preserve"
        
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="156.57143"
+       x="196.57141"
        y="764.07648"
        id="text5565"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan5567"
-         x="156.57143"
+         x="196.57141"
          y="764.07648"
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Cancel 
job</tspan></text>
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker5892)"
-       d="m 314.18121,477.05236 c -47.69818,20.03987 -84.94599,9.32911 
-116.30849,2.14285"
+       d="m 350.99338,463.19522 -106.2755,-1e-5"
        id="path5569"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
-       inkscape:connection-start="#g4276"
+       sodipodi:nodetypes="cc"
        inkscape:connection-end="#g4411"
-       sodipodi:nodetypes="cc" />
+       inkscape:connection-start="#g4276" />
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6174)"
-       d="M 419.49016,485.78223 C 974.28571,652.36221 835.65722,822.42397 
665.47877,968.49491"
+       d="M 453.86766,483.76192 C 1064.7153,696.2824 753.48364,871.76678 
689.84395,959.92348"
        id="path5571"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -829,7 +874,7 @@ under the License.
        sodipodi:nodetypes="cc" />
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6280)"
-       d="m 413.71359,666.04114 c 466.9675,42.03536 351.85357,186.4168 
228.2627,292.45377"
+       d="M 446.3093,669.3167 C 966.50293,715.92449 749.288,844.81821 
673.74388,957.90317"
        id="path5573"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -838,7 +883,7 @@ under the License.
        sodipodi:nodetypes="cc" />
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6392)"
-       d="M 375.26539,850.99339 556.58971,959.92348"
+       d="M 408.60702,850.99339 589.93134,959.92348"
        id="path5575"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -846,7 +891,7 @@ under the License.
        inkscape:connection-end="#g4436" />
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6074)"
-       d="M 173.42792,441.40962 C 790,62.362207 855,633.79078 
686.95342,807.37059"
+       d="M 189.43309,439.22831 C 890,72.362179 790,652.36218 
702.11815,804.93187"
        id="path5579"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -855,15 +900,16 @@ under the License.
        sodipodi:nodetypes="cc" />
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6764)"
-       d="M 123.688,485.69533 113.6599,625.15282"
+       d="m 148.78827,484.68518 -0.14393,138.44734"
        id="path5581"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
        inkscape:connection-start="#g4411"
-       inkscape:connection-end="#g4324" />
+       inkscape:connection-end="#g4324"
+       sodipodi:nodetypes="cc" />
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6510)"
-       d="m 400.64822,624.43854 c 14.16495,-34.7353 13.47368,-78.92413 
6.68911,-136.51345"
+       d="m 437.72662,622.12237 c 22.19839,-49.88375 12.35398,-92.37301 
0.15901,-134.31983"
        id="path5585"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -872,7 +918,7 @@ under the License.
        inkscape:connection-start="#g4286" />
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker6634)"
-       d="m 394.21832,671.04114 c 18.43152,58.30256 7.7951,100.67644 
-12.18276,132.80916"
+       d="m 423.27918,670.32685 c 16.08015,35.64642 30.49507,77.65921 
-0.96955,134.65616"
        id="path5587"
        inkscape:connector-type="polyline"
        inkscape:connector-curvature="0"
@@ -881,125 +927,125 @@ under the License.
        sodipodi:nodetypes="cc" />
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker7718)"
-       d="M 194.71429,444.79077 C 295.26058,393.86555 426.46327,380.03465 
584,438.3622"
+       d="M 234.71429,444.79077 C 335.26058,393.86555 466.46327,380.03465 
624,438.3622"
        id="path7710"
        inkscape:connector-curvature="0"
        sodipodi:nodetypes="cc" />
     <text
        xml:space="preserve"
        
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="10"
-       y="556.36218"
+       x="74.24366"
+       y="557.37231"
        id="text8166"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan8168"
-         x="10"
-         y="556.36218"
+         x="74.24366"
+         y="557.37231"
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Restarted 
job</tspan></text>
     <text
        xml:space="preserve"
        
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="156"
-       y="906.36218"
+       x="130.87746"
+       y="919.43469"
        id="text8170"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan8172"
-         x="156"
-         y="906.36218"
+         x="130.87746"
+         y="919.43469"
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Suspend 
job</tspan></text>
     <text
        sodipodi:linespacing="125%"
        id="text8174"
        y="906.93359"
-       x="468.57144"
+       x="508.57141"
        
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
        xml:space="preserve"><tspan
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          y="906.93359"
-         x="468.57144"
+         x="508.57141"
          id="tspan8176"
          sodipodi:role="line">Suspend job</tspan></text>
     <text
        xml:space="preserve"
        
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="746.42859"
-       y="906.2193"
+       x="749.28571"
+       y="931.93359"
        id="text8178"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan8180"
-         x="746.42859"
-         y="906.2193"
+         x="749.28571"
+         y="931.93359"
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">Suspend 
job</tspan></text>
     <text
        sodipodi:linespacing="125%"
        id="text8182"
        y="717.64789"
-       x="482.14288"
+       x="522.14288"
        
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
        xml:space="preserve"><tspan
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          y="717.64789"
-         x="482.14288"
+         x="522.14288"
          id="tspan8184"
          sodipodi:role="line">Suspend job</tspan></text>
     <text
        sodipodi:linespacing="125%"
        id="text8186"
        y="752.64789"
-       x="409.42856"
+       x="449.42859"
        
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
        xml:space="preserve"><tspan
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          y="752.64789"
-         x="409.42856"
+         x="449.42859"
          id="tspan8188"
          sodipodi:role="line">Cancel job</tspan></text>
     <text
        sodipodi:linespacing="125%"
        id="text8190"
-       y="390.50507"
-       x="361.14285"
+       y="390.08667"
+       x="344.87888"
        
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
        xml:space="preserve"><tspan
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
-         y="390.50507"
-         x="361.14285"
+         y="390.08667"
+         x="344.87888"
          id="tspan8192"
-         sodipodi:role="line">Fail job</tspan></text>
+         sodipodi:role="line">Fail and job not restartable</tspan></text>
     <text
        sodipodi:linespacing="125%"
        id="text8194"
        y="306.21933"
-       x="487.28571"
+       x="527.28571"
        
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
        xml:space="preserve"><tspan
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          y="306.21933"
-         x="487.28571"
+         x="527.28571"
          id="tspan8196"
          sodipodi:role="line">Cancel job</tspan></text>
     <text
        xml:space="preserve"
        
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="174"
+       x="214"
        y="510.36221"
        id="text8198"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan8200"
-         x="174"
+         x="214"
          y="510.36221"
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">All job 
vertices</tspan><tspan
          sodipodi:role="line"
-         x="174"
+         x="214"
          y="529.11218"
          id="tspan8202"
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">in final state 
&amp;</tspan><tspan
          sodipodi:role="line"
-         x="174"
+         x="214"
          y="547.86218"
          id="tspan8204"
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;writing-mode:lr-tb;text-anchor:start">restartable</tspan></text>
@@ -1007,43 +1053,80 @@ under the License.
        sodipodi:linespacing="125%"
        id="text8206"
        y="566.93372"
-       x="418.28571"
+       x="458.28571"
        
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
        xml:space="preserve"><tspan
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          y="566.93372"
-         x="418.28571"
+         x="458.28571"
          id="tspan8208"
          sodipodi:role="line">Fail job</tspan></text>
     <path
        
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker4534)"
-       d="M 9.5714286,648.36221 46,648.36221"
+       d="m 49.57143,648.36221 36.42857,0"
        id="path3470"
        inkscape:connector-curvature="0"
        sodipodi:nodetypes="cc" />
     <text
        xml:space="preserve"
        
style="font-style:normal;font-weight:normal;font-size:25px;line-height:125%;font-family:Sans;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
-       x="250.71428"
+       x="290.71429"
        y="710.93359"
        id="text7267"
        sodipodi:linespacing="125%"><tspan
          sodipodi:role="line"
          id="tspan7269"
-         x="250.71428"
+         x="290.71429"
          y="710.93359"
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;font-family:Sans-serif;-inkscape-font-specification:Sans-serif">Cancel
 job</tspan></text>
     <text
        sodipodi:linespacing="125%"
        id="text7271"
        y="565.505"
-       x="293.28571"
+       x="333.28571"
        
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:25px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 
Normal';text-align:start;letter-spacing:0px;word-spacing:0px;writing-mode:lr-tb;text-anchor:start;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
        xml:space="preserve"><tspan
          
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;line-height:125%;font-family:sans-serif;-inkscape-font-specification:'sans-serif,
 Normal';text-align:start;writing-mode:lr-tb;text-anchor:start"
          y="565.505"
-         x="293.28571"
+         x="333.28571"
          id="tspan7273"
          sodipodi:role="line">Fail job</tspan></text>
+    <path
+       inkscape:connector-curvature="0"
+       inkscape:connector-type="polyline"
+       id="path4525"
+       d="M 89.65484,475.69533 C -64.057054,715.31058 17.022511,901.60666 
537.98082,988.00996"
+       
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-dasharray:none;stroke-opacity:1;marker-end:url(#marker4551)"
+       sodipodi:nodetypes="cc" />
+    <text
+       xml:space="preserve"
+       
style="font-style:normal;font-weight:normal;font-size:25px;line-height:125%;font-family:Sans;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       x="80"
+       y="792.36218"
+       id="text4597"
+       sodipodi:linespacing="125%"><tspan
+         sodipodi:role="line"
+         id="tspan4599"
+         x="80"
+         y="792.36218"
+         
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;font-family:sans-serif;-inkscape-font-specification:sans-serif">Suspend
 job</tspan></text>
+    <path
+       
style="fill:none;fill-rule:evenodd;stroke:#000000;stroke-width:2.5;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;stroke-miterlimit:4;stroke-dasharray:none;marker-end:url(#marker4737)"
+       d="M 96.56344,444.48401 C -20,312.36218 270,332.36218 
156.87312,441.05766"
+       id="path4289"
+       inkscape:connector-curvature="0"
+       sodipodi:nodetypes="cc" />
+    <text
+       xml:space="preserve"
+       
style="font-style:normal;font-weight:normal;font-size:25px;line-height:125%;font-family:Sans;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1"
+       x="41.817253"
+       y="342.46371"
+       id="text4905"
+       sodipodi:linespacing="125%"><tspan
+         sodipodi:role="line"
+         id="tspan4907"
+         x="41.817253"
+         y="342.46371"
+         
style="font-style:normal;font-variant:normal;font-weight:normal;font-stretch:normal;font-size:15px;font-family:sans-serif;-inkscape-font-specification:sans-serif">Fail
 and job restartable</tspan></text>
   </g>
 </svg>

http://git-wip-us.apache.org/repos/asf/flink/blob/18507de3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 074a04d..36dba63 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -818,14 +818,13 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                                current == JobStatus.SUSPENDED ||
                                current.isGloballyTerminalState()) {
                                return;
-                       } else if (current == JobStatus.RESTARTING && 
transitionState(current, JobStatus.FAILED, t)) {
-                               synchronized (progressLock) {
-                                       postRunCleanup();
-                                       progressLock.notifyAll();
+                       } else if (current == JobStatus.RESTARTING) {
+                               this.failureCause = t;
 
-                                       LOG.info("Job {} failed during 
restart.", getJobID());
+                               if (tryRestartOrFail()) {
                                        return;
                                }
+                               // concurrent job status change, let's check 
again
                        } else if (transitionState(current, JobStatus.FAILING, 
t)) {
                                this.failureCause = t;
 
@@ -902,6 +901,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        scheduleForExecution(slotProvider);
                }
                catch (Throwable t) {
+                       LOG.warn("Failed to restart the job.", t);
                        fail(t);
                }
        }
@@ -1007,15 +1007,10 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                                                }
                                        }
                                        else if (current == JobStatus.FAILING) {
-                                               boolean allowRestart = 
!(failureCause instanceof SuppressRestartsException);
-
-                                               if (allowRestart && 
restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) 
{
-                                                       
restartStrategy.restart(this);
-                                                       break;
-                                               } else if ((!allowRestart || 
!restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, 
failureCause)) {
-                                                       postRunCleanup();
+                                               if (tryRestartOrFail()) {
                                                        break;
                                                }
+                                               // concurrent job status 
change, let's check again
                                        }
                                        else if (current == 
JobStatus.SUSPENDED) {
                                                // we've already cleaned up 
when entering the SUSPENDED state
@@ -1039,6 +1034,47 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                }
        }
 
+       /**
+        * Try to restart the job. If we cannot restart the job (e.g. no more 
restarts allowed), then
+        * try to fail the job. This operation is only permitted if the current 
state is FAILING or
+        * RESTARTING.
+        *
+        * @return true if the operation could be executed; false if a 
concurrent job status change occurred
+        */
+       private boolean tryRestartOrFail() {
+               JobStatus currentState = state;
+
+               if (currentState == JobStatus.FAILING || currentState == 
JobStatus.RESTARTING) {
+                       synchronized (progressLock) {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Try to restart the job or 
fail it if no longer possible.", failureCause);
+                               } else {
+                                       LOG.info("Try to restart the job or 
fail it if no longer possible.");
+                               }
+
+                               boolean isRestartable = !(failureCause 
instanceof SuppressRestartsException) && restartStrategy.canRestart();
+
+                               if (isRestartable && 
transitionState(currentState, JobStatus.RESTARTING)) {
+                                       LOG.info("Restarting the job...");
+                                       restartStrategy.restart(this);
+
+                                       return true;
+                               } else if (!isRestartable && 
transitionState(currentState, JobStatus.FAILED, failureCause)) {
+                                       LOG.info("Could not restart the job.", 
failureCause);
+                                       postRunCleanup();
+
+                                       return true;
+                               } else {
+                                       // we must have changed the state 
concurrently, thus we cannot complete this operation
+                                       return false;
+                               }
+                       }
+               } else {
+                       // this operation is only allowed in the state FAILING 
or RESTARTING
+                       return false;
+               }
+       }
+
        private void postRunCleanup() {
                try {
                        CheckpointCoordinator coord = 
this.checkpointCoordinator;

http://git-wip-us.apache.org/repos/asf/flink/blob/18507de3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 70c2bf9..5b59471 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
@@ -272,7 +273,8 @@ public class ExecutionGraphMetricsTest extends TestLogger {
                        assertTrue(previousRestartingTime > 0);
        
                        // now lets fail the job while it is in restarting and 
see whether the restarting time then stops to increase
-                       executionGraph.fail(new Exception());
+                       // for this to work, we have to use a 
SuppressRestartsException
+                       executionGraph.fail(new SuppressRestartsException(new 
Exception()));
        
                        assertEquals(JobStatus.FAILED, 
executionGraph.getState());
        

http://git-wip-us.apache.org/repos/asf/flink/blob/18507de3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 3743adb..7b2e20d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -264,9 +264,14 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
                assertEquals(JobStatus.RESTARTING, executionGraph.getState());
 
-               // Canceling needs to abort the restart
+               // The restarting should not fail with an ordinary exception
                executionGraph.fail(new Exception("Test exception"));
 
+               assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+
+               // but it should fail when sending a SuppressRestartsException
+               executionGraph.fail(new SuppressRestartsException(new 
Exception("Test exception")));
+
                assertEquals(JobStatus.FAILED, executionGraph.getState());
 
                // The restart has been aborted

http://git-wip-us.apache.org/repos/asf/flink/blob/18507de3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
index 59f2a9b..72784fb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSignalsTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -34,7 +36,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
 
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -43,6 +44,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.api.mockito.PowerMockito;
 
 import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.same;
@@ -147,7 +150,7 @@ public class ExecutionGraphSignalsTest {
 
        @Test
        public void testCancel() throws Exception {
-               Assert.assertEquals(JobStatus.CREATED, eg.getState());
+               assertEquals(JobStatus.CREATED, eg.getState());
                eg.cancel();
 
                verifyCancel(1);
@@ -156,42 +159,42 @@ public class ExecutionGraphSignalsTest {
                eg.cancel();
 
                verifyCancel(2);
-               Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
+               assertEquals(JobStatus.CANCELLING, eg.getState());
 
                eg.cancel();
 
                verifyCancel(2);
-               Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
+               assertEquals(JobStatus.CANCELLING, eg.getState());
 
                f.set(eg, JobStatus.CANCELED);
                eg.cancel();
 
                verifyCancel(2);
-               Assert.assertEquals(JobStatus.CANCELED, eg.getState());
+               assertEquals(JobStatus.CANCELED, eg.getState());
 
                f.set(eg, JobStatus.FAILED);
                eg.cancel();
 
                verifyCancel(2);
-               Assert.assertEquals(JobStatus.FAILED, eg.getState());
+               assertEquals(JobStatus.FAILED, eg.getState());
 
                f.set(eg, JobStatus.FAILING);
                eg.cancel();
 
                verifyCancel(2);
-               Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
+               assertEquals(JobStatus.CANCELLING, eg.getState());
 
                f.set(eg, JobStatus.FINISHED);
                eg.cancel();
 
                verifyCancel(2);
-               Assert.assertEquals(JobStatus.FINISHED, eg.getState());
+               assertEquals(JobStatus.FINISHED, eg.getState());
 
                f.set(eg, JobStatus.RESTARTING);
                eg.cancel();
 
                verifyCancel(2);
-               Assert.assertEquals(JobStatus.CANCELED, eg.getState());
+               assertEquals(JobStatus.CANCELED, eg.getState());
        }
 
        private void verifyCancel(int times) {
@@ -206,65 +209,65 @@ public class ExecutionGraphSignalsTest {
         */
        @Test
        public void testSuspend() throws Exception {
-               Assert.assertEquals(JobStatus.CREATED, eg.getState());
+               assertEquals(JobStatus.CREATED, eg.getState());
                Exception testException = new Exception("Test exception");
 
                eg.suspend(testException);
 
                verifyCancel(1);
-               Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+               assertEquals(JobStatus.SUSPENDED, eg.getState());
 
                f.set(eg, JobStatus.RUNNING);
 
                eg.suspend(testException);
 
                verifyCancel(2);
-               Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+               assertEquals(JobStatus.SUSPENDED, eg.getState());
 
                f.set(eg, JobStatus.FAILING);
 
                eg.suspend(testException);
 
                verifyCancel(3);
-               Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+               assertEquals(JobStatus.SUSPENDED, eg.getState());
 
                f.set(eg, JobStatus.CANCELLING);
 
                eg.suspend(testException);
 
                verifyCancel(4);
-               Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+               assertEquals(JobStatus.SUSPENDED, eg.getState());
 
                f.set(eg, JobStatus.FAILED);
 
                eg.suspend(testException);
 
                verifyCancel(4);
-               Assert.assertEquals(JobStatus.FAILED, eg.getState());
+               assertEquals(JobStatus.FAILED, eg.getState());
 
                f.set(eg, JobStatus.FINISHED);
 
                eg.suspend(testException);
 
                verifyCancel(4);
-               Assert.assertEquals(JobStatus.FINISHED, eg.getState());
+               assertEquals(JobStatus.FINISHED, eg.getState());
 
                f.set(eg, JobStatus.CANCELED);
 
                eg.suspend(testException);
 
                verifyCancel(4);
-               Assert.assertEquals(JobStatus.CANCELED, eg.getState());
+               assertEquals(JobStatus.CANCELED, eg.getState());
 
                f.set(eg, JobStatus.SUSPENDED);
 
                eg.fail(testException);
 
-               Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+               assertEquals(JobStatus.SUSPENDED, eg.getState());
 
                eg.cancel();
 
-               Assert.assertEquals(JobStatus.SUSPENDED, eg.getState());
+               assertEquals(JobStatus.SUSPENDED, eg.getState());
        }
 
        // test that all source tasks receive STOP signal
@@ -290,6 +293,71 @@ public class ExecutionGraphSignalsTest {
                }
        }
 
+       /**
+        * Test that failing in state restarting will retrigger the restarting 
logic. This means that
+        * it only goes into the state FAILED after the restart strategy says 
the job is no longer
+        * restartable.
+        */
+       @Test
+       public void testFailureWhileRestarting() throws IllegalAccessException, 
NoSuchFieldException, InterruptedException {
+               Field restartStrategyField = 
eg.getClass().getDeclaredField("restartStrategy");
+               restartStrategyField.setAccessible(true);
+
+               restartStrategyField.set(eg, new 
InfiniteDelayRestartStrategy(1));
+
+               f.set(eg, JobStatus.RESTARTING);
+
+               eg.fail(new Exception("Test"));
+
+               // we should restart since we have one restart attempt left
+               assertEquals(JobStatus.RESTARTING, eg.getState());
+
+               eg.fail(new Exception("Test"));
+
+               // after depleting all our restart attempts we should go into 
Failed
+               assertEquals(JobStatus.FAILED, eg.getState());
+       }
+
+       /**
+        * Tests that a {@link SuppressRestartsException} in state RESTARTING 
stops the restarting
+        * immediately and sets the execution graph's state to FAILED.
+        */
+       @Test
+       public void testSuppressRestartFailureWhileRestarting() throws 
IllegalAccessException, NoSuchFieldException {
+               Field restartStrategyField = 
eg.getClass().getDeclaredField("restartStrategy");
+               restartStrategyField.setAccessible(true);
+
+               restartStrategyField.set(eg, new 
InfiniteDelayRestartStrategy());
+
+               f.set(eg, JobStatus.RESTARTING);
+
+               // suppress a possible restart
+               eg.fail(new SuppressRestartsException(new Exception("Test")));
+
+               assertEquals(JobStatus.FAILED, eg.getState());
+       }
+
+       /**
+        * Tests that we can suspend a job when in state RESTARTING.
+        */
+       @Test
+       public void testSuspendWhileRestarting() throws IllegalAccessException, 
NoSuchFieldException {
+               Field restartStrategyField = 
eg.getClass().getDeclaredField("restartStrategy");
+               restartStrategyField.setAccessible(true);
+
+               restartStrategyField.set(eg, new 
InfiniteDelayRestartStrategy());
+
+               f.set(eg, JobStatus.RESTARTING);
+
+               final Exception exception = new Exception("Suspended");
+
+               eg.suspend(exception);
+
+               assertEquals(JobStatus.SUSPENDED, eg.getState());
+
+               assertEquals(exception, eg.getFailureCause());
+       }
+
        // STOP only supported if all sources are stoppable 
        @Test(expected = StoppingException.class)
        public void testStopBatching() throws StoppingException {

http://git-wip-us.apache.org/repos/asf/flink/blob/18507de3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
index 4be0b96..c1cbdd3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
@@ -29,13 +29,33 @@ import org.slf4j.LoggerFactory;
 public class InfiniteDelayRestartStrategy implements RestartStrategy {
        private static final Logger LOG = 
LoggerFactory.getLogger(InfiniteDelayRestartStrategy.class);
 
+       private final int maxRestartAttempts;
+       private int restartAttemptCounter;
+
+       public InfiniteDelayRestartStrategy() {
+               this(-1);
+       }
+
+       public InfiniteDelayRestartStrategy(int maxRestartAttempts) {
+               this.maxRestartAttempts = maxRestartAttempts;
+               restartAttemptCounter = 0;
+       }
+
        @Override
        public boolean canRestart() {
-               return true;
+               if (maxRestartAttempts >= 0) {
+                       return restartAttemptCounter < maxRestartAttempts;
+               } else {
+                       return true;
+               }
        }
 
        @Override
        public void restart(ExecutionGraph executionGraph) {
                LOG.info("Delaying retry of job execution forever");
+
+               if (maxRestartAttempts >= 0) {
+                       restartAttemptCounter++;
+               }
        }
 }

Reply via email to