aljoscha commented on a change in pull request #13656:
URL: https://github.com/apache/flink/pull/13656#discussion_r506211458



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorExecutionModeDetectionTest.java
##########
@@ -19,37 +19,129 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.mocks.MockSource;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.streaming.api.RuntimeExecutionMode;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
 
 /**
  * Tests for the detection of the {@link RuntimeExecutionMode runtime 
execution mode} during
  * stream graph translation.
  */
 public class StreamGraphGeneratorExecutionModeDetectionTest extends TestLogger 
{
 
+       @Test
+       public void 
testExecutionModePropagationFromEnvWithDefaultAndBoundedSource() {
+               final StreamExecutionEnvironment environment =
+                               
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               environment
+                               .fromSource(
+                                               new 
MockSource(Boundedness.BOUNDED, 100),
+                                               
WatermarkStrategy.noWatermarks(),
+                                               "bounded-source")
+                               .print();
+
+               assertThat(
+                               environment.getStreamGraph(),
+                               hasProperties(
+                                               
GlobalDataExchangeMode.ALL_EDGES_PIPELINED,
+                                               ScheduleMode.EAGER,
+                                               true));
+       }
+
+       @Test
+       public void 
testExecutionModePropagationFromEnvWithDefaultAndUnboundedSource() {
+               final StreamExecutionEnvironment environment =
+                               
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               environment
+                               .fromSource(
+                                               new 
MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 100),
+                                               
WatermarkStrategy.noWatermarks(),
+                                               "unbounded-source")
+                               .print();
+
+               assertThat(
+                               environment.getStreamGraph(),
+                               hasProperties(
+                                               
GlobalDataExchangeMode.ALL_EDGES_PIPELINED,
+                                               ScheduleMode.EAGER,
+                                               true));
+       }
+
+       @Test
+       public void 
testExecutionModePropagationFromEnvWithAutomaticAndBoundedSource() {
+               final Configuration config = new Configuration();
+               config.set(ExecutionOptions.RUNTIME_MODE, 
RuntimeExecutionMode.AUTOMATIC);
+
+               final StreamExecutionEnvironment environment =
+                               
StreamExecutionEnvironment.getExecutionEnvironment();
+               environment.configure(config, getClass().getClassLoader());
+
+               environment
+                               .fromSource(
+                                               new 
MockSource(Boundedness.BOUNDED, 100),
+                                               
WatermarkStrategy.noWatermarks(),
+                                               "bounded-source")
+                               .print();
+
+               assertThat(
+                               environment.getStreamGraph(),
+                               hasProperties(
+                                               
GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED,
+                                               
ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+                                               false));
+       }
+
+       @Test(expected = IllegalStateException.class)

Review comment:
       Maybe using the `ExpectedException` rule would be better here because 
it's more fine-grained and allows matching on the exception message.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to