[ 
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ChangjiGuo updated FLINK-22497:
-------------------------------
    Attachment: 1.png

> When using DefaultRollingPolicy in StreamingFileSink, the file will be 
> finished delayed
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-22497
>                 URL: https://issues.apache.org/jira/browse/FLINK-22497
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>    Affects Versions: 1.11.2
>         Environment: hadoop-2.8.4
>            Reporter: ChangjiGuo
>            Priority: Major
>         Attachments: 1.png
>
>
> I have a question that came up while testing StreamingFileSink:
> The default 60s rollover interval of DefaultRollingPolicy is only checked 
> when the timer registered with procTimeService fires. If the part file has 
> not yet reached the rollover interval at one timer firing, the check is 
> deferred to the next firing (60s later), so the file can stay in progress 
> well past the configured interval. For example, if the checkpoint interval 
> is set to 60s, the file should become finished at the second checkpoint, 
> but it is delayed until the third checkpoint.
> See the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of 
> the Bucket.write method, the file is finished at the second checkpoint.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
>     // Roll when no part file is open, when the event-based policy fires,
>     // or (proposed addition) when the processing-time policy fires.
>     if (inProgressPart == null
>             || rollingPolicy.shouldRollOnEvent(inProgressPart, element)
>             || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
>         if (LOG.isDebugEnabled()) {
>             LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
>                     subtaskIndex, bucketId, element);
>         }
>         rollPartFile(currentTime);
>     }
>     inProgressPart.write(element, currentTime);
> }
> {code}
>  
> Is my understanding correct? 
> Thanks! ^_^
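
The timing described in the report can be reproduced with a small standalone simulation. This is only a sketch and does not use any Flink classes. The class name RollTimingSketch, its methods, and all timestamps are hypothetical, chosen to match the scenario in the report: checkpoints every 60s, a timer that first fires at 10s and then every 60s, and a part file created at 15s. It models only the rollover-interval part of the time-based check (not the inactivity interval) and assumes a rolled file is finished at the first checkpoint at or after the roll.

```java
public final class RollTimingSketch {

    // Assumed time-based rule: roll once the part file is at least rolloverMs old.
    public static boolean shouldRollOnProcessingTime(long createdAt, long now, long rolloverMs) {
        return now - createdAt >= rolloverMs;
    }

    // Roll time when only the periodic timer evaluates the rule.
    // Timer firings are at firstTick, firstTick + checkIntervalMs, and so on.
    public static long rollTimeTimerOnly(long createdAt, long firstTick,
                                         long checkIntervalMs, long rolloverMs) {
        long tick = firstTick;
        while (tick < createdAt || !shouldRollOnProcessingTime(createdAt, tick, rolloverMs)) {
            tick += checkIntervalMs;
        }
        return tick;
    }

    // Roll time when every write also evaluates the rule (the proposed change),
    // assuming one element arrives every writeEveryMs after the file is created.
    public static long rollTimeWithWriteCheck(long createdAt, long writeEveryMs, long rolloverMs) {
        long write = createdAt + writeEveryMs;
        while (!shouldRollOnProcessingTime(createdAt, write, rolloverMs)) {
            write += writeEveryMs;
        }
        return write;
    }

    // 1-based index of the first checkpoint at or after time t,
    // with checkpoints at checkpointMs, 2 * checkpointMs, and so on.
    public static long checkpointIndexAtOrAfter(long t, long checkpointMs) {
        return (t + checkpointMs - 1) / checkpointMs;
    }

    public static void main(String[] args) {
        long createdAt = 15_000L;   // part file opened at 15s
        long firstTick = 10_000L;   // timer fires at 10s, 70s, 130s, ...
        long interval = 60_000L;    // timer period, rollover interval, checkpoint interval

        long timerRoll = rollTimeTimerOnly(createdAt, firstTick, interval, interval);
        long writeRoll = rollTimeWithWriteCheck(createdAt, 1_000L, interval);

        System.out.println("timer only: rolled at " + timerRoll + " ms, finished at checkpoint "
                + checkpointIndexAtOrAfter(timerRoll, interval));
        System.out.println("write check: rolled at " + writeRoll + " ms, finished at checkpoint "
                + checkpointIndexAtOrAfter(writeRoll, interval));
    }
}
```

Under these assumed timings, the timer-only path sees a 55s-old file at the 70s firing and therefore rolls it only at 130s, which falls after the second checkpoint (120s) and is picked up by the third. With the check in write, the file rolls on the first write at 75s and is picked up by the second checkpoint.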



