[ https://issues.apache.org/jira/browse/FLINK-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15878113#comment-15878113 ]
ASF GitHub Bot commented on FLINK-5716:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/3347#discussion_r102431062
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java ---
@@ -247,45 +282,220 @@ public void onProcessingTime(long timestamp) {
* Streaming topologies can use timestamp assigner functions to override the timestamps
* assigned here.
*/
- private static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {
+ private static class ManualWatermarkContext<T> extends WatermarkContext<T> {
- private final Object lock;
private final Output<StreamRecord<T>> output;
private final StreamRecord<T> reuse;
- private ManualWatermarkContext(Object checkpointLock, Output<StreamRecord<T>> output) {
- this.lock = Preconditions.checkNotNull(checkpointLock, "The checkpoint lock cannot be null.");
+ private ManualWatermarkContext(
+ final Output<StreamRecord<T>> output,
+ final ProcessingTimeService timeService,
+ final Object checkpointLock,
+ final StreamStatusMaintainer streamStatusMaintainer,
+ final long idleTimeout) {
+
+ super(timeService, checkpointLock, streamStatusMaintainer, idleTimeout);
+
this.output = Preconditions.checkNotNull(output, "The output cannot be null.");
this.reuse = new StreamRecord<>(null);
}
@Override
+ protected void processAndCollect(T element) {
+ output.collect(reuse.replace(element));
+ }
+
+ @Override
+ protected void processAndCollectWithTimestamp(T element, long timestamp) {
+ output.collect(reuse.replace(element, timestamp));
+ }
+
+ @Override
+ protected void processAndEmitWatermark(Watermark mark) {
+ output.emitWatermark(mark);
+ }
+
+ @Override
+ protected boolean allowWatermark(Watermark mark) {
+ return true;
+ }
+ }
+
+ /**
+ * An asbtract {@link SourceFunction.SourceContext} that should be used as the base for
--- End diff --
Typo: `asbtract`
> Make streaming SourceContexts aware of source idleness
> ------------------------------------------------------
>
> Key: FLINK-5716
> URL: https://issues.apache.org/jira/browse/FLINK-5716
> Project: Flink
> Issue Type: Sub-task
> Components: DataStream API, Streaming
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
>
> This task comes after FLINK-5017, which adds the new element {{StreamStatus}}
> to be incorporated with watermark progression logic.
> This task tracks the implementation of source idleness awareness and status
> toggling in {{SourceFunction.SourceContext}}s.
> The source contexts should work on an "idle interval", where we determine the
> containing {{SourceStreamTask}} to be idle if no new records / watermarks
> have been collected between 2 consecutive checks. (The default value is 0;
> making this value user-configurable is tracked by FLINK-5018.)
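
The "idle interval" check described in the issue could be sketched roughly as below. This is only an illustration under stated assumptions, not the code from the pull request: the class `IdleIntervalTracker` and its methods are hypothetical names, and the periodic check is assumed to be driven by some external timer that calls `onIntervalCheck()` once per interval.

```java
/**
 * Hypothetical sketch of idle-interval detection; this is NOT Flink's
 * actual WatermarkContext. All names here are made up for illustration.
 */
final class IdleIntervalTracker {

    // True if a record or watermark was collected since the last check.
    private boolean activitySinceLastCheck = false;

    // Current status: true means the source is considered idle.
    private boolean idle = false;

    /** Call whenever a record or watermark is collected. */
    synchronized void markActivity() {
        activitySinceLastCheck = true;
        idle = false; // any activity switches the status back to active
    }

    /** Call once per idle interval (assumed: from a periodic timer). */
    synchronized void onIntervalCheck() {
        if (!activitySinceLastCheck) {
            // Nothing was collected between two consecutive checks.
            idle = true;
        }
        activitySinceLastCheck = false; // start a fresh observation window
    }

    synchronized boolean isIdle() {
        return idle;
    }
}
```

In this sketch the status flips to idle only after a full interval with no activity, and flips back to active as soon as anything is collected.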