[ 
https://issues.apache.org/jira/browse/FLINK-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15878113#comment-15878113
 ] 

ASF GitHub Bot commented on FLINK-5716:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3347#discussion_r102431062
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
 ---
    @@ -247,45 +282,220 @@ public void onProcessingTime(long timestamp) {
         * Streaming topologies can use timestamp assigner functions to 
override the timestamps
         * assigned here.
         */
    -   private static class ManualWatermarkContext<T> implements 
SourceFunction.SourceContext<T> {
    +   private static class ManualWatermarkContext<T> extends 
WatermarkContext<T> {
     
    -           private final Object lock;
                private final Output<StreamRecord<T>> output;
                private final StreamRecord<T> reuse;
     
    -           private ManualWatermarkContext(Object checkpointLock, 
Output<StreamRecord<T>> output) {
    -                   this.lock = Preconditions.checkNotNull(checkpointLock, 
"The checkpoint lock cannot be null.");
    +           private ManualWatermarkContext(
    +                           final Output<StreamRecord<T>> output,
    +                           final ProcessingTimeService timeService,
    +                           final Object checkpointLock,
    +                           final StreamStatusMaintainer 
streamStatusMaintainer,
    +                           final long idleTimeout) {
    +
    +                   super(timeService, checkpointLock, 
streamStatusMaintainer, idleTimeout);
    +
                        this.output = Preconditions.checkNotNull(output, "The 
output cannot be null.");
                        this.reuse = new StreamRecord<>(null);
                }
     
                @Override
    +           protected void processAndCollect(T element) {
    +                   output.collect(reuse.replace(element));
    +           }
    +
    +           @Override
    +           protected void processAndCollectWithTimestamp(T element, long 
timestamp) {
    +                   output.collect(reuse.replace(element, timestamp));
    +           }
    +
    +           @Override
    +           protected void processAndEmitWatermark(Watermark mark) {
    +                   output.emitWatermark(mark);
    +           }
    +
    +           @Override
    +           protected boolean allowWatermark(Watermark mark) {
    +                   return true;
    +           }
    +   }
    +
    +   /**
    +    * An asbtract {@link SourceFunction.SourceContext} that should be used 
as the base for
    --- End diff --
    
    Typo: `asbtract`


> Make streaming SourceContexts aware of source idleness
> ------------------------------------------------------
>
>                 Key: FLINK-5716
>                 URL: https://issues.apache.org/jira/browse/FLINK-5716
>             Project: Flink
>          Issue Type: Sub-task
>          Components: DataStream API, Streaming
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>
> This task comes after FLINK-5017, which adds the new element {{StreamStatus}} 
> to be incorporated with watermark progression logic.
> This task tracks the implementation of source idleness awareness and status 
> toggling in {{SourceFunction.SourceContext}}s.
> The source contexts should work on an "idle interval", where we determine the 
> containing {{SourceStreamTask}} to be idle if no new records / watermarks 
> have been collected in-between 2 continuous checks. (default value is 0. 
> Letting this value be user configurable is tracked by FLINK-5018).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to