dawidwys commented on a change in pull request #18702:
URL: https://github.com/apache/flink/pull/18702#discussion_r804523534
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -370,7 +411,17 @@ public DataInputStatus emitNext(DataOutput<OUT> output)
throws Exception {
private DataInputStatus emitNextNotReading(DataOutput<OUT> output) throws
Exception {
switch (operatingMode) {
case OUTPUT_NOT_INITIALIZED:
- currentMainOutput = eventTimeLogic.createMainOutput(output);
+ if (watermarkAlignmentParams.isEnabled()) {
+ // Only wrap the output when watermark alignment is
enabled, as otherwise this
+ // introduces a small performance regression (probably
because of an extra
+ // virtual call)
+ processingTimeService.scheduleWithFixedDelay(
+ this::emitLatestWatermark,
+ watermarkAlignmentParams.getUpdateInterval(),
+ watermarkAlignmentParams.getUpdateInterval());
+ }
+ currentMainOutput =
+ eventTimeLogic.createMainOutput(output,
this::onWatermarkEmitted);
Review comment:
Does it make sense to pass an empty method instead of
`onWatermarkEmitted` if watermark alignment is disabled? Right now we execute
some unnecessary logic on every watermark if the alignment is disabled.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/WatermarkToDataOutput.java
##########
@@ -35,12 +36,21 @@
public final class WatermarkToDataOutput implements WatermarkOutput {
private final PushingAsyncDataInput.DataOutput<?> output;
+ private final TimestampsAndWatermarks.OnWatermarkEmitted watermarkEmitted;
private long maxWatermarkSoFar;
private boolean isIdle;
- /** Creates a new WatermarkOutput against the given DataOutput. */
+ @VisibleForTesting
public WatermarkToDataOutput(PushingAsyncDataInput.DataOutput<?> output) {
+ this(output, watermark -> {});
+ }
+
+ /** Creates a new WatermarkOutput against the given DataOutput. */
+ public WatermarkToDataOutput(
+ PushingAsyncDataInput.DataOutput<?> output,
+ TimestampsAndWatermarks.OnWatermarkEmitted watermarkEmitted) {
this.output = checkNotNull(output);
+ this.watermarkEmitted = watermarkEmitted;
Review comment:
nit: for the sake of consistency: `checkNotNull`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]