Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/5193#discussion_r159723523
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
---
@@ -42,22 +41,32 @@
public class SourceFunctionUtil {
public static <T extends Serializable> List<T>
runSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
- final List<T> outputs = new ArrayList<T>();
-
if (sourceFunction instanceof RichFunction) {
+ return runRichSourceFunction(sourceFunction);
+ }
+ else {
+ return runNonRichSourceFunction(sourceFunction);
+ }
+ }
+ private static <T extends Serializable> List<T>
runRichSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
+ try (MockEnvironment environment = new
MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(),
1024)) {
AbstractStreamOperator<?> operator =
mock(AbstractStreamOperator.class);
when(operator.getExecutionConfig()).thenReturn(new
ExecutionConfig());
- RuntimeContext runtimeContext = new
StreamingRuntimeContext(
- operator,
- new MockEnvironment("MockTask", 3 *
1024 * 1024, new MockInputSplitProvider(), 1024),
- new HashMap<String, Accumulator<?,
?>>());
-
+ RuntimeContext runtimeContext = new
StreamingRuntimeContext(
+ operator,
+ environment,
+ new HashMap<>());
((RichFunction)
sourceFunction).setRuntimeContext(runtimeContext);
-
((RichFunction) sourceFunction).open(new
Configuration());
+
+ return runNonRichSourceFunction(sourceFunction);
--- End diff --
Ah, I see. Thanks for the clarification, I didn't pay enough attention here.
Yes, it wouldn't make sense to refactor this any more if the environment
needs to be closed.
---