pnowojski commented on a change in pull request #9478: [FLINK-13766][task] 
Refactor the implementation of StreamInputProcessor based on 
StreamTaskInput#emitNext
URL: https://github.com/apache/flink/pull/9478#discussion_r318035534
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
 ##########
 @@ -381,12 +333,28 @@ private ForwardingValveOutputHandler(
                        this.streamStatusMaintainer = checkNotNull(streamStatusMaintainer);
 
                        this.inputWatermarkGauge = inputWatermarkGauge;
+                       this.numRecordsIn = checkNotNull(numRecordsIn);
 
                        this.inputIndex = inputIndex;
                }
 
                @Override
-               public void handleWatermark(Watermark watermark) throws Exception {
+               @SuppressWarnings("unchecked")
+               public void emitRecord(StreamRecord record) throws Exception {
+                       synchronized (lock) {
+                               numRecordsIn.inc();
+                               if (inputIndex == 0) {
 
 Review comment:
   This might add some overhead compared to the previous code, because the JIT 
probably won't be able to optimise this branch and branch prediction will not 
work. As far as I know, most or all JIT optimisations are applied per class, not 
per instance, so if there are two instances of the same class, one with 
`inputIndex == 0` and the other with `inputIndex == 1`, they will interfere with 
one another.
   
   Maybe it would be better to make this class abstract, with an abstract 
`processElement` method implemented concretely in 
`FirstStreamTaskNetworkOutput` and `SecondStreamTaskNetworkOutput` as
   ```
                                        operator.setKeyContextElement1(record);
                                        operator.processElement1(record);
   ```
   and 
   ```
                                        operator.setKeyContextElement2(record);
                                        operator.processElement2(record);
   ```
   respectively? We could leave watermarks/statuses/latency markers as they 
are, since they are not performance critical.
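
   A minimal, self-contained sketch of that split, not the actual Flink code. 
`TwoInputOperator`, `AbstractStreamTaskNetworkOutput`, `RecordingOperator` and 
the plain `long` counter are hypothetical stand-ins introduced only for 
illustration; the real operator methods also declare `throws Exception`, which 
is omitted here for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two-input operator; only the four calls
// quoted in the review are modelled (checked exceptions omitted).
interface TwoInputOperator<T> {
    void setKeyContextElement1(T record);
    void processElement1(T record);
    void setKeyContextElement2(T record);
    void processElement2(T record);
}

// Shared bookkeeping stays in the base class; the per-input dispatch is
// left to subclasses, so emitRecord no longer tests inputIndex.
abstract class AbstractStreamTaskNetworkOutput<T> {
    protected final TwoInputOperator<T> operator;
    private final Object lock;
    private long numRecordsIn; // plain counter standing in for the metric

    AbstractStreamTaskNetworkOutput(TwoInputOperator<T> operator, Object lock) {
        this.operator = operator;
        this.lock = lock;
    }

    final void emitRecord(T record) {
        synchronized (lock) {
            numRecordsIn++;
            processElement(record);
        }
    }

    long getNumRecordsIn() {
        synchronized (lock) {
            return numRecordsIn;
        }
    }

    protected abstract void processElement(T record);
}

final class FirstStreamTaskNetworkOutput<T> extends AbstractStreamTaskNetworkOutput<T> {
    FirstStreamTaskNetworkOutput(TwoInputOperator<T> operator, Object lock) {
        super(operator, lock);
    }

    @Override
    protected void processElement(T record) {
        operator.setKeyContextElement1(record);
        operator.processElement1(record);
    }
}

final class SecondStreamTaskNetworkOutput<T> extends AbstractStreamTaskNetworkOutput<T> {
    SecondStreamTaskNetworkOutput(TwoInputOperator<T> operator, Object lock) {
        super(operator, lock);
    }

    @Override
    protected void processElement(T record) {
        operator.setKeyContextElement2(record);
        operator.processElement2(record);
    }
}

// Test double that records the order of operator calls.
final class RecordingOperator implements TwoInputOperator<String> {
    final List<String> calls = new ArrayList<>();

    public void setKeyContextElement1(String r) { calls.add("key1:" + r); }
    public void processElement1(String r) { calls.add("process1:" + r); }
    public void setKeyContextElement2(String r) { calls.add("key2:" + r); }
    public void processElement2(String r) { calls.add("process2:" + r); }
}
```

   Each input would then get its own output instance, e.g. 
`new FirstStreamTaskNetworkOutput<>(operator, lock)` for input 0. Whether this 
actually behaves better under the JIT than the `inputIndex` check would still 
need to be measured.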
   
   Normally I would suggest relying on our benchmarks, but unfortunately our 
benchmarks for this selectable processor are very unstable :(
