[ https://issues.apache.org/jira/browse/FLINK-4190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15374589#comment-15374589 ]
Josh Forman-Gornall commented on FLINK-4190:
--------------------------------------------

OK, I'll do that. Can you see any issues with the code itself?

The main thing I wasn't sure about was whether the inactive-bucket check should run in a separate thread driven by a timer or scheduled executor, or in the {{invoke}} method when new elements arrive. I chose the latter because I thought it would be better to run it on the main operator thread. The disadvantage is that if no new elements arrive, inactive buckets are never closed (although I guess this is unlikely to happen in practice).

> Generalise RollingSink to work with arbitrary buckets
> -----------------------------------------------------
>
>                 Key: FLINK-4190
>                 URL: https://issues.apache.org/jira/browse/FLINK-4190
>             Project: Flink
>          Issue Type: Improvement
>          Components: filesystem-connector, Streaming Connectors
>            Reporter: Josh Forman-Gornall
>            Assignee: Josh Forman-Gornall
>            Priority: Minor
>
> The current RollingSink implementation appears to be intended for writing to
> directories that are bucketed by system time (e.g. minutely) and to write to
> only one file within one bucket at any point in time. When the system time
> determines that the current bucket should be changed, the current bucket and
> file are closed and a new bucket and file are created. The sink cannot be
> used for the more general problem of writing to arbitrary buckets, perhaps
> determined by an attribute on the element/tuple being processed.
> There are three limitations that prevent the existing sink from being used
> for more general problems:
> - Only bucketing by the current system time is supported, and not by e.g. an
> attribute of the element being processed by the sink.
> - Whenever the sink sees a change in the bucket being written to, it flushes
> the file and moves on to the new bucket. Therefore the sink cannot have more
> than one bucket/file open at a time. Additionally, the checkpointing mechanics
> only support saving the state of one active bucket and file.
> - The sink determines that it should 'close' an active bucket and file when
> the bucket path changes. We need another way to determine when a bucket has
> become inactive and needs to be closed.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
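The approach discussed above can be sketched roughly as follows. The sketch routes each element to a bucket derived from the element itself, keeps several buckets open at once, and checks for inactive buckets inside `invoke`. It is a minimal, self-contained illustration, not the patch under review and not Flink's API. The class `MultiBucketSinkSketch`, the `bucketer` function, the inactivity threshold and the explicit `nowMillis` parameter are all hypothetical. File I/O and checkpointing are left out and marked with comments.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/**
 * Hypothetical sketch (not Flink API): a sink that keeps several buckets open
 * at once and closes the ones that have not been written to recently.
 */
final class MultiBucketSinkSketch<T> {

    /** State for one open bucket; a real sink would also hold the open part-file writer. */
    private static final class BucketState {
        long lastWriteMillis;
    }

    private final Function<T, String> bucketer;   // hypothetical: element -> bucket path
    private final long inactiveThresholdMillis;   // hypothetical inactivity threshold
    private final Map<String, BucketState> openBuckets = new HashMap<>();
    private int closedBucketCount;

    MultiBucketSinkSketch(Function<T, String> bucketer, long inactiveThresholdMillis) {
        this.bucketer = bucketer;
        this.inactiveThresholdMillis = inactiveThresholdMillis;
    }

    /**
     * Called once per element on the operator thread. The current time is passed
     * in explicitly so the sketch is deterministic; a real sink would read the clock.
     */
    void invoke(T element, long nowMillis) {
        String path = bucketer.apply(element);
        BucketState state = openBuckets.computeIfAbsent(path, p -> new BucketState());
        // A real sink would write the element to this bucket's open file here.
        state.lastWriteMillis = nowMillis;
        // The check only runs when an element arrives, so idle buckets stay
        // open if the stream goes quiet.
        closeInactiveBuckets(nowMillis);
    }

    /** Closes every bucket whose last write is older than the threshold. */
    void closeInactiveBuckets(long nowMillis) {
        Iterator<Map.Entry<String, BucketState>> it = openBuckets.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, BucketState> entry = it.next();
            if (nowMillis - entry.getValue().lastWriteMillis > inactiveThresholdMillis) {
                // A real sink would flush and close the part file here.
                it.remove();
                closedBucketCount++;
            }
        }
    }

    /** Read-only view of the bucket paths that are currently open. */
    Set<String> openBucketPaths() {
        return Collections.unmodifiableSet(openBuckets.keySet());
    }

    int closedBucketCount() {
        return closedBucketCount;
    }
}
```

As an example, take a bucketer `e -> e.split(":")[0]` and a 1000 ms threshold. Writing `eu:click` at t=0 and `us:click` at t=500 leaves both buckets open. A further `us:view` at t=1200 then closes `eu`. If no further element arrives, `eu` stays open indefinitely, which is the drawback raised in the comment. A timer-based variant would call `closeInactiveBuckets` from a scheduled task instead. In this sketch that variant would need synchronization with `invoke`, because `HashMap` is not thread-safe.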