[ https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15668572#comment-15668572 ]

ASF GitHub Bot commented on FLINK-5056:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2797#discussion_r88125887
  
    --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java ---
    @@ -70,7 +71,37 @@
        private static org.apache.hadoop.fs.FileSystem dfs;
        private static String hdfsURI;
     
    -   private OneInputStreamOperatorTestHarness<String, Object> createTestSink(File dataDir) throws Exception {
    +   private static final String PENDING_SUFFIX = ".pending";
    +   private static final String IN_PROGRESS_SUFFIX = ".in-progress";
    +   private static final String VALID_LENGTH_SUFFIX = ".valid";
    +
    +   private OneInputStreamOperatorTestHarness<String, Object> createRescalingTestSink(
    +           File outDir, int totalParallelism, int taskIdx, long inactivityInterval) throws Exception {
    +
    +           BucketingSink<String> sink = new BucketingSink<String>(outDir.getAbsolutePath())
    +                   .setBucketer(new Bucketer<String>() {
    +                           private static final long serialVersionUID = 1L;
    +
    +                           @Override
    +                           public Path getBucketPath(Clock clock, Path basePath, String element) {
    +                                   return new Path(basePath, element);
    +                           }
    +                   })
    +                   .setWriter(new StringWriter<String>())
    +                   .setInactiveBucketCheckInterval(inactivityInterval)
    +                   .setInactiveBucketThreshold(inactivityInterval)
    +                   .setPartPrefix("part")
    --- End diff --
    
    This prefix ("part") should also be moved into a field, since we use it in checkFS.


> BucketingSink deletes valid data when checkpoint notification is slow.
> ----------------------------------------------------------------------
>
>                 Key: FLINK-5056
>                 URL: https://issues.apache.org/jira/browse/FLINK-5056
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.1.3
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0
>
>
> Currently, if BucketingSink receives no data after a checkpoint and then a
> notification about a previous checkpoint arrives, it clears its state. This
> can prevent valid data from being committed for intermediate checkpoints for
> which a notification has not yet arrived. A simple sequence that illustrates
> the problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> The last step clears the state of the sink without committing as final the
> data that arrived for checkpoint 1.
>  
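
The sequence in the issue description can be replayed against a toy model. The sketch below is not Flink's BucketingSink; the class PendingFileTracker and all of its method names are hypothetical, and the "buggy" variant is only one assumed way the described state clearing could come about. It contrasts dropping all state on a notification with committing only the checkpoints up to and including the notified id.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of per-checkpoint pending files. Hypothetical; not Flink code. */
class PendingFileTracker {
    // Files written since the last snapshot.
    private final List<String> current = new ArrayList<>();
    // Pending files, keyed by the id of the checkpoint that captured them.
    private final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    // Files that have been moved to their final state.
    private final List<String> committed = new ArrayList<>();

    void write(String file) {
        current.add(file);
    }

    void snapshot(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(current));
        current.clear();
    }

    /** Assumed faulty behaviour: commit the notified checkpoint, then drop ALL state. */
    void notifyCompleteBuggy(long checkpointId) {
        List<String> files = pendingPerCheckpoint.get(checkpointId);
        if (files != null) {
            committed.addAll(files);
        }
        pendingPerCheckpoint.clear(); // also discards files of later checkpoints
    }

    /** Commit every checkpoint with id <= checkpointId and keep the later ones pending. */
    void notifyCompleteFixed(long checkpointId) {
        // headMap returns a view, so clearing it removes only the committed entries.
        Map<Long, List<String>> done = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<String> files : done.values()) {
            committed.addAll(files);
        }
        done.clear();
    }

    List<String> committed() {
        return committed;
    }

    boolean hasPending(long checkpointId) {
        return pendingPerCheckpoint.containsKey(checkpointId);
    }
}
```

In this model, after write, snapshot(0), write, snapshot(1) and a notification for checkpoint 0, the buggy variant no longer holds anything for checkpoint 1, so a later notification for checkpoint 1 has nothing to commit. The fixed variant keeps checkpoint 1 pending until its own notification arrives.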



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
