rkhachatryan commented on a change in pull request #11098: [FLINK-16060][task]
Implement working StreamMultipleInputProcessor
URL: https://github.com/apache/flink/pull/11098#discussion_r384129362
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -484,11 +485,15 @@ public final void invoke() throws Exception {
}
}
+ protected boolean runMailboxStep() throws Exception {
+ return mailboxProcessor.runMailboxStep();
+ }
+
Review comment:
I came up with this weird construction:
```
public class TestStreamTask<OUT, OP extends StreamOperator<OUT>> extends
StreamTask<OUT, OP> {
private final StreamTask<OUT, OP> delegatee;
public TestStreamTask(StreamTask<OUT, OP> streamTask) {
super(streamTask.getEnvironment());
this.delegatee = streamTask;
}
public boolean runMailboxStep() throws Exception {
return delegate.mailboxProcessor.runMailboxStep();
}
// init()
}
```
Then replace type `StreamTask` to `TestStreamTask` and create it as:
```
env -> new TestStreamTask<>(new MultipleInputStreamTask<>(env))
```
(should be a factory)
I think this is better than having "test" methods in `StreamTask`.
Exposing `mailboxProcessor` (via constructor?) is also an option. But IMO
less preferable because it exposes more details.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services