I currently only have quick access to test the DirectRunner and the FlinkRunner. This only manifests in the FlinkRunner.
From: Reuven Lax <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, October 23, 2019 at 4:14 PM
To: dev <[email protected]>
Subject: Re: FileIO and windowed writes

Is this only in the Flink runner?

On Wed, Oct 23, 2019 at 2:12 PM Koprivica,Preston Blake <[email protected]> wrote:

I’ve tried different windowing functions and all result in the same behavior. The one in the previous email used the global window and a processing-time-based repeated trigger. The filename policy used real system time to timestamp the outgoing files. Here are a couple of other window+trigger combos I’ve tried (with window-based filename strategies):

    Window.<Message>into(FixedWindows.of(windowDur))
        .withAllowedLateness(Duration.standardHours(24))
        .discardingFiredPanes()
        .triggering(
            Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur)))

i.e. a fixed window with a processing-time-based trigger + ~infinite lateness

    pipeline
        .apply(
            format("ReadSQS(%s)", options.getQueueUrl()),
            SqsIO.read().withQueueUrl(options.getQueueUrl()))
        .apply(WithTimestamps.of((Message m) -> Instant.now()))
        .apply(
            format("Window(%s)", options.getWindowDuration()),
            Window.into(FixedWindows.of(windowDur)))

i.e. map event time to processing time and use the default trigger (i.e. close of window, no lateness)

These all resulted in the same behavior: data gets hung up in a temp file somewhere and the finalize-file logic never seems to run.

Thanks,
-Preston

From: Reuven Lax <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, October 23, 2019 at 2:35 PM
To: dev <[email protected]>
Subject: Re: FileIO and windowed writes

What WindowFn are you using?

On Wed, Oct 23, 2019 at 11:36 AM Koprivica,Preston Blake <[email protected]> wrote:

Hi guys,

I’m currently working on a simple system where the intention is to ingest data from a realtime stream (in this case Amazon SQS) and write the output in an incremental fashion to a durable filesystem (i.e. S3). It’s easy to think of this as a low-fi journaling system. We need to make sure that data that’s written to the source queue eventually makes it to S3. We are utilizing the FileIO windowed writes with a custom naming policy to partition the files by their event time. Because SQS can’t guarantee order, we do have to allow late messages. Moreover, we need a further guarantee that a message be written in a timely manner; we’re thinking some constant multiple of the windowing duration.

As a first pass, we were thinking of a processing-time-based trigger that fires on some regular interval. For context, here’s an example of the pipeline:

ReadSQS -> Window + Processing Time Trigger -> Convert to Avro Message -> Write to Avro

    pipeline
        .apply(SqsIO.read().withQueueUrl(options.getQueueUrl()))
        .apply(
            Window.<Message>configure()
                .discardingFiredPanes()
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur))))
        .apply(ParDo.of(new SqsMsgToAvroDoFn<>(recordClass, options)))
        .setCoder(AvroCoder.of(recordClass))
        .apply(
            AvroIO.write(recordClass)
                .withWindowedWrites()
                .withTempDirectory(options.getTempDir())
                .withNumShards(options.getShards())
                .to(new WindowedFilenamePolicy(options.getOutputPrefix(), "avro")));

This all seemed fairly straightforward. I have not yet observed lost data with this pipeline, but I am seeing an issue with timeliness. Things seem to get hung up on finalizing file output, but I have yet to truly pinpoint the issue. To really highlight the issue, I can set up a test where I send a single message to the source queue.
If nothing else happens, the data never makes it to its final output using the FlinkRunner (beam-2.15.0, flink-1.8). Has anyone seen this behavior before? Is the expectation of eventual consistency wrong?

Thanks,
-Preston
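
For anyone following along, the "custom naming policy to partition the files by their event time" can be pictured roughly as below. This is only a sketch under stated assumptions, not the `WindowedFilenamePolicy` from the pipeline above: it uses plain `java.time` rather than Beam, assumes fixed windows aligned to the epoch with no offset, and the class name `WindowedPathSketch`, the `yyyy/MM/dd/HH` directory layout, and the shard suffix are all made up for illustration. It shows only how an event timestamp maps to a window and a path; it says nothing about why finalization stalls on the FlinkRunner.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

/** Hypothetical helper for illustration; not part of Beam or of the pipeline in this thread. */
final class WindowedPathSketch {

  // Assumed directory layout: one partition per UTC hour of the window start.
  private static final DateTimeFormatter HOUR_PARTITION =
      DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC);

  // Compact UTC timestamp used for the window bounds in the file name.
  private static final DateTimeFormatter BOUND =
      DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'").withZone(ZoneOffset.UTC);

  /** Start of the epoch-aligned fixed window of the given size that contains eventTime. */
  static Instant windowStart(Instant eventTime, Duration size) {
    long ts = eventTime.toEpochMilli();
    // floorMod keeps the result correct for timestamps before the epoch as well.
    return Instant.ofEpochMilli(ts - Math.floorMod(ts, size.toMillis()));
  }

  /** Builds an output path that encodes the window bounds and the shard number. */
  static String shardPath(
      String prefix, Instant eventTime, Duration size, int shard, int numShards) {
    Instant start = windowStart(eventTime, size);
    Instant end = start.plus(size);
    return String.format(
        Locale.ROOT,
        "%s/%s/%s-%s-%05d-of-%05d.avro",
        prefix,
        HOUR_PARTITION.format(start),
        BOUND.format(start),
        BOUND.format(end),
        shard,
        numShards);
  }
}
```

With a 5-minute window, a message stamped 21:14:37 UTC falls in the window starting at 21:10:00, so every shard for that window lands under the same hourly prefix regardless of when the pane actually fires.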
