[ https://issues.apache.org/jira/browse/FLINK-2406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14644117#comment-14644117 ]

ASF GitHub Bot commented on FLINK-2406:
---------------------------------------

Github user rmetzger commented on the pull request:

    https://github.com/apache/flink/pull/938#issuecomment-125520977
  
    I tried this pull request on a cluster because the current code fails with the following exception when run with a checkpoint interval of 1 second and a buffer timeout of 0 ms.
    ```
    01:08:41,472 ERROR org.apache.flink.streaming.runtime.tasks.OneInputStreamTask   - Flat Map (114/120) failed
    java.lang.RuntimeException: Error in barrier buffer logic
            at org.apache.flink.streaming.runtime.io.BarrierBuffer.releaseBlocks(BarrierBuffer.java:192)
            at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:252)
            at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:163)
            at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:70)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
            at java.lang.Thread.run(Thread.java:745)
    01:08:41,472 INFO  org.apache.flink.runtime.taskmanager.Task                     - Flat Map (114/120) switched to FAILED with exception.
    java.lang.RuntimeException: Error in barrier buffer logic
            at org.apache.flink.streaming.runtime.io.BarrierBuffer.releaseBlocks(BarrierBuffer.java:192)
            at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:252)
            at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:163)
            at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:70)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
            at java.lang.Thread.run(Thread.java:745)
    ```
    
    The code from this PR works. However, I got the following WARN messages, probably because the barriers are triggered too early in the job lifecycle:
    ```
    08:59:24,775 WARN  org.apache.flink.streaming.runtime.io.BarrierBuffer           - Received checkpoint barrier for checkpoint 4 before completing current checkpoint 3. Skipping current checkpoint.
    08:59:24,785 WARN  org.apache.flink.streaming.runtime.io.BarrierBuffer           - Received checkpoint barrier for checkpoint 4 before completing current checkpoint 3. Skipping current checkpoint.
    08:59:24,786 WARN  org.apache.flink.streaming.runtime.io.BarrierBuffer           - Received checkpoint barrier for checkpoint 4 before completing current checkpoint 3. Skipping current checkpoint.
    08:59:24,787 WARN  org.apache.flink.streaming.runtime.io.BarrierBuffer           - Received checkpoint barrier for checkpoint 4 before completing current checkpoint 3. Skipping current checkpoint.
    08:59:25,223 WARN  org.apache.flink.streaming.runtime.io.BarrierBuffer           - Received checkpoint barrier for checkpoint 5 before completing current checkpoint 4. Skipping current checkpoint.
    08:59:25,225 WARN  org.apache.flink.streaming.runtime.io.BarrierBuffer           - Received checkpoint barrier for checkpoint 5 before completing current checkpoint 4. Skipping current checkpoint.
    08:59:25,226 WARN  org.apache.flink.streaming.runtime.io.BarrierBuffer           - Received checkpoint barrier for checkpoint 5 before completing current checkpoint 4. Skipping current checkpoint.
    08:59:25,229 WARN  org.apache.flink.streaming.runtime.io.BarrierBuffer           - Received checkpoint barrier for checkpoint 5 before completing current checkpoint 4. Skipping current checkpoint.
    ```
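
For readers unfamiliar with the WARN message above, the following is a rough sketch of the kind of bookkeeping that could produce it: a task counts barriers for the checkpoint in progress and abandons it when a barrier for a newer checkpoint arrives first. This is not the actual `BarrierBuffer` code. The class `SkipCheckpointSketch` and its fields are hypothetical, and per-channel tracking and record buffering are left out on purpose.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not Flink's BarrierBuffer implementation. */
public class SkipCheckpointSketch {
    private final int numChannels;          // number of input channels feeding this task
    private long currentCheckpointId = -1;  // checkpoint currently being completed
    private int barriersReceived;           // barriers seen so far for that checkpoint

    final List<Long> completed = new ArrayList<>();
    final List<Long> skipped = new ArrayList<>();

    SkipCheckpointSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Called once per barrier; assumes each channel sends one barrier per checkpoint. */
    void onBarrier(long checkpointId) {
        if (checkpointId < currentCheckpointId) {
            return; // late barrier of a checkpoint that was already abandoned or finished
        }
        if (checkpointId > currentCheckpointId) {
            if (barriersReceived > 0) {
                // A newer checkpoint started before the current one saw all its barriers.
                skipped.add(currentCheckpointId);
            }
            currentCheckpointId = checkpointId;
            barriersReceived = 0;
        }
        barriersReceived++;
        if (barriersReceived == numChannels) {
            completed.add(checkpointId);
            barriersReceived = 0;
        }
    }

    public static void main(String[] args) {
        SkipCheckpointSketch task = new SkipCheckpointSketch(3);
        long[] barriers = {3, 3, 4, 3, 4, 4}; // checkpoint 4 overtakes checkpoint 3
        for (long id : barriers) {
            task.onBarrier(id);
        }
        System.out.println("skipped=" + task.skipped + " completed=" + task.completed);
    }
}
```

With three channels and the barrier order in `main`, checkpoint 3 is skipped and checkpoint 4 completes, which mirrors the pattern in the log (one warning per affected subtask).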


> Abstract BarrierBuffer to an exchangeable BarrierHandler
> --------------------------------------------------------
>
>                 Key: FLINK-2406
>                 URL: https://issues.apache.org/jira/browse/FLINK-2406
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming
>    Affects Versions: 0.10
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 0.10
>
>
> We need to make the Checkpoint handling pluggable, to allow us to use 
> different implementations:
>   - BarrierBuffer for "exactly once" processing. This inevitably introduces a 
> bit of latency.
>   - BarrierTracker for "at least once" processing, with no added latency.
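
To make the two modes in the issue description concrete, here is a minimal sketch of what an exchangeable handler could look like. Everything in it is an assumption for illustration: the interface `CheckpointBarrierHandler`, the classes `AligningHandler` and `TrackingHandler`, and the `Event` type are hypothetical names, not the API introduced by the pull request. It also assumes at most one checkpoint in flight and keeps buffered records in memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of pluggable checkpoint barrier handling; not Flink code. */
public class BarrierHandlerSketch {

    /** A record or a checkpoint barrier arriving on one input channel. */
    static final class Event {
        final int channel;
        final long checkpointId; // -1 marks a plain record
        final String payload;    // null for barriers

        private Event(int channel, long checkpointId, String payload) {
            this.channel = channel;
            this.checkpointId = checkpointId;
            this.payload = payload;
        }
        static Event record(int channel, String payload) { return new Event(channel, -1, payload); }
        static Event barrier(int channel, long checkpointId) { return new Event(channel, checkpointId, null); }
        boolean isBarrier() { return checkpointId >= 0; }
    }

    /** The pluggable part: returns the records that may be processed right now. */
    interface CheckpointBarrierHandler {
        List<String> onEvent(Event e);
    }

    /** "Exactly once": holds back records from channels that already delivered the barrier. */
    static final class AligningHandler implements CheckpointBarrierHandler {
        private final boolean[] blocked;
        private int barriersSeen;
        private final List<String> buffered = new ArrayList<>();

        AligningHandler(int numChannels) { this.blocked = new boolean[numChannels]; }

        @Override
        public List<String> onEvent(Event e) {
            List<String> out = new ArrayList<>();
            if (e.isBarrier()) {
                if (!blocked[e.channel]) { blocked[e.channel] = true; barriersSeen++; }
                if (barriersSeen == blocked.length) {
                    // All channels aligned: release held-back records and unblock.
                    out.addAll(buffered);
                    buffered.clear();
                    Arrays.fill(blocked, false);
                    barriersSeen = 0;
                }
            } else if (blocked[e.channel]) {
                buffered.add(e.payload); // this buffering is the added latency
            } else {
                out.add(e.payload);
            }
            return out;
        }
    }

    /** "At least once": only counts barriers, never delays a record. */
    static final class TrackingHandler implements CheckpointBarrierHandler {
        private final int numChannels;
        private int barriersSeen;

        TrackingHandler(int numChannels) { this.numChannels = numChannels; }

        @Override
        public List<String> onEvent(Event e) {
            if (e.isBarrier()) {
                if (++barriersSeen == numChannels) { barriersSeen = 0; }
                return Collections.emptyList();
            }
            return Collections.singletonList(e.payload);
        }
    }

    /** Feeds a fixed two-channel input through a handler and collects the emitted records. */
    static List<String> run(CheckpointBarrierHandler handler) {
        List<Event> input = Arrays.asList(
                Event.record(0, "a"), Event.barrier(0, 1), Event.record(0, "b"),
                Event.record(1, "c"), Event.barrier(1, 1), Event.record(0, "d"));
        List<String> emitted = new ArrayList<>();
        for (Event e : input) { emitted.addAll(handler.onEvent(e)); }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("exactly once:  " + run(new AligningHandler(2)));
        System.out.println("at least once: " + run(new TrackingHandler(2)));
    }
}
```

In this toy input, the aligning handler emits `[a, c, b, d]` because record `b` waits until the barrier has arrived on both channels, while the tracking handler emits `[a, b, c, d]` with no delay. The price of the second mode is that `b` may be processed before the checkpoint it logically follows, which is why it only gives "at least once" guarantees.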



