Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/5239#discussion_r160967520
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
---
@@ -132,32 +141,49 @@ public AbstractStreamOperatorTestHarness(
1024,
new Configuration(),
new ExecutionConfig(),
+ new TestTaskStateManager(),
maxParallelism,
parallelism,
subtaskIndex));
}
public AbstractStreamOperatorTestHarness(
StreamOperator<OUT> operator,
- final Environment environment) throws Exception {
+ Environment env) throws Exception {
this.operator = operator;
this.outputList = new ConcurrentLinkedQueue<>();
this.sideOutputLists = new HashMap<>();
- Configuration underlyingConfig = environment.getTaskConfiguration();
+ Configuration underlyingConfig = env.getTaskConfiguration();
this.config = new StreamConfig(underlyingConfig);
this.config.setCheckpointingEnabled(true);
this.config.setOperatorID(new OperatorID());
- this.executionConfig = environment.getExecutionConfig();
+ this.executionConfig = env.getExecutionConfig();
this.closableRegistry = new CloseableRegistry();
this.checkpointLock = new Object();
- this.environment = Preconditions.checkNotNull(environment);
+ Preconditions.checkNotNull(env);
+
+ MockUtil mockUtil = new MockUtil();
+
+ if (!mockUtil.isMock(env) && !mockUtil.isSpy(env)) {
+ env = spy(env);
+ }
+
+ this.environment = env;
+
+ this.taskStateManager = new TestTaskStateManager(
+ env.getJobID(),
+ env.getExecutionId());
+
+ when(this.environment.getTaskStateManager()).thenReturn(this.taskStateManager);
--- End diff --
Imagine someone creating the harness in a test in the following way:
```
stateManager = new MyFancyTaskStateManager();
new AbstractStreamOperatorTestHarness(
operator,
new MockEnvironment(
(...),
stateManager));
```
and suddenly this line replaces their `MyFancyTaskStateManager` in some
of the usages (but maybe not all of them). Here `MyFancyTaskStateManager`
stands for anything that is not the same instance, with the same behaviour, as
the one you return here (which kind of defeats the purpose of the stubbing,
since this field is either already exactly what you want it to be, or
something that shouldn't even exist).
To rephrase the problem: what this line is doing is, in effect, using
reflection calls to override `MockEnvironment`'s `private final` field
(`private final TaskStateManager taskStateManager;`), replacing its original
value.
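To make the "some of the usages, but maybe not all" concern concrete, here is a minimal plain-Java sketch. All class names in it are hypothetical stand-ins, not the real Flink or Mockito types, and a hand-written subclass plays the role of `spy(env)` plus `when(...).thenReturn(...)`. It is only meant to illustrate how redirecting the getter can leave code that reads the field directly with the original value.

```java
// Hypothetical stand-ins; these are NOT the real Flink or Mockito classes.
interface StateManager {
    String id();
}

class SimpleEnvironment {
    private final StateManager stateManager;

    SimpleEnvironment(StateManager stateManager) {
        this.stateManager = stateManager;
    }

    StateManager getStateManager() {
        return stateManager;
    }

    // Internal code path that reads the field directly, bypassing the getter.
    String describeInternally() {
        return stateManager.id();
    }
}

// Assumed analogue of spy(env) + when(env.getStateManager()).thenReturn(replacement):
// only calls that go through the getter are redirected.
class StubbedEnvironment extends SimpleEnvironment {
    private final StateManager replacement;

    StubbedEnvironment(StateManager original, StateManager replacement) {
        super(original);
        this.replacement = replacement;
    }

    @Override
    StateManager getStateManager() {
        return replacement;
    }
}

class StubbingSketch {
    private static SimpleEnvironment build() {
        StateManager fancy = () -> "fancy";          // what the test author passed in
        StateManager harnessOwned = () -> "harness"; // what the harness stubs in
        return new StubbedEnvironment(fancy, harnessOwned);
    }

    static String viaGetter() {
        return build().getStateManager().id();
    }

    static String viaField() {
        return build().describeInternally();
    }

    public static void main(String[] args) {
        System.out.println(viaGetter()); // prints "harness"
        System.out.println(viaField());  // prints "fancy"
    }
}
```

The two calls disagree about which state manager is in use, which is the kind of inconsistency I am worried about.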
Maybe this problem could also go away if you split the `TaskStateManager`
as I mentioned earlier.
If not, please refactor this code and the users of this constructor, so that
they pass a correctly set up `TaskStateManager`. Also please note that 99% of
the callers of this constructor pass a `MockEnvironment` with the intention
that the test harness takes over ownership of it. In other words, I would bet
that all of those problems would go away if `MockEnvironment` was created
locally, instead of being injected.
The only exceptions to this 99% are three calls in `AsyncWaitOperatorTest`,
which should actually be quite easy to refactor so that they follow the same
code path and use `MockEnvironment`.
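A rough sketch of what I mean by creating the environment locally. The names (`StateStore`, `OwnedEnvironment`, `HarnessSketch`) are hypothetical and stand in for `TaskStateManager`, `MockEnvironment` and the harness; the real constructor signatures are not reproduced here. The point is that the harness takes the state manager as an argument and builds the environment itself, so nothing needs to be stubbed afterwards.

```java
import java.util.Objects;

// Hypothetical stand-in for a task state manager.
interface StateStore {
    String id();
}

// Hypothetical stand-in for an environment whose state store is fixed at construction.
final class OwnedEnvironment {
    private final StateStore stateStore;

    OwnedEnvironment(StateStore stateStore) {
        this.stateStore = Objects.requireNonNull(stateStore);
    }

    StateStore getStateStore() {
        return stateStore;
    }
}

// The harness builds its own environment, so the store it exposes and the
// store the environment holds are guaranteed to be the same instance.
final class HarnessSketch {
    private final StateStore stateStore;
    private final OwnedEnvironment environment;

    // Convenience constructor: the harness supplies a default store.
    HarnessSketch() {
        this(() -> "default");
    }

    // Tests that need custom behaviour pass their own store explicitly.
    HarnessSketch(StateStore stateStore) {
        this.stateStore = Objects.requireNonNull(stateStore);
        this.environment = new OwnedEnvironment(stateStore);
    }

    StateStore getStateStore() {
        return stateStore;
    }

    OwnedEnvironment getEnvironment() {
        return environment;
    }
}
```

With this shape, a test that wants a custom manager passes it to the harness, and there is no second instance that could silently take its place.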
---