I’ve tried different windowing functions and all result in the same behavior.  
The one in the previous email used the global window and a processing time 
based repeated trigger.  The filename policy used real system time to timestamp 
the outgoing files.

Here are a couple other window+trigger combos I’ve tried (with window based 
filename strategies):

Window.<Message>into(FixedWindows.of(windowDur))
                .withAllowedLateness(Duration.standardHours(24))
                .discardingFiredPanes()
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur))))
i.e. a fixed window with a processing-time-based trigger and ~infinite (24-hour) lateness
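
For reference, the firing behavior expected from the repeated processing-time trigger can be sketched in plain Java. This is only a model of the trigger semantics under stated assumptions, not Beam or runner code: the class and method names are hypothetical, arrivals are assumed to be sorted, and an element arriving exactly at the firing instant is treated as starting the next pane. Each pane fires once processing time reaches the arrival of the pane's first element plus the delay, so a single message should produce exactly one firing.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of Repeatedly.forever(
//     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
// for the elements of a single window. Not part of Beam.
final class ProcessingTimeTriggerSketch {

  /**
   * Returns the processing-time instants at which panes would be expected to fire.
   * Assumes arrivals are sorted in ascending order.
   */
  static List<Instant> firingTimes(List<Instant> arrivals, Duration delay) {
    List<Instant> firings = new ArrayList<>();
    Instant fireAt = null;
    for (Instant arrival : arrivals) {
      // An element that arrives at or after the pending firing time is the
      // first element of a new pane and schedules the next firing.
      if (fireAt == null || !arrival.isBefore(fireAt)) {
        fireAt = arrival.plus(delay);
        firings.add(fireAt);
      }
    }
    return firings;
  }
}
```

In this model the firing depends only on processing time, so the trigger itself should not need further input to fire for a lone message.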


    pipeline
        .apply(
            format("ReadSQS(%s)", options.getQueueUrl()),
            SqsIO.read().withQueueUrl(options.getQueueUrl()))
        .apply(WithTimestamps.of((Message m) -> Instant.now()))
        .apply(
            format("Window(%s)", options.getWindowDuration()),
            Window.into(FixedWindows.of(windowDur)))
i.e. map event time to processing time and use the default trigger (fires at 
the close of the window, no lateness)
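
To make the second combo concrete, here is a rough plain-Java sketch of how a fixed window is derived from a timestamp and when the default trigger would be expected to fire. It is a simplified model, not Beam code: the class and method names are hypothetical, windows are assumed to be aligned to the epoch with no offset, and the exact boundary handling at the window end is glossed over. The point it illustrates is that, with the default trigger, output depends on the watermark reaching the end of the window rather than on wall-clock time.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of FixedWindows assignment plus the default
// (end-of-window) trigger. Not part of Beam.
final class FixedWindowSketch {

  /** Start of the epoch-aligned fixed window containing ts. */
  static Instant windowStart(Instant ts, Duration size) {
    long sizeMs = size.toMillis();
    long tsMs = ts.toEpochMilli();
    // floorMod keeps the result correct for timestamps before the epoch.
    return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
  }

  /** Exclusive end of the window containing ts. */
  static Instant windowEnd(Instant ts, Duration size) {
    return windowStart(ts, size).plus(size);
  }

  /** True once the watermark has reached the end of the window containing ts. */
  static boolean defaultTriggerWouldFire(Instant ts, Duration size, Instant watermark) {
    return !watermark.isBefore(windowEnd(ts, size));
  }
}
```

For example, with a five-minute window an element stamped 14:37:12 falls in the window from 14:35:00 to 14:40:00, and in this model nothing is emitted until the watermark reaches 14:40:00.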

These all resulted in the same behavior – data gets hung up in a temp file 
somewhere and the finalize file logic never seems to run.

Thanks,
-Preston

From: Reuven Lax <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, October 23, 2019 at 2:35 PM
To: dev <[email protected]>
Subject: Re: FileIO and windowed writes

What WindowFn are you using?

On Wed, Oct 23, 2019 at 11:36 AM Koprivica,Preston Blake 
<[email protected]> wrote:
Hi guys,

I’m currently working on a simple system where the intention is to ingest data 
from a realtime stream – in this case Amazon SQS – and write the output in an 
incremental fashion to a durable filesystem (i.e. S3).  It’s easy to think of 
this as a low-fi journaling system.  We need to make sure that data that’s 
written to the source queue eventually makes it to S3.  We are utilizing the 
FileIO windowed writes with a custom naming policy to partition the files by 
their event time.   Because SQS can’t guarantee order, we do have to allow late 
messages.  Moreover, we need a further guarantee that a message be written in a 
timely manner – we’re thinking some constant multiple of the windowing 
duration.  As a first pass, we were thinking a processing time based trigger 
that fires on some regular interval.  For context, here’s an example of the 
pipeline:

ReadSQS -> Window + Processing Time Trigger -> Convert to Avro Message -> Write 
to Avro

  pipeline
        .apply(SqsIO.read().withQueueUrl(options.getQueueUrl()))
        .apply(
            Window.<Message>configure()
                .discardingFiredPanes()
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur))))
        .apply(ParDo.of(new SqsMsgToAvroDoFn<>(recordClass, options)))
        .setCoder(AvroCoder.of(recordClass))
        .apply(
            AvroIO.write(recordClass)
                .withWindowedWrites()
                .withTempDirectory(options.getTempDir())
                .withNumShards(options.getShards())
                .to(new WindowedFilenamePolicy(options.getOutputPrefix(), "avro")));
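
The body of WindowedFilenamePolicy is not shown in this thread, so the following is only a plain-Java sketch of what a window-based naming scheme of this kind might compute. The class name, the directory layout, the timestamp format and the example bucket in the usage note are all assumptions made for illustration. The pane index is included in the name because a repeated trigger with discarding panes can emit several panes for the same window, and each pane needs a distinct file.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical naming helper; not the WindowedFilenamePolicy used above.
final class WindowedNameSketch {
  private static final DateTimeFormatter DIR =
      DateTimeFormatter.ofPattern("yyyy/MM/dd/HH", Locale.ROOT).withZone(ZoneOffset.UTC);
  private static final DateTimeFormatter STAMP =
      DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'", Locale.ROOT).withZone(ZoneOffset.UTC);

  /** Builds a name partitioned by the window start, unique per window, pane and shard. */
  static String fileName(
      String prefix, String suffix,
      Instant windowStart, Instant windowEnd,
      long paneIndex, int shard, int numShards) {
    return String.format(
        Locale.ROOT,
        "%s/%s/%s-%s-pane-%d-%05d-of-%05d.%s",
        prefix,
        DIR.format(windowStart),   // hourly partition directory
        STAMP.format(windowStart), // window bounds
        STAMP.format(windowEnd),
        paneIndex,                 // distinguishes repeated firings
        shard,
        numShards,
        suffix);
  }
}
```

With a made-up prefix of s3://bucket/journal, a window from 14:35:00 to 14:40:00 UTC on 2019-10-23, pane 0 and shard 2 of 4, this yields s3://bucket/journal/2019/10/23/14/20191023T143500Z-20191023T144000Z-pane-0-00002-of-00004.avro.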

This all seemed fairly straightforward.  I have not yet observed lost data with 
this pipeline, but I am seeing an issue with timeliness.  Things seem to get 
hung up on finalizing file output, but I have yet to truly pinpoint the issue.  
To really highlight the issue, I can set up a test where I send a single message 
to the source queue.  If nothing else happens, the data never makes it to its 
final output using the FlinkRunner (beam-2.15.0, flink-1.8).  Has anyone seen 
this behavior before?  Is the expectation of eventual consistency wrong?

Thanks,
-Preston



