zhuzhurk commented on code in PR #21773:
URL: https://github.com/apache/flink/pull/21773#discussion_r1089880149


##########
flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java:
##########
@@ -184,6 +185,23 @@ public void testSpeculativeSlowSink() throws Exception {
         assertThat(DummyCommitter.foundSpeculativeWriter).isTrue();
     }
 
+    @Test
+    public void testNonSpeculativeSlowSinkFunction() throws Exception {
+        executeJob(this::setupNonSpeculativeSlowSinkFunction);
+        waitUntilJobArchived();
+
+        checkResults();
+    }
+
+    @Test
+    @Timeout(30)

Review Comment:
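   `@Timeout` should be avoided in Flink tests. The code style guide recommends 
relying on the global CI timeout instead, which takes thread dumps before the 
build is failed and so makes hangs easier to debug. See 
https://flink.apache.org/contributing/code-style-and-quality-common.html#avoid-timeouts-in-junit-tests.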



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -1830,6 +1828,68 @@ void testSinkSupportConcurrentExecutionAttempts() {
         }
     }
 
+    @Test
+    void testSinkFunctionNotSupportConcurrentExecutionAttempts() {
+        final StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+        final DataStream<Integer> source = env.fromElements(1, 2, 3).name("source");
+        source.rebalance()
+                .addSink(new TestingSinkFunctionNotSupportConcurrentExecutionAttempts<>())
+                .name("sink");
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+        final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+        assertThat(jobGraph.getNumberOfVertices()).isEqualTo(2);
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            if (jobVertex.getName().contains("source")) {
+                assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue();
+            } else if (jobVertex.getName().contains("sink")) {
+                assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse();
+            } else {
+                Assertions.fail("Unexpected job vertex " + jobVertex.getName());
+            }
+        }
+    }
+
+    @Test
+    void testSinkFunctionSupportConcurrentExecutionAttempts() {

Review Comment:
   Maybe extract this test body into a shared helper method, e.g. 
`testWhetherSinkFunctionSupportsConcurrentExecutionAttempts(SinkFunction function, boolean isSupported)`, 
and call it from both tests?
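
   A rough, self-contained sketch of the shape I have in mind. It does not use 
the real Flink classes: `SinkFunction`, `SupportsConcurrentExecutionAttempts`, 
`PlainSink`, `ConcurrentSink` and `buildVertexFlags` are hypothetical stand-ins 
so that the snippet runs on its own, and the `instanceof` check is only an 
assumption about how the flag is derived. In the real test the helper would 
build the `StreamGraph`/`JobGraph` as the existing test body does.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SinkConcurrencyCheckSketch {

    /** Hypothetical stand-in for Flink's SinkFunction. */
    interface SinkFunction<T> {
        void invoke(T value);
    }

    /** Hypothetical stand-in for the marker interface a sink function may implement. */
    interface SupportsConcurrentExecutionAttempts {}

    /** A sink function that does not opt in to concurrent execution attempts. */
    static class PlainSink<T> implements SinkFunction<T> {
        @Override
        public void invoke(T value) {}
    }

    /** A sink function that opts in to concurrent execution attempts. */
    static class ConcurrentSink<T>
            implements SinkFunction<T>, SupportsConcurrentExecutionAttempts {
        @Override
        public void invoke(T value) {}
    }

    /**
     * Stand-in for "build the job graph and read each vertex's flag".
     * Assumption: the sink vertex supports concurrent attempts only if
     * its function implements the marker interface.
     */
    static Map<String, Boolean> buildVertexFlags(SinkFunction<?> function) {
        Map<String, Boolean> flags = new LinkedHashMap<>();
        flags.put("source", true);
        flags.put("sink", function instanceof SupportsConcurrentExecutionAttempts);
        return flags;
    }

    /** Shared test body: only the function and the expected flag vary. */
    static void testWhetherSinkFunctionSupportsConcurrentExecutionAttempts(
            SinkFunction<?> function, boolean isSupported) {
        Map<String, Boolean> flags = buildVertexFlags(function);
        if (flags.size() != 2) {
            throw new AssertionError("Expected 2 vertices but got " + flags.size());
        }
        for (Map.Entry<String, Boolean> entry : flags.entrySet()) {
            String name = entry.getKey();
            boolean actual = entry.getValue();
            if (name.contains("source")) {
                check(actual, name + " should support concurrent attempts");
            } else if (name.contains("sink")) {
                check(actual == isSupported, name + " flag should be " + isSupported);
            } else {
                throw new AssertionError("Unexpected job vertex " + name);
            }
        }
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        // The two existing tests would each shrink to a single call.
        testWhetherSinkFunctionSupportsConcurrentExecutionAttempts(new PlainSink<Integer>(), false);
        testWhetherSinkFunctionSupportsConcurrentExecutionAttempts(new ConcurrentSink<Integer>(), true);
    }
}
```

   With a helper like this, 
`testSinkFunctionNotSupportConcurrentExecutionAttempts` and 
`testSinkFunctionSupportConcurrentExecutionAttempts` would differ only in the 
function they pass and the expected flag.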


