rkhachatryan commented on a change in pull request #15496:
URL: https://github.com/apache/flink/pull/15496#discussion_r612616096



##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
##########
@@ -82,18 +85,77 @@ public void testAsyncCheckpointFailureTriggerJobFailed() 
throws Exception {
         JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
         try {
             // assert that the job only execute checkpoint once and only 
failed once.
-            TestUtils.submitJobAndWaitForResult(
+            submitJobAndWaitForResult(
                     cluster.getClusterClient(), jobGraph, 
getClass().getClassLoader());
         } catch (JobExecutionException jobException) {
             Optional<FlinkRuntimeException> throwable =
-                    ExceptionUtils.findThrowable(jobException, 
FlinkRuntimeException.class);
-            Assert.assertTrue(throwable.isPresent());
-            Assert.assertEquals(
-                    
CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
-                    throwable.get().getMessage());
+                    findThrowable(jobException, FlinkRuntimeException.class);
+            assertTrue(throwable.isPresent());
+            assertEquals(
+                    EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE, 
throwable.get().getMessage());
         }
         // assert that the job only failed once.
-        Assert.assertEquals(1, 
StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+        assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+    }
+
+    @Test
+    public void testSourceFailureTriggerJobFailed() throws Exception {
+        // given: Environment with failed source and no restart strategy.
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(2000L);

Review comment:
       I guess this big interval is needed to ensure that the 
LegacySourceThread has actually started.
   Could we use some more explicit mean instead? (like future / latch / 
condition)

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
##########
@@ -82,18 +85,77 @@ public void testAsyncCheckpointFailureTriggerJobFailed() 
throws Exception {
         JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
         try {
             // assert that the job only execute checkpoint once and only 
failed once.
-            TestUtils.submitJobAndWaitForResult(
+            submitJobAndWaitForResult(
                     cluster.getClusterClient(), jobGraph, 
getClass().getClassLoader());
         } catch (JobExecutionException jobException) {
             Optional<FlinkRuntimeException> throwable =
-                    ExceptionUtils.findThrowable(jobException, 
FlinkRuntimeException.class);
-            Assert.assertTrue(throwable.isPresent());
-            Assert.assertEquals(
-                    
CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
-                    throwable.get().getMessage());
+                    findThrowable(jobException, FlinkRuntimeException.class);
+            assertTrue(throwable.isPresent());
+            assertEquals(
+                    EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE, 
throwable.get().getMessage());
         }
         // assert that the job only failed once.
-        Assert.assertEquals(1, 
StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+        assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+    }
+
+    @Test
+    public void testSourceFailureTriggerJobFailed() throws Exception {
+        // given: Environment with failed source and no restart strategy.
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(2000L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.addSource(new FailedSource()).addSink(new DiscardingSink<>());
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+        try {
+            // when: Execute job once.
+            submitJobAndWaitForResult(
+                    cluster.getClusterClient(), jobGraph, 
getClass().getClassLoader());
+        } catch (JobExecutionException jobException) {
+            Optional<RuntimeException> throwable =
+                    findThrowable(jobException, RuntimeException.class);
+
+            // then: Job failed with expected exception.
+            assertTrue(throwable.isPresent());
+            assertEquals(FailedSource.SOURCE_FAILED_MSG, 
throwable.get().getMessage());
+        }
+        // and: Job failed only once.
+        assertEquals(1, FailedSource.INITIALIZE_TIMES.get());
+    }
+
+    private static class FailedSource extends 
RichParallelSourceFunction<String>
+            implements CheckpointedFunction {
+
+        public static final AtomicInteger INITIALIZE_TIMES = new 
AtomicInteger(0);
+        public static final String SOURCE_FAILED_MSG = "source failed";
+
+        private volatile boolean running;
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            running = true;
+        }
+
+        @Override
+        public void run(SourceContext<String> ctx) throws Exception {
+            while (running) {
+                ctx.collect("test");

Review comment:
       To use `ctx.collect`, `synchronized (ctx.getCheckpointLock())` must be 
used.
   But probably we don't need to emit anything here? Can we just block on some 
contidion (like latch.await).

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
##########
@@ -82,18 +85,77 @@ public void testAsyncCheckpointFailureTriggerJobFailed() 
throws Exception {
         JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
         try {
             // assert that the job only execute checkpoint once and only 
failed once.
-            TestUtils.submitJobAndWaitForResult(
+            submitJobAndWaitForResult(
                     cluster.getClusterClient(), jobGraph, 
getClass().getClassLoader());
         } catch (JobExecutionException jobException) {
             Optional<FlinkRuntimeException> throwable =
-                    ExceptionUtils.findThrowable(jobException, 
FlinkRuntimeException.class);
-            Assert.assertTrue(throwable.isPresent());
-            Assert.assertEquals(
-                    
CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
-                    throwable.get().getMessage());
+                    findThrowable(jobException, FlinkRuntimeException.class);
+            assertTrue(throwable.isPresent());
+            assertEquals(
+                    EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE, 
throwable.get().getMessage());
         }
         // assert that the job only failed once.
-        Assert.assertEquals(1, 
StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+        assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+    }
+
+    @Test
+    public void testSourceFailureTriggerJobFailed() throws Exception {
+        // given: Environment with failed source and no restart strategy.
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(2000L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.addSource(new FailedSource()).addSink(new DiscardingSink<>());
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+        try {
+            // when: Execute job once.
+            submitJobAndWaitForResult(
+                    cluster.getClusterClient(), jobGraph, 
getClass().getClassLoader());
+        } catch (JobExecutionException jobException) {
+            Optional<RuntimeException> throwable =
+                    findThrowable(jobException, RuntimeException.class);
+
+            // then: Job failed with expected exception.
+            assertTrue(throwable.isPresent());
+            assertEquals(FailedSource.SOURCE_FAILED_MSG, 
throwable.get().getMessage());

Review comment:
       These assertions seem a bit fragile to me (what if Flink wraps an 
exception into `RuntimeException`?).
   And they are not actually checking the production code, but the test itself: 
without the fix the test will time out; and without the exception thrown the 
job might have exited for some other reason.
   
   But the setup is quite simple IMO, so I'd remove them.
   
   WDYT?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
##########
@@ -82,18 +85,77 @@ public void testAsyncCheckpointFailureTriggerJobFailed() 
throws Exception {
         JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
         try {
             // assert that the job only execute checkpoint once and only 
failed once.
-            TestUtils.submitJobAndWaitForResult(
+            submitJobAndWaitForResult(
                     cluster.getClusterClient(), jobGraph, 
getClass().getClassLoader());
         } catch (JobExecutionException jobException) {
             Optional<FlinkRuntimeException> throwable =
-                    ExceptionUtils.findThrowable(jobException, 
FlinkRuntimeException.class);
-            Assert.assertTrue(throwable.isPresent());
-            Assert.assertEquals(
-                    
CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
-                    throwable.get().getMessage());
+                    findThrowable(jobException, FlinkRuntimeException.class);
+            assertTrue(throwable.isPresent());
+            assertEquals(
+                    EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE, 
throwable.get().getMessage());
         }
         // assert that the job only failed once.
-        Assert.assertEquals(1, 
StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+        assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+    }
+
+    @Test
+    public void testSourceFailureTriggerJobFailed() throws Exception {
+        // given: Environment with failed source and no restart strategy.
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(2000L);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.addSource(new FailedSource()).addSink(new DiscardingSink<>());
+        JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+        try {
+            // when: Execute job once.
+            submitJobAndWaitForResult(
+                    cluster.getClusterClient(), jobGraph, 
getClass().getClassLoader());
+        } catch (JobExecutionException jobException) {
+            Optional<RuntimeException> throwable =
+                    findThrowable(jobException, RuntimeException.class);
+
+            // then: Job failed with expected exception.
+            assertTrue(throwable.isPresent());
+            assertEquals(FailedSource.SOURCE_FAILED_MSG, 
throwable.get().getMessage());
+        }
+        // and: Job failed only once.
+        assertEquals(1, FailedSource.INITIALIZE_TIMES.get());
+    }
+
+    private static class FailedSource extends 
RichParallelSourceFunction<String>
+            implements CheckpointedFunction {
+
+        public static final AtomicInteger INITIALIZE_TIMES = new 
AtomicInteger(0);

Review comment:
       This variable ideally needs to be reset before running the test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to