pnowojski commented on a change in pull request #8858: [hotfix][tests] Change
some StreamTask tests to create a test task in the task's thread
URL: https://github.com/apache/flink/pull/8858#discussion_r297534345
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -1023,25 +1014,63 @@ protected AbstractStateBackend
createInnerBackend(Configuration config) {
//
------------------------------------------------------------------------
//
------------------------------------------------------------------------
+ private enum Event {
+ TASK_IS_RUNNING,
+ }
+
private static class EmptyStreamTask extends StreamTask<String,
AbstractStreamOperator<String>> {
- public EmptyStreamTask(Environment env) {
+ private boolean isRunningFoo;
+ private final LinkedBlockingQueue<Event> eventQueue;
+ private final OperatorChain<String,
AbstractStreamOperator<String>> overrideOperatorChain;
+ private volatile boolean sourceFinished;
+
+ EmptyStreamTask(Environment env, LinkedBlockingQueue<Event>
eventQueue, OperatorChain<String, AbstractStreamOperator<String>>
operatorChain) {
super(env, null);
+ this.eventQueue = eventQueue;
+ this.overrideOperatorChain = operatorChain;
Review comment:
Ok, I see it now. It's pretty fragile setup and it boils down to kind of
broken life cycle of the tasks and operators :( Of course it's still better
compared to what it used to be.
The proper solution would be to refactor the lifecycle, so that the classes
are initialised in the constructor, but that's beyond the scope. In that case
maybe:
1. either leave a comment in your `#init()` method that we are overriding an
`operatorChain` instance created in `StreamTask#invoke()`.
2. or maybe as a stop gap solution we could extract construction of an
`OperatorChain` in the `StreamTask` to some overloadable method
`StreamTask#createOperatorChain()`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services