[
https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ChangjiGuo updated FLINK-22497:
-------------------------------
Attachment: 企业微信截图_43e36204-0a2f-4acd-ae17-56aa4d7661e4.png
> When using DefaultRollingPolicy in StreamingFileSink, the file will be
> finished delayed
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-22497
> URL: https://issues.apache.org/jira/browse/FLINK-22497
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Affects Versions: 1.11.2
> Environment: hadoop-2.8.4
> Reporter: ChangjiGuo
> Priority: Major
> Attachments: 1.png
>
>
> I have a question that came up while testing StreamingFileSink.
> The default 60s rolling interval in DefaultRollingPolicy is only checked when
> the procTimeService timer fires. If a part file has not yet reached the
> rolling interval at one timer firing, the roll is deferred to the next firing
> (60s later), so the file is not rolled as soon as the interval elapses. For
> example, if the checkpoint period is set to 60s, the file should become
> finished at the second checkpoint, but it is delayed until the third
> checkpoint.
> You can refer to the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition of the
> Bucket.write method, the file will be set to finished at the second
> checkpoint.
> {code:java}
> void write(IN element, long currentTime) throws IOException {
>     // Roll when no part file is open, when the policy asks to roll on this
>     // element, or (proposed) when the processing-time condition is met.
>     if (inProgressPart == null
>             || rollingPolicy.shouldRollOnEvent(inProgressPart, element)
>             || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
>         if (LOG.isDebugEnabled()) {
>             LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
>                     subtaskIndex, bucketId, element);
>         }
>         rollPartFile(currentTime);
>     }
>     inProgressPart.write(element, currentTime);
> }
> {code}
>
> Is my understanding correct?
> Thanks! ^_^
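
The timing described in the report can be illustrated with a small standalone simulation. This is only a sketch under stated assumptions, not Flink code: the class `RollTimingSketch` and its methods are hypothetical, the timer is assumed to fire at multiples of the check interval starting at 0s, the part file is assumed to be created at 5s, one element is assumed to arrive per second, and checkpoints are assumed to occur at 50s, 110s, 170s and so on. A rolled file is treated as finished at the first checkpoint at or after the roll.

```java
public class RollTimingSketch {

    /** Timer-only check: first timer tick (multiples of checkIntervalSec) at which the file age reaches rolloverSec. */
    public static long rollTimeTimerOnly(long createdSec, long rolloverSec, long checkIntervalSec) {
        long tick = 0;
        while (tick - createdSec < rolloverSec) {
            tick += checkIntervalSec;
        }
        return tick;
    }

    /** Check on every write: first write time (one write per writeEverySec) at which the file age reaches rolloverSec. */
    public static long rollTimeWithWriteCheck(long createdSec, long rolloverSec, long writeEverySec) {
        long t = createdSec;
        while (t - createdSec < rolloverSec) {
            t += writeEverySec;
        }
        return t;
    }

    /** 1-based index of the first checkpoint at or after rollSec, with checkpoints at firstCpSec + k * periodSec. */
    public static long firstCheckpointAtOrAfter(long rollSec, long firstCpSec, long periodSec) {
        long index = 1;
        long checkpoint = firstCpSec;
        while (checkpoint < rollSec) {
            checkpoint += periodSec;
            index++;
        }
        return index;
    }

    public static void main(String[] args) {
        // All values below are assumptions chosen for illustration.
        long created = 5, rollover = 60, checkInterval = 60, firstCheckpoint = 50, period = 60;

        long timerRoll = rollTimeTimerOnly(created, rollover, checkInterval);
        long writeRoll = rollTimeWithWriteCheck(created, rollover, 1);

        System.out.println("timer-only roll at " + timerRoll + "s, finished at checkpoint #"
                + firstCheckpointAtOrAfter(timerRoll, firstCheckpoint, period));
        System.out.println("write-check roll at " + writeRoll + "s, finished at checkpoint #"
                + firstCheckpointAtOrAfter(writeRoll, firstCheckpoint, period));
    }
}
```

Under these assumed values, the timer-only check rolls the file at 120s (the 60s tick sees an age of only 55s), which lands on the third checkpoint, while checking on every write rolls it at 65s, in time for the second checkpoint.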
--
This message was sent by Atlassian Jira
(v8.3.4#803005)