akalash commented on a change in pull request #15496:
URL: https://github.com/apache/flink/pull/15496#discussion_r612644853



##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
##########
@@ -82,18 +85,77 @@ public void testAsyncCheckpointFailureTriggerJobFailed() 
throws Exception {
         JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
         try {
             // assert that the job only execute checkpoint once and only 
failed once.
-            TestUtils.submitJobAndWaitForResult(
+            submitJobAndWaitForResult(
                     cluster.getClusterClient(), jobGraph, 
getClass().getClassLoader());
         } catch (JobExecutionException jobException) {
             Optional<FlinkRuntimeException> throwable =
-                    ExceptionUtils.findThrowable(jobException, 
FlinkRuntimeException.class);
-            Assert.assertTrue(throwable.isPresent());
-            Assert.assertEquals(
-                    
CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
-                    throwable.get().getMessage());
+                    findThrowable(jobException, FlinkRuntimeException.class);
+            assertTrue(throwable.isPresent());
+            assertEquals(
+                    EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE, 
throwable.get().getMessage());
         }
         // assert that the job only failed once.
-        Assert.assertEquals(1, 
StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+        assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+    }
+
+    @Test
+    public void testSourceFailureTriggerJobFailed() throws Exception {
+        // given: Environment with failed source and no restart strategy.
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(2000L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.addSource(new FailedSource()).addSink(new DiscardingSink<>());
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+        try {
+            // when: Execute job once.
+            submitJobAndWaitForResult(
+                    cluster.getClusterClient(), jobGraph, 
getClass().getClassLoader());
+        } catch (JobExecutionException jobException) {
+            Optional<RuntimeException> throwable =
+                    findThrowable(jobException, RuntimeException.class);
+
+            // then: Job failed with expected exception.
+            assertTrue(throwable.isPresent());
+            assertEquals(FailedSource.SOURCE_FAILED_MSG, 
throwable.get().getMessage());

Review comment:
       I think it makes sense what you said but the main target to check that 
the cluster fails only by expected reason. But anyway I think I will rework 
this place to avoid fragility

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
##########
@@ -82,18 +85,77 @@ public void testAsyncCheckpointFailureTriggerJobFailed() 
throws Exception {
         JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
         try {
             // assert that the job only execute checkpoint once and only 
failed once.
-            TestUtils.submitJobAndWaitForResult(
+            submitJobAndWaitForResult(
                     cluster.getClusterClient(), jobGraph, 
getClass().getClassLoader());
         } catch (JobExecutionException jobException) {
             Optional<FlinkRuntimeException> throwable =
-                    ExceptionUtils.findThrowable(jobException, 
FlinkRuntimeException.class);
-            Assert.assertTrue(throwable.isPresent());
-            Assert.assertEquals(
-                    
CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
-                    throwable.get().getMessage());
+                    findThrowable(jobException, FlinkRuntimeException.class);
+            assertTrue(throwable.isPresent());
+            assertEquals(
+                    EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE, 
throwable.get().getMessage());
         }
         // assert that the job only failed once.
-        Assert.assertEquals(1, 
StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+        assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+    }
+
+    @Test
+    public void testSourceFailureTriggerJobFailed() throws Exception {
+        // given: Environment with failed source and no restart strategy.
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(2000L);

Review comment:
       I didn't think about the true target of this property. I will take a 
look and indeed change it to a latch if possible.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
##########
@@ -82,18 +85,77 @@ public void testAsyncCheckpointFailureTriggerJobFailed() 
throws Exception {
         JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
         try {
             // assert that the job only execute checkpoint once and only 
failed once.
-            TestUtils.submitJobAndWaitForResult(
+            submitJobAndWaitForResult(
                     cluster.getClusterClient(), jobGraph, 
getClass().getClassLoader());
         } catch (JobExecutionException jobException) {
             Optional<FlinkRuntimeException> throwable =
-                    ExceptionUtils.findThrowable(jobException, 
FlinkRuntimeException.class);
-            Assert.assertTrue(throwable.isPresent());
-            Assert.assertEquals(
-                    
CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
-                    throwable.get().getMessage());
+                    findThrowable(jobException, FlinkRuntimeException.class);
+            assertTrue(throwable.isPresent());
+            assertEquals(
+                    EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE, 
throwable.get().getMessage());
         }
         // assert that the job only failed once.
-        Assert.assertEquals(1, 
StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+        assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+    }
+
+    @Test
+    public void testSourceFailureTriggerJobFailed() throws Exception {
+        // given: Environment with failed source and no restart strategy.
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(2000L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.addSource(new FailedSource()).addSink(new DiscardingSink<>());
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+        try {
+            // when: Execute job once.
+            submitJobAndWaitForResult(
+                    cluster.getClusterClient(), jobGraph, 
getClass().getClassLoader());
+        } catch (JobExecutionException jobException) {
+            Optional<RuntimeException> throwable =
+                    findThrowable(jobException, RuntimeException.class);
+
+            // then: Job failed with expected exception.
+            assertTrue(throwable.isPresent());
+            assertEquals(FailedSource.SOURCE_FAILED_MSG, 
throwable.get().getMessage());
+        }
+        // and: Job failed only once.
+        assertEquals(1, FailedSource.INITIALIZE_TIMES.get());
+    }
+
+    private static class FailedSource extends 
RichParallelSourceFunction<String>
+            implements CheckpointedFunction {
+
+        public static final AtomicInteger INITIALIZE_TIMES = new 
AtomicInteger(0);
+        public static final String SOURCE_FAILED_MSG = "source failed";
+
+        private volatile boolean running;
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            running = true;
+        }
+
+        @Override
+        public void run(SourceContext<String> ctx) throws Exception {
+            while (running) {
+                ctx.collect("test");

Review comment:
       latch.await is not ok because the target is the simulation of the loop 
with checking 'running' flag. But it can be easily replaced by parkNanos for 
example.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to