aljoscha commented on a change in pull request #13656:
URL: https://github.com/apache/flink/pull/13656#discussion_r506211458
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorExecutionModeDetectionTest.java
##########
@@ -19,37 +19,129 @@
package org.apache.flink.streaming.api.graph;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.streaming.api.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.util.TestLogger;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
/**
* Tests for the detection of the {@link RuntimeExecutionMode runtime
execution mode} during
* stream graph translation.
*/
public class StreamGraphGeneratorExecutionModeDetectionTest extends TestLogger
{
+ @Test
+ public void
testExecutionModePropagationFromEnvWithDefaultAndBoundedSource() {
+ final StreamExecutionEnvironment environment =
+
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ environment
+ .fromSource(
+ new
MockSource(Boundedness.BOUNDED, 100),
+
WatermarkStrategy.noWatermarks(),
+ "bounded-source")
+ .print();
+
+ assertThat(
+ environment.getStreamGraph(),
+ hasProperties(
+
GlobalDataExchangeMode.ALL_EDGES_PIPELINED,
+ ScheduleMode.EAGER,
+ true));
+ }
+
+ @Test
+ public void
testExecutionModePropagationFromEnvWithDefaultAndUnboundedSource() {
+ final StreamExecutionEnvironment environment =
+
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ environment
+ .fromSource(
+ new
MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 100),
+
WatermarkStrategy.noWatermarks(),
+ "unbounded-source")
+ .print();
+
+ assertThat(
+ environment.getStreamGraph(),
+ hasProperties(
+
GlobalDataExchangeMode.ALL_EDGES_PIPELINED,
+ ScheduleMode.EAGER,
+ true));
+ }
+
+ @Test
+ public void
testExecutionModePropagationFromEnvWithAutomaticAndBoundedSource() {
+ final Configuration config = new Configuration();
+ config.set(ExecutionOptions.RUNTIME_MODE,
RuntimeExecutionMode.AUTOMATIC);
+
+ final StreamExecutionEnvironment environment =
+
StreamExecutionEnvironment.getExecutionEnvironment();
+ environment.configure(config, getClass().getClassLoader());
+
+ environment
+ .fromSource(
+ new
MockSource(Boundedness.BOUNDED, 100),
+
WatermarkStrategy.noWatermarks(),
+ "bounded-source")
+ .print();
+
+ assertThat(
+ environment.getStreamGraph(),
+ hasProperties(
+
GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED,
+
ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+ false));
+ }
+
+ @Test(expected = IllegalStateException.class)
Review comment:
Maybe using the `ExpectedException` rule would be better here because
it's more fine-grained and allows matching on the exception message.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]