zhijiangW commented on a change in pull request #9483: [FLINK-13767][task] 
Migrate isFinished method from AvailabilityListener to AsyncDataInput
URL: https://github.com/apache/flink/pull/9483#discussion_r321050088
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 ##########
 @@ -121,18 +115,46 @@ public StreamTwoInputProcessor(
                        taskManagerConfig,
                        taskName);
                checkState(checkpointedInputGates.length == 2);
-               this.input1 = new 
StreamTaskNetworkInput(checkpointedInputGates[0], inputSerializer1, ioManager, 
0);
-               this.input2 = new 
StreamTaskNetworkInput(checkpointedInputGates[1], inputSerializer2, ioManager, 
1);
 
-               this.statusWatermarkValve1 = new StatusWatermarkValve(
-                       unionedInputGate1.getNumberOfInputChannels(),
-                       new ForwardingValveOutputHandler(streamOperator, lock, 
streamStatusMaintainer, input1WatermarkGauge, 0));
-               this.statusWatermarkValve2 = new StatusWatermarkValve(
-                       unionedInputGate2.getNumberOfInputChannels(),
-                       new ForwardingValveOutputHandler(streamOperator, lock, 
streamStatusMaintainer, input2WatermarkGauge, 1));
+               this.output1 = new StreamTaskNetworkOutput<>(
+                       streamOperator,
+                       (StreamRecord<IN1> record) -> {
 
 Review comment:
   Agree, the constructor seems too long now. In order to avoid this, I ever 
constructed the relevant arguments including `Output` via separate method in 
`StreamTwoInputTask`, and then pass these final arguments to the constructor of 
`StreamTwoInputProcessor` directly in my previous version. I would consider 
refactor this part later.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to