[ https://issues.apache.org/jira/browse/FLINK-22497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ChangjiGuo updated FLINK-22497:
-------------------------------
    Attachment:     (was: 企业微信截图_43e36204-0a2f-4acd-ae17-56aa4d7661e4.png)

> When using DefaultRollingPolicy in StreamingFileSink, the file will be finished delayed
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-22497
>                 URL: https://issues.apache.org/jira/browse/FLINK-22497
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>    Affects Versions: 1.11.2
>         Environment: hadoop-2.8.4
>            Reporter: ChangjiGuo
>            Priority: Major
>         Attachments: 1.png
>
>
> I have a question from testing StreamingFileSink:
> The default 60s rollover interval in DefaultRollingPolicy is only checked when
> the processing-time timer registered with procTimeService fires. If the interval
> has not yet elapsed when a timer fires, the check is deferred to the next timer
> (60s later), so the part file is not rolled on time. For example, with a
> checkpoint interval of 60s the file should become finished at the second
> checkpoint, but it is only finished at the third.
> See the attached picture for details.
> If we add rollingPolicy.shouldRollOnProcessingTime to the if condition in
> Bucket.write, the file is finished at the second checkpoint:
> {code:java}
> void write(IN element, long currentTime) throws IOException {
>     if (inProgressPart == null
>             || rollingPolicy.shouldRollOnEvent(inProgressPart, element)
>             // proposed: also check the time-based conditions on every write
>             || rollingPolicy.shouldRollOnProcessingTime(inProgressPart, currentTime)) {
>         if (LOG.isDebugEnabled()) {
>             LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
>                     subtaskIndex, bucketId, element);
>         }
>         rollPartFile(currentTime);
>     }
>     inProgressPart.write(element, currentTime);
> }
> {code}
>
> Is my understanding correct?
> Thanks! ^_^
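The timing argument in the description can be reproduced with a small, self-contained simulation. This is a hypothetical model, not Flink code or Flink API: the class RollingSimulation, the method firstFinishedCheckpoint, the one-element-per-second input starting at t = 5, a 60s timer assumed to first fire 1s after start (t = 61, 121, ...), and instantly completing checkpoints every 60s are all assumptions chosen to mirror the scenario above. A part file closed before a checkpoint is treated as finished when that checkpoint completes.

```java
/**
 * Hypothetical, simplified model of the timing described in FLINK-22497 (not Flink code).
 * Assumptions: whole-second clock, one element per second from t = 5, a 60 s rollover
 * interval, a processing-time timer firing at 61, 121, 181, ..., and checkpoints at
 * 60, 120, 180, ... that complete instantly.
 */
final class RollingSimulation {
    static final long ROLLOVER_INTERVAL = 60;   // mirrors DefaultRollingPolicy's 60 s default
    static final long CHECKPOINT_INTERVAL = 60; // assumed checkpoint period
    static final long FIRST_TIMER = 61;         // assumed: timer first fires 1 s after start
    static final long TIMER_INTERVAL = 60;      // assumed bucket check interval
    static final long FIRST_ELEMENT = 5;        // assumed arrival time of the first element
    static final long END = 300;                // length of the simulated window

    /**
     * Returns the time of the first checkpoint that finishes a part file,
     * or -1 if none does within the simulated window.
     *
     * @param checkOnWrite if true, also test the rollover interval on every write,
     *                     as proposed for Bucket.write
     */
    static long firstFinishedCheckpoint(boolean checkOnWrite) {
        Long partCreated = null; // creation time of the in-progress part; null if none
        int pendingParts = 0;    // parts closed since the last checkpoint
        long nextTimer = FIRST_TIMER;

        for (long t = 0; t <= END; t++) {
            // Checkpoint: parts closed before it become finished once it completes.
            if (t > 0 && t % CHECKPOINT_INTERVAL == 0 && pendingParts > 0) {
                return t;
            }
            // Processing-time timer: the only time-based check in the timer-only model.
            if (t == nextTimer) {
                if (partCreated != null && t - partCreated >= ROLLOVER_INTERVAL) {
                    pendingParts++;     // close the in-progress part
                    partCreated = null;
                }
                nextTimer += TIMER_INTERVAL;
            }
            // Element arrival (one per second).
            if (t >= FIRST_ELEMENT) {
                boolean intervalElapsed = partCreated != null
                        && t - partCreated >= ROLLOVER_INTERVAL;
                if (partCreated == null || (checkOnWrite && intervalElapsed)) {
                    if (partCreated != null) {
                        pendingParts++; // close the old part before opening a new one
                    }
                    partCreated = t;    // open a new in-progress part for this element
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("timer only:             t=" + firstFinishedCheckpoint(false));
        System.out.println("timer + check on write: t=" + firstFinishedCheckpoint(true));
    }
}
```

Under these assumed timings, the timer-only model first finishes a part file at the checkpoint at t = 180 (the third), because the timer at t = 61 sees an age of 56s and the roll waits until t = 121. Also checking the interval on write rolls the file at t = 65, so it is finished at the checkpoint at t = 120 (the second). In a real job the exact checkpoint depends on how timer and checkpoint times interleave.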