dawidwys commented on a change in pull request #18702:
URL: https://github.com/apache/flink/pull/18702#discussion_r804437340
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -529,4 +625,43 @@ public void forceStop() {
forcedStopFuture.complete(null);
}
}
+
+ /**
+ * TODO: this wrapper introduces a small performance regression when used. Maybe we can avoid
+ * using it somehow?
+ */
+ private class WatermarkTrackingOutput<OUT> implements DataOutput<OUT> {
+ private final DataOutput<OUT> output;
+ private long lastWatermark;
Review comment:
What is the initial value of `lastWatermark`? What happens if the reader
never emits a watermark?
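
To make the concern concrete, here is a minimal sketch, not the PR's code. A `long` field with no initializer defaults to `0` in Java, so a wrapper that has never seen a watermark cannot be told apart from one that saw a watermark at timestamp 0. The names `WatermarkSink`, `TrackingSink`, and `NO_WATERMARK` are hypothetical stand-ins, and using `Long.MIN_VALUE` as the sentinel is an assumption about what "no watermark yet" should mean here.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkTrackingSketch {

    /** Hypothetical stand-in for the downstream output; not Flink's DataOutput. */
    interface WatermarkSink {
        void emitWatermark(long timestamp);
    }

    /** Wrapper that remembers the last watermark it forwarded. */
    static final class TrackingSink implements WatermarkSink {
        // Explicit sentinel (an assumption): without it the field would
        // default to 0, which looks like a real watermark at timestamp 0.
        static final long NO_WATERMARK = Long.MIN_VALUE;

        private final WatermarkSink delegate;
        private long lastWatermark = NO_WATERMARK;

        TrackingSink(WatermarkSink delegate) {
            this.delegate = delegate;
        }

        @Override
        public void emitWatermark(long timestamp) {
            lastWatermark = timestamp;
            delegate.emitWatermark(timestamp);
        }

        /** True only once at least one watermark has passed through. */
        boolean hasSeenWatermark() {
            return lastWatermark != NO_WATERMARK;
        }

        long lastWatermark() {
            return lastWatermark;
        }
    }

    public static void main(String[] args) {
        List<Long> forwarded = new ArrayList<>();
        TrackingSink sink = new TrackingSink(ts -> forwarded.add(ts));
        System.out.println("seen before emit: " + sink.hasSeenWatermark());
        sink.emitWatermark(42L);
        System.out.println("seen after emit: " + sink.hasSeenWatermark());
    }
}
```

With an explicit sentinel, any alignment check can skip a reader that has not emitted a watermark instead of treating it as lagging at timestamp 0.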
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -529,4 +625,43 @@ public void forceStop() {
forcedStopFuture.complete(null);
}
}
+
+ /**
+ * TODO: this wrapper introduces a small performance regression when used. Maybe we can avoid
Review comment:
Unless we pass a callback to `eventTimeLogic.createMainOutput`;
after all, it's a private stack.
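
A rough sketch of the callback idea, under stated assumptions: the factory below is hypothetical and only loosely modeled on the suggestion. It is not the real signature of `createMainOutput`, and `WatermarkSink` is an illustrative stand-in. The point is that the output the factory already builds reports each watermark through a `LongConsumer`, so no separate tracking class is needed.

```java
import java.util.function.LongConsumer;

public class WatermarkCallbackSketch {

    /** Hypothetical stand-in for the output type; not Flink's DataOutput. */
    interface WatermarkSink {
        void emitWatermark(long timestamp);
    }

    /**
     * Hypothetical factory: the returned output notifies the callback
     * before forwarding each watermark downstream.
     */
    static WatermarkSink createMainOutput(WatermarkSink downstream, LongConsumer onWatermark) {
        return timestamp -> {
            onWatermark.accept(timestamp);
            downstream.emitWatermark(timestamp);
        };
    }

    public static void main(String[] args) {
        long[] lastSeen = {Long.MIN_VALUE};
        WatermarkSink output =
                createMainOutput(ts -> System.out.println("downstream got " + ts),
                        ts -> lastSeen[0] = ts);
        output.emitWatermark(7L);
        System.out.println("callback recorded " + lastSeen[0]);
    }
}
```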
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -325,8 +366,12 @@ private void stopInternalServices() {
public CompletableFuture<Void> stop(StopMode mode) {
switch (operatingMode) {
+ case WAITING_FOR_ALIGNMENT:
case OUTPUT_NOT_INITIALIZED:
case READING:
+ if (operatingMode == OperatingMode.WAITING_FOR_ALIGNMENT) {
+ waitingForAlignmentFuture.complete(null);
Review comment:
I'm not sure it's better, but couldn't we instead combine
`waitingForAlignmentFuture` with `finished` in `getAvailableFuture`?
Something like:
```
public CompletableFuture<?> getAvailableFuture() {
    ....
        case WAITING_FOR_ALIGNMENT:
            return availabilityHelper.update(waitingForAlignmentFuture);
        case OUTPUT_NOT_INITIALIZED:
        case READING:
            return availabilityHelper.update(sourceReader.isAvailable());
```
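
One way to read "combine" here is sketched below with plain `java.util.concurrent`. This is an interpretation, not the operator's actual code: the two futures only borrow the names `waitingForAlignmentFuture` and `finished` from the comment, and `eitherOf` is a hypothetical helper. `CompletableFuture.anyOf` yields a future that completes as soon as either input completes, so a stop request could wake a task blocked on alignment without `stop()` completing the alignment future by hand.

```java
import java.util.concurrent.CompletableFuture;

public class CombinedAvailabilitySketch {

    /** Completes when either the alignment wait ends or the operator finishes. */
    static CompletableFuture<Object> eitherOf(
            CompletableFuture<?> waitingForAlignmentFuture, CompletableFuture<?> finished) {
        return CompletableFuture.anyOf(waitingForAlignmentFuture, finished);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> waitingForAlignmentFuture = new CompletableFuture<>();
        CompletableFuture<Void> finished = new CompletableFuture<>();

        CompletableFuture<Object> available = eitherOf(waitingForAlignmentFuture, finished);
        System.out.println("available before stop: " + available.isDone());

        // Simulate stop(): only `finished` is completed, alignment is untouched.
        finished.complete(null);
        System.out.println("available after stop: " + available.isDone());
        System.out.println("alignment future done: " + waitingForAlignmentFuture.isDone());
    }
}
```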
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -529,4 +625,43 @@ public void forceStop() {
forcedStopFuture.complete(null);
}
}
+
+ /**
+ * TODO: this wrapper introduces a small performance regression when used. Maybe we can avoid
Review comment:
OK, I think that would not work, because we call `checkWatermarkAlignment`
in `WatermarkTrackingOutput` :(