Hi Kostas:

For simplicity, FLINK-13027
<https://issues.apache.org/jira/browse/FLINK-13027> has been assigned to my
current user ID. I will contribute using that ID.

We will share results with the community once we have initial success with
this new rolling policy!
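
For reference, the policy discussed in this thread (roll on every
checkpoint, OR once the part file reaches a size limit) could be sketched
roughly as below. This is only an illustration under stated assumptions, not
the actual patch: `PartFileInfo` and `SizeOrCheckpointRollingPolicy` are
hypothetical stand-ins defined locally, the method names merely mirror the
ones on Flink's RollingPolicy, and the 100 MB threshold is just the example
figure from the original message.

```java
// Hypothetical stand-in for the part-file information a rolling policy
// inspects. This is NOT Flink's own interface; it is defined here so the
// sketch compiles on its own.
interface PartFileInfo {
    long getSize(); // bytes written to the in-progress part file so far
}

// Sketch of a policy that rolls on every checkpoint, or as soon as the
// in-progress part file reaches a configured size limit.
final class SizeOrCheckpointRollingPolicy {
    private final long maxPartSizeBytes;

    SizeOrCheckpointRollingPolicy(long maxPartSizeBytes) {
        if (maxPartSizeBytes <= 0) {
            throw new IllegalArgumentException("maxPartSizeBytes must be positive");
        }
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    // Bulk formats must still be closed on every checkpoint, so always roll here.
    boolean shouldRollOnCheckpoint(PartFileInfo partFile) {
        return true;
    }

    // Additionally roll between checkpoints once the size limit is reached.
    boolean shouldRollOnEvent(PartFileInfo partFile) {
        return partFile.getSize() >= maxPartSizeBytes;
    }
}

class RollingPolicyDemo {
    public static void main(String[] args) {
        // 100 MB limit, taken from the example in the original message.
        SizeOrCheckpointRollingPolicy policy =
                new SizeOrCheckpointRollingPolicy(100L * 1024 * 1024);

        PartFileInfo small = () -> 10L * 1024 * 1024;  // 10 MB written
        PartFileInfo large = () -> 120L * 1024 * 1024; // 120 MB written

        System.out.println(policy.shouldRollOnEvent(small));      // false
        System.out.println(policy.shouldRollOnEvent(large));      // true
        System.out.println(policy.shouldRollOnCheckpoint(small)); // true
    }
}
```

Whether such a policy can be plugged into the bulk-format builder as-is
depends on the interface changes tracked in FLINK-13027.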

Thank you again.

-
Ying


On Fri, Jun 28, 2019 at 9:51 AM Ying Xu <y...@lyft.com> wrote:

> Hi Kostas:
>
> I'd like to.  The account used to file the JIRA does not have contributor
> access yet.  I have contributed to a few Flink JIRAs in the past, using a very
> similar but different account.  Now I would like to consolidate and use a
> common account for Apache project contributions.
>
> Would you mind granting contributor access to the following
> account?  This way I can assign the JIRA to myself.
>            *yxu-apache
> <https://issues.apache.org/jira/secure/ViewProfile.jspa?name=yxu-apache>*
>
> Many thanks!
> -
> Ying
>
>
> On Fri, Jun 28, 2019 at 2:23 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>
>> Hi Ying,
>>
>> That sounds great!
>> Looking forward to your PR!
>>
>> Btw don't you want to assign the issue to yourself if you are
>> planning to work on it?
>>
>> Kostas
>>
>> On Fri, Jun 28, 2019 at 9:54 AM Ying Xu <y...@lyft.com.invalid> wrote:
>>
>> > Thanks Kostas for confirming!
>> >
>> > I've filed an issue, FLINK-13027
>> > <https://issues.apache.org/jira/browse/FLINK-13027>.  We are actively
>> > working on the interface of such a file rolling policy, and will also
>> > perform benchmarks when it is integrated with a StreamingFileSink. We are
>> > more than happy to contribute if there's no other plan to address this
>> > issue.
>> >
>> > Thanks again.
>> >
>> > -
>> > Bests
>> > Ying
>> >
>> >
>> > On Tue, Jun 25, 2019 at 2:24 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>> >
>> > > Hi Ying,
>> > >
>> > > You are right! If it is either on checkpoint or on size, then this is
>> > > doable even with the current state of things.
>> > > Could you open a JIRA so that we can keep track of the progress?
>> > >
>> > > Cheers,
>> > > Kostas
>> > >
>> > > On Tue, Jun 25, 2019 at 9:49 AM Ying Xu <y...@lyft.com.invalid> wrote:
>> > >
>> > > > Hi Kostas:
>> > > >
>> > > > Thanks for the prompt reply.
>> > > >
>> > > > The file rolling policy mentioned previously is meant to roll files EITHER
>> > > > when a size limit is reached, OR when a checkpoint happens.  It looks like
>> > > > every time a file is rolled, the part file is closed
>> > > > <https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L217-L218>,
>> > > > during which the file is closed with a committable returned
>> > > > <https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L239-L240>.
>> > > > I assume it is during closeForCommit() that the Parquet file metadata is
>> > > > written.  At first glance, the code path of file rolling looks very
>> > > > similar to that inside prepareBucketForCheckpointing()
>> > > > <https://github.com/apache/flink/blob/3702029f45b7034b767e2b7eb01601c7f76ab35e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L275>.
>> > > > Not sure if I am missing anything there.
>> > > >
>> > > >
>> > > > -
>> > > > Ying
>> > > >
>> > > >
>> > > > On Mon, Jun 24, 2019 at 2:01 AM Kostas Kloudas <kklou...@gmail.com> wrote:
>> > > >
>> > > > > Hi Ying,
>> > > > >
>> > > > > Thanks for using the StreamingFileSink.
>> > > > >
>> > > > > The reason why the StreamingFileSink only supports
>> > > > > OnCheckpointRollingPolicy with bulk formats has to do with the fact
>> > > > > that Flink currently relies on the Hadoop writer for Parquet.
>> > > > >
>> > > > > Bulk formats keep important details about how they write the actual
>> > > > > data (such as compression schemes, offsets, etc.) in metadata, and they
>> > > > > write this metadata with the file (e.g. Parquet writes it as a footer).
>> > > > > The Hadoop writer gives no access to this metadata. Given this, there
>> > > > > is no way for Flink to checkpoint a part file safely without closing it.
>> > > > >
>> > > > > The solution would be to write our own writer and not go through the
>> > > > > Hadoop one, but there are no concrete plans for this, as far as I know.
>> > > > >
>> > > > > I hope this explains a bit more why the StreamingFileSink has this
>> > > > > limitation.
>> > > > >
>> > > > > Cheers,
>> > > > > Kostas
>> > > > >
>> > > > >
>> > > > > On Mon, Jun 24, 2019 at 9:19 AM Ying Xu <y...@lyft.com.invalid> wrote:
>> > > > >
>> > > > > > Dear Flink community:
>> > > > > >
>> > > > > > We have a use case where StreamingFileSink
>> > > > > > <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html>
>> > > > > > is used for persisting bulk-encoded data to AWS S3. In our case, the
>> > > > > > data sources consist of hybrid types of events, and each type is
>> > > > > > uploaded to an individual S3 prefix location. Because the event size is
>> > > > > > highly skewed, the uploaded file size may differ dramatically.  In order
>> > > > > > to have better control over the uploaded file size, we would like to
>> > > > > > adopt a rolling policy based on file sizes (e.g., roll the file every
>> > > > > > 100MB). Yet it appears the bulk-encoding StreamingFileSink only supports
>> > > > > > checkpoint-based file rolling.
>> > > > > >
>> > > > > > IMPORTANT: Bulk-encoding formats can only be combined with the
>> > > > > > `OnCheckpointRollingPolicy`, which rolls the in-progress part file on
>> > > > > > every checkpoint.
>> > > > > >
>> > > > > > Checkpoint-based file rolling appears to have other side effects. For
>> > > > > > instance, quite a lot of the heavy lifting (e.g., uploading file parts)
>> > > > > > is performed at checkpointing time. As a result, checkpointing takes
>> > > > > > longer when data volume is high.
>> > > > > >
>> > > > > > A customized file rolling policy can be achieved with small
>> > > > > > adjustments to the BulkFormatBuilder interface in StreamingFileSink.
>> > > > > > When S3RecoverableWriter is used, file rolling triggers data uploading,
>> > > > > > and the corresponding S3Committer is also constructed and stored. Hence,
>> > > > > > on the surface, adding a simple file-size-based rolling policy would NOT
>> > > > > > compromise the established exactly-once guarantee.
>> > > > > >
>> > > > > > Any advice on whether the above idea makes sense? Or perhaps there are
>> > > > > > pitfalls that one should pay attention to when introducing such a
>> > > > > > rolling policy.
>> > > > > > Thanks a lot!
>> > > > > >
>> > > > > >
>> > > > > > -
>> > > > > > Ying
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
