pnowojski commented on a change in pull request #9478: [FLINK-13766][task]
Refactor the implementation of StreamInputProcessor based on
StreamTaskInput#emitNext
URL: https://github.com/apache/flink/pull/9478#discussion_r318035534
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
##########
@@ -381,12 +333,28 @@ private ForwardingValveOutputHandler(
this.streamStatusMaintainer =
checkNotNull(streamStatusMaintainer);
this.inputWatermarkGauge = inputWatermarkGauge;
+ this.numRecordsIn = checkNotNull(numRecordsIn);
this.inputIndex = inputIndex;
}
@Override
- public void handleWatermark(Watermark watermark) throws Exception {
+ @SuppressWarnings("unchecked")
+ public void emitRecord(StreamRecord record) throws Exception {
+ synchronized (lock) {
+ numRecordsIn.inc();
+ if (inputIndex == 0) {
Review comment:
This might add some extra overhead compared to the previous code, as the
JIT probably won't be able to optimise this branch well and branch prediction
may suffer (as far as I know, most/all optimisations are per class, not per
class instance, so if there are two instances of the same class, where one has
`inputIndex == 0` and the other has `inputIndex == 1`, they will collide with
one another).
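To make the concern concrete, here is a minimal sketch of the pattern under review. It assumes a hypothetical, simplified `Operator` interface and `BranchingOutput` class rather than Flink's real operator and `StreamRecord` API. Both instances run the same `emitRecord` bytecode. As far as I understand HotSpot, branch profiles are recorded per method, not per object, so the `inputIndex == 0` check sees both outcomes once the two instances are interleaved.

```java
import java.util.ArrayList;
import java.util.List;

public class BranchingOutputSketch {

    // Hypothetical stand-in for a two-input operator; not Flink's real interface.
    interface Operator<T> {
        void processElement1(T record);
        void processElement2(T record);
    }

    // Hypothetical class: one class serves both inputs and picks the target at run time.
    static final class BranchingOutput<T> {
        private final Operator<T> operator;
        private final int inputIndex;
        private long numRecordsIn; // simplified stand-in for a metrics counter

        BranchingOutput(Operator<T> operator, int inputIndex) {
            this.operator = operator;
            this.inputIndex = inputIndex;
        }

        void emitRecord(T record) {
            numRecordsIn++;
            // Every instance executes this same bytecode, so the branch profile
            // the JIT collects here mixes the input-0 and input-1 objects.
            if (inputIndex == 0) {
                operator.processElement1(record);
            } else {
                operator.processElement2(record);
            }
        }

        long getNumRecordsIn() {
            return numRecordsIn;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Operator<String> op = new Operator<String>() {
            public void processElement1(String r) { log.add("1:" + r); }
            public void processElement2(String r) { log.add("2:" + r); }
        };
        BranchingOutput<String> first = new BranchingOutput<>(op, 0);
        BranchingOutput<String> second = new BranchingOutput<>(op, 1);
        // Interleaving the two instances means the shared branch records both outcomes.
        first.emitRecord("a");
        second.emitRecord("b");
        first.emitRecord("c");
        System.out.println(log); // [1:a, 2:b, 1:c]
    }
}
```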
Maybe it would be better to make this class abstract, with an abstract
`processElement` method, which would be implemented concretely in
`FirstStreamTaskNetworkOutput` and `SecondStreamTaskNetworkOutput` as
```
operator.setKeyContextElement1(record);
operator.processElement1(record);
```
and
```
operator.setKeyContextElement2(record);
operator.processElement2(record);
```
respectively? We could leave watermarks/statuses/latency markers as they
are, since they are not performance critical.
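A minimal sketch of the suggested split, assuming a hypothetical `TwoInputOperator` interface that only mirrors the method names quoted above, a hypothetical base class name `StreamTaskNetworkOutput`, and a plain `long` in place of the `numRecordsIn` metrics counter. Locking and counting stay in the base class; only the per-input dispatch is abstract, as the comment proposes.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitOutputSketch {

    // Hypothetical operator interface; only the method names come from the review.
    interface TwoInputOperator<T> {
        void setKeyContextElement1(T record);
        void processElement1(T record);
        void setKeyContextElement2(T record);
        void processElement2(T record);
    }

    // Hypothetical base class: shared locking and counting, abstract per-input dispatch.
    abstract static class StreamTaskNetworkOutput<T> {
        protected final TwoInputOperator<T> operator;
        private final Object lock;
        private long numRecordsIn; // simplified stand-in for a metrics Counter

        StreamTaskNetworkOutput(TwoInputOperator<T> operator, Object lock) {
            this.operator = operator;
            this.lock = lock;
        }

        final void emitRecord(T record) {
            synchronized (lock) {
                numRecordsIn++;
                processElement(record);
            }
        }

        protected abstract void processElement(T record);

        long getNumRecordsIn() {
            synchronized (lock) {
                return numRecordsIn;
            }
        }
    }

    static final class FirstStreamTaskNetworkOutput<T> extends StreamTaskNetworkOutput<T> {
        FirstStreamTaskNetworkOutput(TwoInputOperator<T> operator, Object lock) {
            super(operator, lock);
        }

        @Override
        protected void processElement(T record) {
            operator.setKeyContextElement1(record);
            operator.processElement1(record);
        }
    }

    static final class SecondStreamTaskNetworkOutput<T> extends StreamTaskNetworkOutput<T> {
        SecondStreamTaskNetworkOutput(TwoInputOperator<T> operator, Object lock) {
            super(operator, lock);
        }

        @Override
        protected void processElement(T record) {
            operator.setKeyContextElement2(record);
            operator.processElement2(record);
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        TwoInputOperator<String> op = new TwoInputOperator<String>() {
            public void setKeyContextElement1(String r) { calls.add("key1:" + r); }
            public void processElement1(String r) { calls.add("proc1:" + r); }
            public void setKeyContextElement2(String r) { calls.add("key2:" + r); }
            public void processElement2(String r) { calls.add("proc2:" + r); }
        };
        Object lock = new Object();
        StreamTaskNetworkOutput<String> first = new FirstStreamTaskNetworkOutput<>(op, lock);
        StreamTaskNetworkOutput<String> second = new SecondStreamTaskNetworkOutput<>(op, lock);
        first.emitRecord("a");
        second.emitRecord("b");
        System.out.println(calls); // [key1:a, proc1:a, key2:b, proc2:b]
    }
}
```

With this split, which input a record belongs to is encoded in the receiver's class rather than in a field tested on every record, while the lock and the record counter remain shared in one place.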
Normally I would suggest relying on our benchmarks, but unfortunately our
benchmarks for this selectable processor are very unstable :(
----------------------------------------------------------------
This is an automated message from the Apache Git Service.