dawidwys commented on a change in pull request #13853:
URL: https://github.com/apache/flink/pull/13853#discussion_r517412480
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
##########
@@ -83,12 +81,14 @@ public void open() throws Exception {
super.open();
timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
- watermarkGenerator = watermarkStrategy.createWatermarkGenerator(this::getMetricGroup);
+ watermarkGenerator = new NoOpWatermarkGenerator<>(
+ watermarkStrategy.createWatermarkGenerator(this::getMetricGroup),
+ emitProgressiveWatermarks);
wmOutput = new WatermarkEmitter(output, getContainingTask().getStreamStatusMaintainer());
watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
- if (watermarkInterval > 0 && emitProgressiveWatermarks) {
+ if (watermarkInterval > 0) {
Review comment:
The `emitProgressiveWatermarks` part of this check was valid: we do not want
to register timers when we do not need them.
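To illustrate the point about timers, here is a minimal sketch of the guard. It uses stand-in types, not the operator's real timer service: `TimerGuardSketch`, `RecordingTimerService` and `maybeRegisterTimer` are hypothetical names introduced only for this example.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; not the real TimestampsAndWatermarksOperator. */
public class TimerGuardSketch {

    /** Hypothetical stand-in for a processing-time service; it only records requests. */
    static final class RecordingTimerService {
        final List<Long> registered = new ArrayList<>();

        void registerTimer(long timestamp) {
            registered.add(timestamp);
        }
    }

    /**
     * Registers the first periodic-emit timer only when a positive interval is
     * configured AND watermarks are emitted progressively.
     *
     * @return true if a timer was registered
     */
    static boolean maybeRegisterTimer(
            RecordingTimerService timers,
            long now,
            long watermarkInterval,
            boolean emitProgressiveWatermarks) {
        if (watermarkInterval > 0 && emitProgressiveWatermarks) {
            timers.registerTimer(now + watermarkInterval);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        RecordingTimerService timers = new RecordingTimerService();

        // Progressive watermarks disabled: no timer is requested.
        maybeRegisterTimer(timers, 1_000L, 200L, false);
        System.out.println(timers.registered); // prints []

        // Progressive watermarks enabled: one timer at now + interval.
        maybeRegisterTimer(timers, 1_000L, 200L, true);
        System.out.println(timers.registered); // prints [1200]
    }
}
```

Dropping `emitProgressiveWatermarks` from the condition would make the first call register a timer as well, even though nothing would ever be emitted from it.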
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/eventtime/NoOpWatermarkGenerator.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.eventtime;
+
+import org.apache.flink.annotation.Public;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of a {@link WatermarkGenerator} that periodically emits watermarks
+ * or generates only one final watermark at the end of input.
+ */
+@Public
+public final class NoOpWatermarkGenerator<E> implements WatermarkGenerator<E> {
Review comment:
I would make this a private class inside `TimestampsAndWatermarksOperator`.
It should certainly not be a `@Public` class.
Moreover, I had a simpler class in mind, along these lines:
```
/**
 * An implementation of a {@link WatermarkGenerator} that does not emit watermarks.
 */
public final class NoOpWatermarkGenerator<E> implements WatermarkGenerator<E> {
    @Override
    public void onEvent(E event, long eventTimestamp, WatermarkOutput output) {
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
    }
}
```
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
##########
@@ -83,12 +81,14 @@ public void open() throws Exception {
super.open();
timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
- watermarkGenerator = watermarkStrategy.createWatermarkGenerator(this::getMetricGroup);
+ watermarkGenerator = new NoOpWatermarkGenerator<>(
Review comment:
Here I would do:
```
if (emitProgressiveWatermarks) {
    watermarkGenerator = watermarkStrategy.createWatermarkGenerator(this::getMetricGroup);
} else {
    // Diamond operator avoids a raw-type warning.
    watermarkGenerator = new NoOpWatermarkGenerator<>();
}
```
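For reference, the selection can be tried out in isolation with the sketch below. It does not use the Flink classes: `Output` and `Generator` are simplified stand-ins for `WatermarkOutput` and `WatermarkGenerator`, and `PerEventGenerator`, `NoOpGenerator` and `select` are hypothetical names, so treat this as an assumption-laden illustration rather than the operator code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Illustrative sketch only; every type here is a stand-in, not a Flink class. */
public class GeneratorSelectionSketch {

    /** Simplified stand-in for WatermarkOutput: accepts a raw timestamp. */
    interface Output {
        void emitWatermark(long timestamp);
    }

    /** Stand-in mirroring the two callbacks of WatermarkGenerator. */
    interface Generator<E> {
        void onEvent(E event, long eventTimestamp, Output output);

        void onPeriodicEmit(Output output);
    }

    /** Never emits anything, like the proposed NoOpWatermarkGenerator. */
    static final class NoOpGenerator<E> implements Generator<E> {
        @Override
        public void onEvent(E event, long eventTimestamp, Output output) {
        }

        @Override
        public void onPeriodicEmit(Output output) {
        }
    }

    /** Hypothetical user generator: emits each event's timestamp as a watermark. */
    static final class PerEventGenerator<E> implements Generator<E> {
        @Override
        public void onEvent(E event, long eventTimestamp, Output output) {
            output.emitWatermark(eventTimestamp);
        }

        @Override
        public void onPeriodicEmit(Output output) {
        }
    }

    /** Creates the user generator only when progressive watermarks are wanted. */
    static <E> Generator<E> select(
            boolean emitProgressiveWatermarks, Supplier<Generator<E>> userGenerator) {
        if (emitProgressiveWatermarks) {
            return userGenerator.get();
        }
        return new NoOpGenerator<>();
    }

    public static void main(String[] args) {
        List<Long> seen = new ArrayList<>();
        Output out = seen::add;

        Generator<String> noOp =
                GeneratorSelectionSketch.<String>select(false, PerEventGenerator::new);
        noOp.onEvent("a", 5L, out);
        System.out.println(seen); // prints []

        Generator<String> progressive =
                GeneratorSelectionSketch.<String>select(true, PerEventGenerator::new);
        progressive.onEvent("b", 7L, out);
        System.out.println(seen); // prints [7]
    }
}
```

With this shape the no-op class needs no constructor arguments and no flag, and the user's generator is not even created when progressive watermarks are disabled.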