zhuzhurk commented on code in PR #21773:
URL: https://github.com/apache/flink/pull/21773#discussion_r1089880149
##########
flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java:
##########
@@ -184,6 +185,23 @@ public void testSpeculativeSlowSink() throws Exception {
assertThat(DummyCommitter.foundSpeculativeWriter).isTrue();
}
+ @Test
+ public void testNonSpeculativeSlowSinkFunction() throws Exception {
+ executeJob(this::setupNonSpeculativeSlowSinkFunction);
+ waitUntilJobArchived();
+
+ checkResults();
+ }
+
+ @Test
+ @Timeout(30)
Review Comment:
`@Timeout` should be avoided in Flink tests. See
https://flink.apache.org/contributing/code-style-and-quality-common.html#avoid-timeouts-in-junit-tests.
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -1830,6 +1828,68 @@ void testSinkSupportConcurrentExecutionAttempts() {
}
}
+ @Test
+ void testSinkFunctionNotSupportConcurrentExecutionAttempts() {
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+ final DataStream<Integer> source = env.fromElements(1, 2, 3).name("source");
+ source.rebalance()
+ .addSink(new TestingSinkFunctionNotSupportConcurrentExecutionAttempts<>())
+ .name("sink");
+
+ final StreamGraph streamGraph = env.getStreamGraph();
+ final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
+ assertThat(jobGraph.getNumberOfVertices()).isEqualTo(2);
+ for (JobVertex jobVertex : jobGraph.getVertices()) {
+ if (jobVertex.getName().contains("source")) {
+ assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue();
+ } else if (jobVertex.getName().contains("sink")) {
+ assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse();
+ } else {
+ Assertions.fail("Unexpected job vertex " + jobVertex.getName());
+ }
+ }
+ }
+
+ @Test
+ void testSinkFunctionSupportConcurrentExecutionAttempts() {
Review Comment:
Maybe reuse this test body by extracting a shared helper method such as
`testWhetherSinkFunctionSupportsConcurrentExecutionAttempts(SinkFunction function, boolean isSupported)`?
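A rough sketch of the shape this refactoring could take, written so that it compiles without Flink on the classpath. Everything except the helper's name and parameters is an assumption: the nested `SinkFunction` and `SupportsConcurrentExecutionAttempts` types are local stand-ins, not the Flink classes, and `sinkVertexSupportsConcurrentAttempts` is a hypothetical placeholder for building the `JobGraph` and reading the flag from the sink vertex. In the real test, the helper body would be the existing graph-building and assertion code, with the sink instance and the expected value passed in.

```java
public class SinkSupportCheckSketch {

    // Hypothetical stand-in for Flink's SinkFunction, for illustration only.
    public interface SinkFunction<T> {
        void invoke(T value);
    }

    // Hypothetical stand-in for a marker interface that declares support
    // for concurrent execution attempts.
    public interface SupportsConcurrentExecutionAttempts {}

    // A sink that does not declare support.
    public static class PlainSink<T> implements SinkFunction<T> {
        @Override
        public void invoke(T value) {}
    }

    // A sink that declares support through the marker interface.
    public static class ConcurrentSink<T>
            implements SinkFunction<T>, SupportsConcurrentExecutionAttempts {
        @Override
        public void invoke(T value) {}
    }

    // Placeholder (assumption) for "build the job graph and read the flag
    // on the sink vertex"; here reduced to a marker-interface check.
    public static boolean sinkVertexSupportsConcurrentAttempts(SinkFunction<?> function) {
        return function instanceof SupportsConcurrentExecutionAttempts;
    }

    // Shared test body: the two test methods differ only in these arguments.
    public static void testWhetherSinkFunctionSupportsConcurrentExecutionAttempts(
            SinkFunction<?> function, boolean isSupported) {
        boolean actual = sinkVertexSupportsConcurrentAttempts(function);
        if (actual != isSupported) {
            throw new AssertionError(
                    "Expected isSupported=" + isSupported + " but was " + actual);
        }
    }

    public static void main(String[] args) {
        // Counterpart of testSinkFunctionNotSupportConcurrentExecutionAttempts.
        testWhetherSinkFunctionSupportsConcurrentExecutionAttempts(
                new PlainSink<Integer>(), false);
        // Counterpart of testSinkFunctionSupportConcurrentExecutionAttempts.
        testWhetherSinkFunctionSupportsConcurrentExecutionAttempts(
                new ConcurrentSink<Integer>(), true);
        System.out.println("both cases passed");
    }
}
```

With a helper like this, each `@Test` method shrinks to a single call, and the vertex-by-vertex assertions live in one place.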