zhijiangW commented on a change in pull request #9483: [FLINK-13767][task]
Migrate isFinished method from AvailabilityListener to AsyncDataInput
URL: https://github.com/apache/flink/pull/9483#discussion_r321050088
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
##########
@@ -121,18 +115,46 @@ public StreamTwoInputProcessor(
taskManagerConfig,
taskName);
checkState(checkpointedInputGates.length == 2);
- this.input1 = new StreamTaskNetworkInput(checkpointedInputGates[0], inputSerializer1, ioManager, 0);
- this.input2 = new StreamTaskNetworkInput(checkpointedInputGates[1], inputSerializer2, ioManager, 1);
- this.statusWatermarkValve1 = new StatusWatermarkValve(
- unionedInputGate1.getNumberOfInputChannels(),
- new ForwardingValveOutputHandler(streamOperator, lock, streamStatusMaintainer, input1WatermarkGauge, 0));
- this.statusWatermarkValve2 = new StatusWatermarkValve(
- unionedInputGate2.getNumberOfInputChannels(),
- new ForwardingValveOutputHandler(streamOperator, lock, streamStatusMaintainer, input2WatermarkGauge, 1));
+ this.output1 = new StreamTaskNetworkOutput<>(
+ streamOperator,
+ (StreamRecord<IN1> record) -> {
Review comment:
Agreed, the constructor seems too long now. To avoid this, in a previous
version I constructed the relevant arguments, including the `Output`, in a
separate method in `StreamTwoInputTask` and then passed those final arguments
directly to the constructor of `StreamTwoInputProcessor`. I will consider
refactoring this part later.
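
A minimal sketch of that shape, assuming heavily simplified stand-in types: `ProcessorWiringSketch`, `Output`, `TwoInputTask`, `TwoInputProcessor`, and `createOutput` are hypothetical names for illustration only, not Flink's real classes or signatures. The task builds each output in a small factory method and hands the finished objects to the processor, so the processor's constructor only stores them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; none of these are Flink's real classes.
public class ProcessorWiringSketch {

    /** Stand-in for an output that forwards records to one operator input. */
    interface Output<T> {
        void emit(T record);
    }

    /** Stand-in for the processor: its constructor only stores what it is handed. */
    static final class TwoInputProcessor<A, B> {
        private final Output<A> output1;
        private final Output<B> output2;

        TwoInputProcessor(Output<A> output1, Output<B> output2) {
            this.output1 = output1;
            this.output2 = output2;
        }

        void processFirst(A record) { output1.emit(record); }

        void processSecond(B record) { output2.emit(record); }
    }

    /** Stand-in for the task: each constructor argument is built in its own method. */
    static final class TwoInputTask {
        final List<String> seen = new ArrayList<>();

        // The per-input lambda lives here instead of inside the processor constructor.
        private <T> Output<T> createOutput(int inputIndex) {
            return record -> seen.add("input" + inputIndex + ":" + record);
        }

        TwoInputProcessor<String, Integer> createProcessor() {
            Output<String> output1 = createOutput(1);
            Output<Integer> output2 = createOutput(2);
            return new TwoInputProcessor<>(output1, output2);
        }
    }

    public static void main(String[] args) {
        TwoInputTask task = new TwoInputTask();
        TwoInputProcessor<String, Integer> processor = task.createProcessor();
        processor.processFirst("a");
        processor.processSecond(7);
        System.out.println(task.seen); // prints [input1:a, input2:7]
    }
}
```

The trade-off in this sketch is that the task owns more wiring logic, while the processor becomes easier to construct in tests because it accepts ready-made outputs.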