Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3347#discussion_r102431062
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
---
@@ -247,45 +282,220 @@ public void onProcessingTime(long timestamp) {
* Streaming topologies can use timestamp assigner functions to
override the timestamps
* assigned here.
*/
- private static class ManualWatermarkContext<T> implements
SourceFunction.SourceContext<T> {
+ private static class ManualWatermarkContext<T> extends
WatermarkContext<T> {
- private final Object lock;
private final Output<StreamRecord<T>> output;
private final StreamRecord<T> reuse;
- private ManualWatermarkContext(Object checkpointLock,
Output<StreamRecord<T>> output) {
- this.lock = Preconditions.checkNotNull(checkpointLock,
"The checkpoint lock cannot be null.");
+ private ManualWatermarkContext(
+ final Output<StreamRecord<T>> output,
+ final ProcessingTimeService timeService,
+ final Object checkpointLock,
+ final StreamStatusMaintainer
streamStatusMaintainer,
+ final long idleTimeout) {
+
+ super(timeService, checkpointLock,
streamStatusMaintainer, idleTimeout);
+
this.output = Preconditions.checkNotNull(output, "The
output cannot be null.");
this.reuse = new StreamRecord<>(null);
}
@Override
+ protected void processAndCollect(T element) {
+ output.collect(reuse.replace(element));
+ }
+
+ @Override
+ protected void processAndCollectWithTimestamp(T element, long
timestamp) {
+ output.collect(reuse.replace(element, timestamp));
+ }
+
+ @Override
+ protected void processAndEmitWatermark(Watermark mark) {
+ output.emitWatermark(mark);
+ }
+
+ @Override
+ protected boolean allowWatermark(Watermark mark) {
+ return true;
+ }
+ }
+
+ /**
+ * An asbtract {@link SourceFunction.SourceContext} that should be used
as the base for
--- End diff --
Typo: `asbtract` should be `abstract`.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---