pnowojski commented on a change in pull request #18702:
URL: https://github.com/apache/flink/pull/18702#discussion_r804579899
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -370,7 +411,17 @@ public DataInputStatus emitNext(DataOutput<OUT> output) throws Exception {
private DataInputStatus emitNextNotReading(DataOutput<OUT> output) throws Exception {
switch (operatingMode) {
case OUTPUT_NOT_INITIALIZED:
- currentMainOutput = eventTimeLogic.createMainOutput(output);
+ if (watermarkAlignmentParams.isEnabled()) {
+ // Only wrap the output when watermark alignment is enabled, as otherwise this
+ // introduces a small performance regression (probably because of an extra
+ // virtual call)
+ processingTimeService.scheduleWithFixedDelay(
+ this::emitLatestWatermark,
+ watermarkAlignmentParams.getUpdateInterval(),
+ watermarkAlignmentParams.getUpdateInterval());
+ }
+ currentMainOutput =
+ eventTimeLogic.createMainOutput(output, this::onWatermarkEmitted);
Review comment:
Hard to make a call. Passing an empty callback would be more descriptive, but
it would also be more complicated, and `lastEmittedWatermark` would be less
consistent. As I cannot decide which is better, I would prefer to leave it as
it is.
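
To make the trade-off concrete, here is a minimal sketch of the two options. It
is not the actual Flink code: `Operator`, `MainOutput`, `alwaysTrack` and
`trackOnlyIfEnabled` are hypothetical stand-ins, and only the names
`onWatermarkEmitted` and `lastEmittedWatermark` come from the diff.

```java
import java.util.function.LongConsumer;

public class WatermarkCallbackSketch {

    /** Hypothetical stand-in for the operator; not the real SourceOperator. */
    public static final class Operator {
        public long lastEmittedWatermark = Long.MIN_VALUE;

        public void onWatermarkEmitted(long watermark) {
            lastEmittedWatermark = watermark;
        }
    }

    /** Hypothetical stand-in for the main output; it only notifies the callback. */
    public static final class MainOutput {
        private final LongConsumer onWatermarkEmitted;

        public MainOutput(LongConsumer onWatermarkEmitted) {
            this.onWatermarkEmitted = onWatermarkEmitted;
        }

        public void emitWatermark(long watermark) {
            onWatermarkEmitted.accept(watermark);
        }
    }

    // Option A (what the diff does): always pass the real callback, so
    // lastEmittedWatermark is updated whether or not alignment is enabled.
    public static MainOutput alwaysTrack(Operator op) {
        return new MainOutput(op::onWatermarkEmitted);
    }

    // Option B (the alternative discussed): pass a no-op callback when
    // alignment is disabled. The intent is more explicit, but there is an
    // extra branch and lastEmittedWatermark is never updated in that mode.
    public static MainOutput trackOnlyIfEnabled(Operator op, boolean alignmentEnabled) {
        LongConsumer callback = alignmentEnabled ? op::onWatermarkEmitted : wm -> {};
        return new MainOutput(callback);
    }

    public static void main(String[] args) {
        Operator a = new Operator();
        alwaysTrack(a).emitWatermark(42L);

        Operator b = new Operator();
        trackOnlyIfEnabled(b, false).emitWatermark(42L);

        System.out.println("option A updated: " + (a.lastEmittedWatermark == 42L));
        System.out.println("option B updated: " + (b.lastEmittedWatermark == 42L));
    }
}
```

With option B and alignment disabled, the field keeps its initial value, which
is the inconsistency I mean above.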