akalash commented on a change in pull request #15496:
URL: https://github.com/apache/flink/pull/15496#discussion_r612644853
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
##########
@@ -82,18 +85,77 @@ public void testAsyncCheckpointFailureTriggerJobFailed()
throws Exception {
JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
try {
// assert that the job only execute checkpoint once and only
failed once.
- TestUtils.submitJobAndWaitForResult(
+ submitJobAndWaitForResult(
cluster.getClusterClient(), jobGraph,
getClass().getClassLoader());
} catch (JobExecutionException jobException) {
Optional<FlinkRuntimeException> throwable =
- ExceptionUtils.findThrowable(jobException,
FlinkRuntimeException.class);
- Assert.assertTrue(throwable.isPresent());
- Assert.assertEquals(
-
CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
- throwable.get().getMessage());
+ findThrowable(jobException, FlinkRuntimeException.class);
+ assertTrue(throwable.isPresent());
+ assertEquals(
+ EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
throwable.get().getMessage());
}
// assert that the job only failed once.
- Assert.assertEquals(1,
StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+ assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+ }
+
+ @Test
+ public void testSourceFailureTriggerJobFailed() throws Exception {
+ // given: Environment with failed source and no restart strategy.
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(2000L);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.addSource(new FailedSource()).addSink(new DiscardingSink<>());
+ JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+ try {
+ // when: Execute job once.
+ submitJobAndWaitForResult(
+ cluster.getClusterClient(), jobGraph,
getClass().getClassLoader());
+ } catch (JobExecutionException jobException) {
+ Optional<RuntimeException> throwable =
+ findThrowable(jobException, RuntimeException.class);
+
+ // then: Job failed with expected exception.
+ assertTrue(throwable.isPresent());
+ assertEquals(FailedSource.SOURCE_FAILED_MSG,
throwable.get().getMessage());
Review comment:
I think what you said makes sense, but the main goal is to check that
the cluster fails only for the expected reason. Either way, I will rework
this part to make it less fragile.
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
##########
@@ -82,18 +85,77 @@ public void testAsyncCheckpointFailureTriggerJobFailed()
throws Exception {
JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
try {
// assert that the job only execute checkpoint once and only
failed once.
- TestUtils.submitJobAndWaitForResult(
+ submitJobAndWaitForResult(
cluster.getClusterClient(), jobGraph,
getClass().getClassLoader());
} catch (JobExecutionException jobException) {
Optional<FlinkRuntimeException> throwable =
- ExceptionUtils.findThrowable(jobException,
FlinkRuntimeException.class);
- Assert.assertTrue(throwable.isPresent());
- Assert.assertEquals(
-
CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
- throwable.get().getMessage());
+ findThrowable(jobException, FlinkRuntimeException.class);
+ assertTrue(throwable.isPresent());
+ assertEquals(
+ EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
throwable.get().getMessage());
}
// assert that the job only failed once.
- Assert.assertEquals(1,
StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+ assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+ }
+
+ @Test
+ public void testSourceFailureTriggerJobFailed() throws Exception {
+ // given: Environment with failed source and no restart strategy.
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(2000L);
Review comment:
I hadn't considered the actual purpose of this property. I will take a
look and change it to a latch if possible.
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.java
##########
@@ -82,18 +85,77 @@ public void testAsyncCheckpointFailureTriggerJobFailed()
throws Exception {
JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
try {
// assert that the job only execute checkpoint once and only
failed once.
- TestUtils.submitJobAndWaitForResult(
+ submitJobAndWaitForResult(
cluster.getClusterClient(), jobGraph,
getClass().getClassLoader());
} catch (JobExecutionException jobException) {
Optional<FlinkRuntimeException> throwable =
- ExceptionUtils.findThrowable(jobException,
FlinkRuntimeException.class);
- Assert.assertTrue(throwable.isPresent());
- Assert.assertEquals(
-
CheckpointFailureManager.EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
- throwable.get().getMessage());
+ findThrowable(jobException, FlinkRuntimeException.class);
+ assertTrue(throwable.isPresent());
+ assertEquals(
+ EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE,
throwable.get().getMessage());
}
// assert that the job only failed once.
- Assert.assertEquals(1,
StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+ assertEquals(1, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
+ }
+
+ @Test
+ public void testSourceFailureTriggerJobFailed() throws Exception {
+ // given: Environment with failed source and no restart strategy.
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(2000L);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.addSource(new FailedSource()).addSink(new DiscardingSink<>());
+ JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+ try {
+ // when: Execute job once.
+ submitJobAndWaitForResult(
+ cluster.getClusterClient(), jobGraph,
getClass().getClassLoader());
+ } catch (JobExecutionException jobException) {
+ Optional<RuntimeException> throwable =
+ findThrowable(jobException, RuntimeException.class);
+
+ // then: Job failed with expected exception.
+ assertTrue(throwable.isPresent());
+ assertEquals(FailedSource.SOURCE_FAILED_MSG,
throwable.get().getMessage());
+ }
+ // and: Job failed only once.
+ assertEquals(1, FailedSource.INITIALIZE_TIMES.get());
+ }
+
+ private static class FailedSource extends
RichParallelSourceFunction<String>
+ implements CheckpointedFunction {
+
+ public static final AtomicInteger INITIALIZE_TIMES = new
AtomicInteger(0);
+ public static final String SOURCE_FAILED_MSG = "source failed";
+
+ private volatile boolean running;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ running = true;
+ }
+
+ @Override
+ public void run(SourceContext<String> ctx) throws Exception {
+ while (running) {
+ ctx.collect("test");
Review comment:
`latch.await()` is not suitable here, because the goal is to simulate a loop
that keeps checking the `running` flag. However, the busy loop can easily be
replaced with `parkNanos`, for example.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]