I’ve tried different windowing functions, and all of them result in the same
behavior. The one in the previous email used the global window with a repeated
processing-time trigger. Its filename policy used the real system time to
timestamp the outgoing files.
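The sketch below is a hypothetical, standard-library-only illustration of a filename stamped with the system clock, which contrasts with the window-based strategies that follow. The class name, prefix, and path layout are illustrative assumptions. They are not the policy from the previous email, and they are not a Beam API.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper (not a Beam API): names an output file after the current
// wall-clock time, regardless of which window the data belongs to.
final class SystemTimeNaming {
    // Partition directories by UTC hour, e.g. "2019/10/23/14".
    private static final DateTimeFormatter HOUR_PATH =
        DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC);

    private SystemTimeNaming() {}

    // The Clock is a parameter so tests can pin the time.
    static String filename(Clock clock, String prefix, int shard, int numShards, String suffix) {
        Instant now = clock.instant();
        return String.format("%s/%s/%d-%05d-of-%05d.%s",
            prefix, HOUR_PATH.format(now), now.toEpochMilli(), shard, numShards, suffix);
    }

    public static void main(String[] args) {
        System.out.println(filename(Clock.systemUTC(), "s3://bucket/out", 0, 4, "avro"));
    }
}
```

With this design, the name records when a file was written, not which event-time window its records belong to.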
Here are a couple of other window + trigger combinations I’ve tried (with
window-based filename strategies):
Window.<Message>into(FixedWindows.of(windowDur))
    .withAllowedLateness(Duration.standardHours(24))
    .discardingFiredPanes()
    .triggering(
        Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur)))
i.e., a fixed window with a processing-time trigger and a very generous
(24-hour) allowed lateness
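The arithmetic behind the first combination, which covers how an element lands in a fixed window and how long that window accepts late data, can be sketched with the standard library. This is a simplified model that assumes a zero window offset and a single watermark. It is not Beam's `FixedWindows` implementation, and `FixedWindowModel` and its methods are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model (not Beam code) of FixedWindows with zero offset, plus the
// point after which a window's late data is dropped.
final class FixedWindowModel {
    static final class Window {
        final Instant start;
        final Instant end; // exclusive

        Window(Instant start, Instant end) {
            this.start = start;
            this.end = end;
        }

        // Beam's IntervalWindow reports end minus one millisecond as its max timestamp.
        Instant maxTimestamp() {
            return end.minusMillis(1);
        }
    }

    // Align the timestamp down to a multiple of the window size since the epoch.
    static Window assign(Instant timestamp, Duration size) {
        long sizeMs = size.toMillis();
        long ts = timestamp.toEpochMilli();
        long start = ts - Math.floorMod(ts, sizeMs);
        return new Window(Instant.ofEpochMilli(start), Instant.ofEpochMilli(start + sizeMs));
    }

    // Once the watermark passes this instant, late data for the window is dropped.
    static Instant expiry(Window w, Duration allowedLateness) {
        return w.maxTimestamp().plus(allowedLateness);
    }

    public static void main(String[] args) {
        Window w = assign(Instant.parse("2019-10-23T14:37:12Z"), Duration.ofMinutes(10));
        System.out.println(w.start + " .. " + w.end + ", expires " + expiry(w, Duration.ofHours(24)));
    }
}
```

Under this model, a 10-minute window ending at 14:40 keeps accepting late data until the watermark passes 14:39:59.999 on the following day. With 24 hours of lateness, the window's state therefore lives for roughly a day after the window ends.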
pipeline
    .apply(
        format("ReadSQS(%s)", options.getQueueUrl()),
        SqsIO.read().withQueueUrl(options.getQueueUrl()))
    .apply(WithTimestamps.of((Message m) -> Instant.now()))
    .apply(
        format("Window(%s)", options.getWindowDuration()),
        Window.into(FixedWindows.of(windowDur)))
i.e., set each message’s event timestamp to its processing time and use the
default trigger (fire once when the window closes, with no allowed lateness)
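In the second combination, the default trigger is driven by the watermark, not by wall-clock time, even when element timestamps come from `Instant.now()`. The sketch below is a simplified model of that firing condition. `DefaultTriggerModel` is a hypothetical name, and the model ignores early and late panes.

```java
import java.time.Instant;

// Simplified model (not Beam code) of the default trigger with no allowed
// lateness: the window emits one on-time pane, and only once the watermark
// passes the window's max timestamp (end minus one millisecond).
final class DefaultTriggerModel {
    static boolean onTimePaneFires(Instant windowEnd, Instant watermark) {
        Instant maxTimestamp = windowEnd.minusMillis(1);
        return watermark.isAfter(maxTimestamp);
    }

    public static void main(String[] args) {
        Instant end = Instant.parse("2019-10-23T14:40:00Z");
        // Elapsed processing time plays no part: a watermark that stays behind
        // the window end keeps the pane from firing.
        System.out.println(onTimePaneFires(end, Instant.parse("2019-10-23T14:39:00Z")));
        System.out.println(onTimePaneFires(end, end));
    }
}
```

Under this model, output for a window depends entirely on the source's watermark advancing past the window end. The sketch does not establish how SqsIO advances its watermark when the queue goes quiet.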
All of these resulted in the same behavior: data gets stuck in a temp file
somewhere, and the file finalization logic never seems to run.
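Broadly, Beam's file-based sinks write each bundle to a temporary file and rely on a later finalize step to copy or rename the results to their final names. If that step never runs, the data stays in the temp directory. The standard-library sketch below models only that two-phase pattern, under simplifying assumptions (local filesystem, single writer). `TempThenFinalize` is a hypothetical name, and this is not Beam's actual `WriteFiles` code.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Simplified model (not Beam's WriteFiles/FileBasedSink): data is written to a
// temporary file first and only appears under its final name when a separate
// finalize step moves it there. If finalize never runs, the data stays in temp.
final class TempThenFinalize {
    static Path writeTemp(Path tempDir, String name, String contents) throws IOException {
        Files.createDirectories(tempDir);
        Path tmp = tempDir.resolve(name + ".tmp");
        Files.write(tmp, contents.getBytes(StandardCharsets.UTF_8));
        return tmp;
    }

    static Path finalizeFile(Path tmp, Path finalDir, String finalName) throws IOException {
        Files.createDirectories(finalDir);
        // REPLACE_EXISTING overwrites a file already at the final name
        // (e.g. one left by an earlier attempt).
        return Files.move(tmp, finalDir.resolve(finalName), StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("journal");
        Path tmp = writeTemp(root.resolve("tmp"), "shard-0", "one message");
        System.out.println("temp written: " + Files.exists(tmp));
        Path dest = finalizeFile(tmp, root.resolve("out"), "output-shard-0");
        System.out.println("final present: " + Files.exists(dest) + ", temp left: " + Files.exists(tmp));
    }
}
```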
Thanks,
-Preston
From: Reuven Lax <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, October 23, 2019 at 2:35 PM
To: dev <[email protected]>
Subject: Re: FileIO and windowed writes
What WindowFn are you using?
On Wed, Oct 23, 2019 at 11:36 AM Koprivica,Preston Blake
<[email protected]> wrote:
Hi guys,
I’m currently working on a simple system intended to ingest data from a
real-time stream (in this case Amazon SQS) and write the output incrementally
to a durable filesystem (i.e., S3). It’s easy to think of this as a low-fi
journaling system. We need to make sure that data written to the source queue
eventually makes it to S3. We are using FileIO windowed writes with a custom
naming policy to partition the files by their event time. Because SQS can’t
guarantee ordering, we have to allow late messages. Moreover, we need a further
guarantee that each message is written in a timely manner: we’re thinking
within some constant multiple of the windowing duration. As a first pass, we
were considering a processing-time trigger that fires at a regular interval.
For context, here’s an example of the pipeline:
ReadSQS -> Window + Processing Time Trigger -> Convert to Avro Message -> Write to Avro
pipeline
    .apply(SqsIO.read().withQueueUrl(options.getQueueUrl()))
    .apply(
        Window.<Message>configure()
            .discardingFiredPanes()
            .triggering(
                Repeatedly.forever(
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur))))
    .apply(ParDo.of(new SqsMsgToAvroDoFn<>(recordClass, options)))
    .setCoder(AvroCoder.of(recordClass))
    .apply(
        AvroIO.write(recordClass)
            .withWindowedWrites()
            .withTempDirectory(options.getTempDir())
            .withNumShards(options.getShards())
            .to(new WindowedFilenamePolicy(options.getOutputPrefix(), "avro")));
This all seemed fairly straightforward. I have not yet observed lost data with
this pipeline, but I am seeing an issue with timeliness: things seem to get
stuck while finalizing file output, though I have yet to pinpoint the cause.
To highlight the issue, I can set up a test where I send a single message
to the source queue. If nothing else happens, the data never makes it to its
final output on the FlinkRunner (Beam 2.15.0, Flink 1.8). Has anyone seen
this behavior before? Is the expectation of eventual consistency wrong?
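The bound that the repeated processing-time trigger is meant to provide can be modeled as follows. The sketch is a simplified, single-window, single-key model of `Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowDur))`. `ProcessingTimeTriggerModel`, the sorted-arrival assumption, and the tie-breaking rule are illustrative assumptions, not Beam's implementation.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Beam code): the first element of a pane arms a timer
// for arrival + delay; when it fires, the pane is emitted and the next element
// starts a new pane. Arrival times are processing times, assumed sorted.
final class ProcessingTimeTriggerModel {
    static List<Instant> emissionTimes(List<Instant> arrivals, Duration delay) {
        List<Instant> emitted = new ArrayList<>();
        Instant fireAt = null;
        for (Instant arrival : arrivals) {
            // Assumption: an element arriving before the timer fires joins the
            // open pane; otherwise it opens a new pane and arms a new timer.
            if (fireAt == null || !arrival.isBefore(fireAt)) {
                fireAt = arrival.plus(delay);
            }
            emitted.add(fireAt);
        }
        return emitted;
    }

    // Longest time any element waits between arrival and emission.
    static Duration maxWait(List<Instant> arrivals, Duration delay) {
        List<Instant> emitted = emissionTimes(arrivals, delay);
        Duration max = Duration.ZERO;
        for (int i = 0; i < arrivals.size(); i++) {
            Duration wait = Duration.between(arrivals.get(i), emitted.get(i));
            if (wait.compareTo(max) > 0) {
                max = wait;
            }
        }
        return max;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2019-10-23T14:00:00Z");
        List<Instant> arrivals = List.of(t0, t0.plusSeconds(60), t0.plusSeconds(360));
        System.out.println(emissionTimes(arrivals, Duration.ofMinutes(5)));
        System.out.println(maxWait(arrivals, Duration.ofMinutes(5)));
    }
}
```

Under this model, no element waits longer than the configured delay between arrival and emission, which is the kind of bound described above. In a real pipeline, the end-to-end bound also depends on the runner firing processing-time timers and on the sink actually finalizing its files.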
Thanks,
-Preston
CONFIDENTIALITY NOTICE This message and any included attachments are from
Cerner Corporation and are intended only for the addressee. The information
contained in this message is confidential and may constitute inside or
non-public information under international, federal, or state securities laws.
Unauthorized forwarding, printing, copying, distribution, or use of such
information is strictly prohibited and may be unlawful. If you are not the
addressee, please promptly delete this message and notify the sender of the
delivery error by e-mail or you may call Cerner's corporate offices in Kansas
City, Missouri, U.S.A at (+1) (816) 221-1024.