Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/5193#discussion_r159235730
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
---
@@ -42,22 +41,32 @@
public class SourceFunctionUtil {
public static <T extends Serializable> List<T>
runSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
- final List<T> outputs = new ArrayList<T>();
-
if (sourceFunction instanceof RichFunction) {
+ return runRichSourceFunction(sourceFunction);
+ }
+ else {
+ return runNonRichSourceFunction(sourceFunction);
+ }
+ }
+ private static <T extends Serializable> List<T>
runRichSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
+ try (MockEnvironment environment = new
MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(),
1024)) {
AbstractStreamOperator<?> operator =
mock(AbstractStreamOperator.class);
when(operator.getExecutionConfig()).thenReturn(new
ExecutionConfig());
- RuntimeContext runtimeContext = new
StreamingRuntimeContext(
- operator,
- new MockEnvironment("MockTask", 3 *
1024 * 1024, new MockInputSplitProvider(), 1024),
- new HashMap<String, Accumulator<?,
?>>());
-
+ RuntimeContext runtimeContext = new
StreamingRuntimeContext(
+ operator,
+ environment,
+ new HashMap<>());
((RichFunction)
sourceFunction).setRuntimeContext(runtimeContext);
-
((RichFunction) sourceFunction).open(new
Configuration());
+
+ return runNonRichSourceFunction(sourceFunction);
--- End diff --
It wouldn't suffice, because rich function's `MockEnvironment` must be
closed. It would have to be something like:
```
Optional<MockEnvironment> context = Optional.empty();
try {
if (sourceFunction instanceof RichFunction) {
context = setupRichSourceFunction(sourceFunction);
}
return runSourceFunction(sourceFunction);
}
finally {
context.ifPresent(MockEnvironment::close);
}
```
Which in my opinion is uglier :(
---