Is this only in the Flink runner?

On Wed, Oct 23, 2019 at 2:12 PM Koprivica,Preston Blake <
[email protected]> wrote:

> I’ve tried different windowing functions, and all result in the same
> behavior.  The one in the previous email used the global window and a
> processing-time-based repeated trigger.  The filename policy used real
> system time to timestamp the outgoing files.
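>
> For reference, that configuration was roughly the following (a minimal
> sketch of the shape, not the exact code; names mirror the fixed-window
> snippet below):
>
> Window.<Message>into(new GlobalWindows())
>     .withAllowedLateness(Duration.ZERO)
>     .discardingFiredPanes()
>     .triggering(
>         Repeatedly.forever(
>             AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur)))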
>
>
>
> Here are a couple other window+trigger combos I’ve tried (with
> window-based filename strategies):
>
>
>
> Window.<Message>into(FixedWindows.of(windowDur))
>     .withAllowedLateness(Duration.standardHours(24))
>     .discardingFiredPanes()
>     .triggering(
>         Repeatedly.forever(
>             AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur)))
>
> i.e., a fixed window with a processing-time-based trigger + effectively
> infinite lateness
>
>
>
>     pipeline
>         .apply(
>             format("ReadSQS(%s)", options.getQueueUrl()),
>             SqsIO.read().withQueueUrl(options.getQueueUrl()))
>         .apply(WithTimestamps.of((Message m) -> Instant.now()))
>         .apply(
>             format("Window(%s)", options.getWindowDuration()),
>             Window.into(FixedWindows.of(windowDur)))
>
> i.e., map event time to processing time and use the default trigger
> (fires at close of window, no allowed lateness)
>
>
>
> These all resulted in the same behavior – data gets hung up in a temp file
> somewhere, and the file-finalization logic never seems to run.
>
>
>
> Thanks,
>
> -Preston
>
>
>
> From: Reuven Lax <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Wednesday, October 23, 2019 at 2:35 PM
> To: dev <[email protected]>
> Subject: Re: FileIO and windowed writes
>
>
>
> What WindowFn are you using?
>
>
>
> On Wed, Oct 23, 2019 at 11:36 AM Koprivica,Preston Blake <
> [email protected]> wrote:
>
> Hi guys,
>
>
>
> I’m currently working on a simple system that ingests data from a
> realtime stream – in this case Amazon SQS – and writes the output
> incrementally to a durable filesystem (i.e., S3).  It’s easy to think of
> this as a low-fi journaling system.  We need to make sure that any data
> written to the source queue eventually makes it to S3.  We are using
> FileIO’s windowed writes with a custom naming policy to partition the
> files by their event time.  Because SQS can’t guarantee order, we have to
> allow late messages.  Moreover, we need a further guarantee that messages
> are written in a timely manner – within some constant multiple of the
> windowing duration.  As a first pass, we were thinking a processing-time
> trigger that fires on a regular interval.  For context, here’s an example
> of the pipeline:
>
>
>
> ReadSQS -> Window + Processing Time Trigger -> Convert to Avro Message ->
> Write to Avro
>
>
>
>   pipeline
>       .apply(SqsIO.read().withQueueUrl(options.getQueueUrl()))
>       .apply(
>           Window.<Message>configure()
>               .discardingFiredPanes()
>               .triggering(
>                   Repeatedly.forever(
>                       AfterProcessingTime.pastFirstElementInPane()
>                           .plusDelayOf(windowDur))))
>       .apply(ParDo.of(new SqsMsgToAvroDoFn<>(recordClass, options)))
>       .setCoder(AvroCoder.of(recordClass))
>       .apply(
>           AvroIO.write(recordClass)
>               .withWindowedWrites()
>               .withTempDirectory(options.getTempDir())
>               .withNumShards(options.getShards())
>               .to(new WindowedFilenamePolicy(options.getOutputPrefix(), "avro")));
>
>
>
> This all seemed fairly straightforward.  I have not yet observed lost data
> with this pipeline, but I am seeing an issue with timeliness.  Things seem
> to get hung up on finalizing the file output, but I have yet to truly
> pinpoint the issue.  To really highlight it, I can set up a test where I
> send a single message to the source queue.  If nothing else happens, the
> data never makes it to its final output using the FlinkRunner (beam-2.15.0,
> flink-1.8).  Has anyone seen this behavior before?  Is the expectation of
> eventual consistency wrong?
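>
> (For completeness: the single-message test is just a raw SDK send along
> these lines – AWS SDK v1, which the Beam 2.15 SqsIO uses; queueUrl below
> is the pipeline's source queue.)
>
> // com.amazonaws.services.sqs.* (AWS SDK v1)
> AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
> sqs.sendMessage(new SendMessageRequest(queueUrl, "test-message"));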
>
>
>
> Thanks,
>
> -Preston
>
