Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5193#discussion_r159235730 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java --- @@ -42,22 +41,32 @@ public class SourceFunctionUtil { public static <T extends Serializable> List<T> runSourceFunction(SourceFunction<T> sourceFunction) throws Exception { - final List<T> outputs = new ArrayList<T>(); - if (sourceFunction instanceof RichFunction) { + return runRichSourceFunction(sourceFunction); + } + else { + return runNonRichSourceFunction(sourceFunction); + } + } + private static <T extends Serializable> List<T> runRichSourceFunction(SourceFunction<T> sourceFunction) throws Exception { + try (MockEnvironment environment = new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024)) { AbstractStreamOperator<?> operator = mock(AbstractStreamOperator.class); when(operator.getExecutionConfig()).thenReturn(new ExecutionConfig()); - RuntimeContext runtimeContext = new StreamingRuntimeContext( - operator, - new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024), - new HashMap<String, Accumulator<?, ?>>()); - + RuntimeContext runtimeContext = new StreamingRuntimeContext( + operator, + environment, + new HashMap<>()); ((RichFunction) sourceFunction).setRuntimeContext(runtimeContext); - ((RichFunction) sourceFunction).open(new Configuration()); + + return runNonRichSourceFunction(sourceFunction); --- End diff -- It wouldn't suffice, because rich function's `MockEnvironment` must be closed. It would have to be something like: ``` Optional<MockEnvironment> context = Optional.empty(); try { if (sourceFunction instanceof RichFunction) { context = setupRichSourceFunction(sourceFunction); } return runSourceFunction(sourceFunction); } finally { context.ifPresent(MockEnvironment::close); } ``` Which in my opinion is uglier :(
---