rkhachatryan commented on a change in pull request #14814:
URL: https://github.com/apache/flink/pull/14814#discussion_r568846031
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
##########
@@ -361,6 +374,171 @@ public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {
}
}
+ static class BoundedPassThroughOperator<T> extends AbstractStreamOperator<T>
+ implements OneInputStreamOperator<T, T>, BoundedOneInput {
+ static volatile CountDownLatch progressLatch;
+ static volatile CountDownLatch snapshotLatch;
+ static volatile boolean inputEnded;
+
+ private transient boolean processed;
+
+ BoundedPassThroughOperator(ChainingStrategy chainingStrategy) {
+ this.chainingStrategy = chainingStrategy;
+ }
+
+ private static void allowSnapshots() {
+ snapshotLatch.countDown();
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ inputEnded = true;
+ }
+
+ @Override
+ public void processElement(StreamRecord<T> element) throws Exception {
+ output.collect(element);
+ if (!processed) {
+ processed = true;
+ progressLatch.countDown();
+ }
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ snapshotLatch.await();
+ super.snapshotState(context);
+ }
+
+ // --------------------------------------------------------------------
+
+ static CountDownLatch getProgressLatch() {
+ return progressLatch;
+ }
+
+ static void resetForTest(int parallelism, boolean allowSnapshots) {
+ progressLatch = new CountDownLatch(parallelism);
+ snapshotLatch = new CountDownLatch(allowSnapshots ? 0 : parallelism);
+ inputEnded = false;
+ }
+ }
+
+ @Test
+ public void testStopSavepointWithBoundedInputConcurrently() throws Exception {
+ final int numTaskManagers = 2;
+ final int numSlotsPerTaskManager = 2;
+
+ while (true) {
+
+ final MiniClusterResourceFactory clusterFactory =
+ new MiniClusterResourceFactory(
+ numTaskManagers,
+ numSlotsPerTaskManager,
+ getFileBasedCheckpointsConfig());
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+ env.setParallelism(1);
+
+ // It's only possible to test this with chaining. Without it, JM fails the job before
+ // the downstream gets the abort notification
+ BoundedPassThroughOperator<Integer> operator =
+ new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS);
+ InfiniteTestSource source = new InfiniteTestSource();
+ DataStream<Integer> stream =
+ env.addSource(source)
+ .transform("pass-through",
BasicTypeInfo.INT_TYPE_INFO, operator);
+
+ stream.addSink(new DiscardingSink<>());
+
+ final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+
+ MiniClusterWithClientResource cluster = clusterFactory.get();
+ cluster.before();
+ ClusterClient<?> client = cluster.getClusterClient();
+
+ try {
+ BoundedPassThroughOperator.resetForTest(1, false);
+ InfiniteTestSource.resetForTest();
+
+ client.submitJob(jobGraph).get();
+
+ BoundedPassThroughOperator.getProgressLatch().await();
+ InfiniteTestSource.suspendAll(); // prevent deadlock in cancelAllAndAwait
+ CompletableFuture<String> stop =
client.stopWithSavepoint(jobId, false, null);
+ // await checkpoint start (not explicit signals to avoid deadlocks)
+ Thread.sleep(500);
+ InfiniteTestSource.cancelAllAndAwait(); // emulate end of input
##########
Review comment:
Yes, I tried to avoid using `sleep` here but ended up with either a deadlock (because of chaining) or a test failure (because `BoundedPassThroughOperator#snapshotState` runs too late).
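
To illustrate the trade-off, a non-sleep variant would roughly be a bounded polling wait; the `checkpointStarted()` hook below is purely hypothetical (the test has no such observable signal, which is exactly why the fixed sleep is used), so this is a sketch of the idea rather than a drop-in replacement:

```java
// Hypothetical sketch only: a bounded polling wait that could stand in for the
// fixed Thread.sleep(500) *if* a "checkpoint started" condition were observable
// without reintroducing the latch-based deadlocks described above.
import java.time.Duration;
import java.util.function.Supplier;

final class PollUntil {

    /** Polls the condition until it holds or the timeout elapses. */
    static void await(Supplier<Boolean> condition, Duration timeout) throws InterruptedException {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!condition.get()) {
            if (System.nanoTime() >= deadlineNanos) {
                throw new AssertionError("Condition not satisfied within " + timeout);
            }
            Thread.sleep(50); // short poll interval instead of one blind sleep
        }
    }
}

// Usage in the test would then be (checkpointStarted() is an assumed hook, not an existing API):
// PollUntil.await(() -> checkpointStarted(), Duration.ofSeconds(30));
```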