Is this only in the Flink runner?
On Wed, Oct 23, 2019 at 2:12 PM Koprivica,Preston Blake <
[email protected]> wrote:
> I’ve tried different windowing functions, and all of them result in the
> same behavior. The one in the previous email used the global window and a
> repeated processing-time-based trigger. The filename policy used the real
> system time to timestamp the outgoing files.
>
> Here are a couple of other window + trigger combinations I’ve tried (with
> window-based filename strategies):
>
>     Window.<Message>into(FixedWindows.of(windowDur))
>         .withAllowedLateness(Duration.standardHours(24))
>         .discardingFiredPanes()
>         .triggering(
>             Repeatedly.forever(
>                 AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur)))
>
> i.e., a fixed window with a processing-time-based trigger and effectively
> infinite (24-hour) allowed lateness.
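To make the timing of the repeated processing-time trigger concrete, here is a simplified model (not Beam's implementation) of Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay)) with discarding panes, for one key and one window. It assumes arrival times are sorted processing-time milliseconds, that a timer due at the same instant as an arrival fires first, and that processing time keeps advancing after the last element. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (not Beam's implementation) of
 * Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
 * with discardingFiredPanes(), for one key and one window.
 * All times are processing-time milliseconds.
 */
public class ProcessingTimeTriggerModel {

  /** One fired pane: the processing time it fired at and how many elements it held. */
  static final class Pane {
    final long firedAt;
    final int size;

    Pane(long firedAt, int size) {
      this.firedAt = firedAt;
      this.size = size;
    }

    @Override
    public String toString() {
      return "Pane(firedAt=" + firedAt + ", size=" + size + ")";
    }
  }

  /** Arrival times must be sorted ascending and non-negative. */
  static List<Pane> simulate(long[] arrivals, long delayMillis) {
    List<Pane> panes = new ArrayList<>();
    long timer = -1;  // -1 means no processing-time timer is armed
    int buffered = 0; // elements in the current, not-yet-fired pane
    for (long t : arrivals) {
      // A timer that is due by now fires before this element is added.
      if (timer >= 0 && t >= timer) {
        panes.add(new Pane(timer, buffered));
        buffered = 0; // discarding mode: the next pane starts empty
        timer = -1;
      }
      // The first element of a new pane arms the timer at arrival + delay.
      if (timer < 0) {
        timer = t + delayMillis;
      }
      buffered++;
    }
    // Assumes processing time keeps advancing, so the last armed timer fires.
    if (timer >= 0) {
      panes.add(new Pane(timer, buffered));
    }
    return panes;
  }

  public static void main(String[] args) {
    // One message (like the single-message test): one pane, one delay later.
    System.out.println(simulate(new long[] {1_000}, 60_000));
    // prints [Pane(firedAt=61000, size=1)]
    // The first two messages share a pane; the third arms a new timer.
    System.out.println(simulate(new long[] {0, 30_000, 90_000}, 60_000));
    // prints [Pane(firedAt=60000, size=2), Pane(firedAt=150000, size=1)]
  }
}
```

Under this model a lone message should yield a pane one delay after it arrives, which is what the single-message test expects. The model says nothing about the later file-finalization step, where the data appears to get stuck.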
>
>     pipeline
>         .apply(
>             format("ReadSQS(%s)", options.getQueueUrl()),
>             SqsIO.read().withQueueUrl(options.getQueueUrl()))
>         .apply(WithTimestamps.of((Message m) -> Instant.now()))
>         .apply(
>             format("Window(%s)", options.getWindowDuration()),
>             Window.<Message>into(FixedWindows.of(windowDur)));
>
> i.e., map event time to processing time and use the default trigger (a
> single firing at the close of the window, no allowed lateness).
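A simplified model of this second setup, written with java.time rather than the Joda-Time types Beam uses and assuming a window offset of zero: FixedWindows places each timestamp in a window [start, start + size), and with the default trigger and no allowed lateness the window's on-time pane can only fire once the watermark passes the window's end. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Simplified model (not Beam's code) of FixedWindows assignment with a zero
 * offset, plus the default trigger with no allowed lateness.
 */
public class FixedWindowModel {

  /** Start of the fixed window [start, start + size) that contains ts. */
  static Instant windowStart(Instant ts, Duration size) {
    long millis = ts.toEpochMilli();
    // floorMod keeps the result correct for timestamps before the epoch too.
    return Instant.ofEpochMilli(millis - Math.floorMod(millis, size.toMillis()));
  }

  /** Exclusive end of the window that contains ts. */
  static Instant windowEnd(Instant ts, Duration size) {
    return windowStart(ts, size).plus(size);
  }

  /** Modelled as: the watermark has reached the window's exclusive end. */
  static boolean onTimePaneCanFire(Instant ts, Duration size, Instant watermark) {
    return !watermark.isBefore(windowEnd(ts, size));
  }

  public static void main(String[] args) {
    Duration size = Duration.ofMinutes(5);
    Instant ts = Instant.parse("2019-10-23T14:12:30Z");
    System.out.println(windowStart(ts, size)); // 2019-10-23T14:10:00Z
    System.out.println(windowEnd(ts, size));   // 2019-10-23T14:15:00Z
    // A watermark stalled inside the window: nothing is emitted yet.
    System.out.println(onTimePaneCanFire(ts, size, Instant.parse("2019-10-23T14:14:59Z"))); // false
    System.out.println(onTimePaneCanFire(ts, size, Instant.parse("2019-10-23T14:15:00Z"))); // true
  }
}
```

One consequence this model makes visible: if the source watermark stops advancing partway through a window, this setup emits nothing for that window. Whether that is what happens with SqsIO on the Flink runner is not established in this thread.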
>
> All of these resulted in the same behavior: data gets stuck in a temp file
> somewhere, and the file-finalization logic never seems to run.
>
> Thanks,
>
> -Preston
>
> *From: *Reuven Lax <[email protected]>
> *Reply-To: *"[email protected]" <[email protected]>
> *Date: *Wednesday, October 23, 2019 at 2:35 PM
> *To: *dev <[email protected]>
> *Subject: *Re: FileIO and windowed writes
>
> What WindowFn are you using?
>
> On Wed, Oct 23, 2019 at 11:36 AM Koprivica,Preston Blake <
> [email protected]> wrote:
>
> Hi guys,
>
> I’m currently working on a simple system whose purpose is to ingest data
> from a real-time stream (in this case Amazon SQS) and write the output
> incrementally to a durable filesystem (i.e., S3). It’s easy to think of
> this as a low-fi journaling system. We need to make sure that data written
> to the source queue eventually makes it to S3. We are using FileIO
> windowed writes with a custom naming policy to partition the files by
> their event time. Because SQS can’t guarantee order, we have to allow late
> messages. Moreover, we need a further guarantee that a message is written
> in a timely manner, which we’re thinking of as some constant multiple of
> the windowing duration. As a first pass, we were thinking of a
> processing-time-based trigger that fires at a regular interval. For
> context, here’s an example of the pipeline:
>
> ReadSQS -> Window + Processing Time Trigger -> Convert to Avro Message ->
> Write to Avro
>
>     pipeline
>         .apply(SqsIO.read().withQueueUrl(options.getQueueUrl()))
>         .apply(
>             Window.<Message>configure()
>                 .discardingFiredPanes()
>                 .triggering(
>                     Repeatedly.forever(
>                         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur))))
>         .apply(ParDo.of(new SqsMsgToAvroDoFn<>(recordClass, options)))
>         .setCoder(AvroCoder.of(recordClass))
>         .apply(
>             AvroIO.write(recordClass)
>                 .withWindowedWrites()
>                 .withTempDirectory(options.getTempDir())
>                 .withNumShards(options.getShards())
>                 .to(new WindowedFilenamePolicy(options.getOutputPrefix(), "avro")));
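The thread does not show WindowedFilenamePolicy. Purely as a hypothetical illustration of naming files by their event-time window, the naming logic might look like the plain-Java sketch below. It does not implement Beam's FilenamePolicy interface, and the prefix, directory layout, and pane-index field are all assumptions.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/**
 * Hypothetical naming scheme for windowed output files. This is not the
 * WindowedFilenamePolicy from the thread (its code is not shown) and it does
 * not implement Beam's FilenamePolicy interface; it only shows naming logic.
 */
public class WindowedFileNames {

  // Directory partition derived from the window start, in UTC.
  private static final DateTimeFormatter PARTITION =
      DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC);
  // Compact time-of-day stamp used for the window bounds in the file name.
  private static final DateTimeFormatter STAMP =
      DateTimeFormatter.ofPattern("HHmmss").withZone(ZoneOffset.UTC);

  static String fileName(String prefix, Instant windowStart, Instant windowEnd,
      long paneIndex, int shard, int numShards, String suffix) {
    return String.format("%s/%s/%s-%s-pane-%d-%05d-of-%05d.%s",
        prefix, PARTITION.format(windowStart),
        STAMP.format(windowStart), STAMP.format(windowEnd),
        paneIndex, shard, numShards, suffix);
  }

  public static void main(String[] args) {
    System.out.println(fileName("s3://bucket/out",
        Instant.parse("2019-10-23T14:10:00Z"), Instant.parse("2019-10-23T14:15:00Z"),
        0, 1, 4, "avro"));
    // prints s3://bucket/out/2019/10/23/14/141000-141500-pane-0-00001-of-00004.avro
  }
}
```

Including the pane index is a deliberate choice in this sketch: with a repeated trigger, one window can fire several panes, and a name built only from the window bounds and shard number would collide across panes. Whether the real policy accounts for this is not visible here.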
>
> This all seemed fairly straightforward. I have not yet observed lost data
> with this pipeline, but I am seeing an issue with timeliness. Things seem
> to get stuck while finalizing the file output, but I have yet to pinpoint
> the cause. To highlight the issue, I can set up a test where I send a
> single message to the source queue. If nothing else happens, the data
> never makes it to its final output when using the FlinkRunner
> (beam-2.15.0, flink-1.8). Has anyone seen this behavior before? Is my
> expectation of eventual consistency wrong?
>
> Thanks,
>
> -Preston
>