[
https://issues.apache.org/jira/browse/BEAM-6465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Pawel Bartoszek updated BEAM-6465:
----------------------------------
Attachment: Screen Shot 2019-01-18 at 12.07.03 copy.png
> Flink: State accumulation during restoring from a savepoint
> -----------------------------------------------------------
>
> Key: BEAM-6465
> URL: https://issues.apache.org/jira/browse/BEAM-6465
> Project: Beam
> Issue Type: Test
> Components: beam-model
> Affects Versions: 2.7.0
> Reporter: Pawel Bartoszek
> Assignee: Kenneth Knowles
> Priority: Major
> Attachments: Screen Shot 2019-01-18 at 12.07.03 copy.png, Screen Shot
> 2019-01-18 at 12.07.03.png
>
>
> This ticket captures my findings from restoring a Beam job from a savepoint
> on the Flink runner.
>
> *The problem*
> When the job is restored from a savepoint taken a few hours earlier, we see
> that the checkpoint size starts growing ridiculously high, which eventually
> leads to the job failing with an out-of-heap-space error (we use the
> filesystem state backend).
>
> *Job structure*
> The job has two paths: the data lake path and the aggregate path.
> *Data lake path*
> The data lake path is a simple pass-through sink of all records received by
> the job. The records are flushed to S3.
> Data lake trigger:
> {code:java}
> input.apply(
>         WithTimestamps.of(extractTimestamp).withAllowedTimestampSkew(standardDays(7)))
>      .apply(Window.<RDotRecord>into(FixedWindows.of(standardMinutes(1)))
>          .triggering(
>              AfterWatermark.pastEndOfWindow()
>                  .withLateFirings(
>                      AfterProcessingTime.pastFirstElementInPane()
>                          .plusDelayOf(lateFiringsPeriod)))
>          .withAllowedLateness(standardMinutes(30), FIRE_ALWAYS)
>          .discardingFiredPanes());{code}
>
> *Aggregate path*
> The aggregate path applies GroupByKey, Count, and similar transformations.
>
> Aggregate trigger:
> {code:java}
> input.apply(
>         WithTimestamps.of(extractTimestamp).withAllowedTimestampSkew(standardDays(7)))
>      .apply(Window.<RDotRecord>into(FixedWindows.of(WINDOW_SIZE))
>          .triggering(
>              AfterWatermark.pastEndOfWindow()
>                  .withLateFirings(
>                      AfterProcessingTime.pastFirstElementInPane()
>                          .plusDelayOf(lateFiringsPeriod)))
>          .withAllowedLateness(standardMinutes(30), FIRE_ALWAYS)
>          .accumulatingFiredPanes());{code}
>
> *My investigation*
> Our team has written a tool to collect input watermarks from the Flink API.
> It turned out that some sub-operators (each running on a particular Flink
> slot) run slower, so their watermarks fall behind those of the other slots.
> Look at the graph below: !Screen Shot 2019-01-18 at 12.07.03.png!
> The Y axis represents event time, the X axis wall clock time. The graph shows
> the input watermarks of one operator in the job. Each line represents the
> input watermark of a specific slot the operator runs on (the job runs with
> parallelism 8).
> At 17:55 the difference between the slowest and the quickest slot is already
> 10 minutes. This means that 10 minutes of data will be buffered in memory
> until the slowest slot's watermark crosses the end of the 1-minute window
> holding the buffered data.
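>
> For reference, a minimal sketch of such a watermark collector, assuming the
> Flink REST API exposes per-subtask {{currentInputWatermark}} metrics on the
> vertex metrics endpoint (the JobManager address, job id, and vertex id below
> are placeholders):
> {code:java}
> import java.io.BufferedReader;
> import java.io.InputStreamReader;
> import java.net.HttpURLConnection;
> import java.net.URL;
>
> // Sketch only: polls per-subtask input watermarks through the Flink REST API.
> public class WatermarkPoller {
>   private static final String BASE = "http://localhost:8081"; // JobManager REST address (placeholder)
>   private static final String JOB_ID = "<job-id>";            // placeholder
>   private static final String VERTEX_ID = "<vertex-id>";      // placeholder
>   private static final int PARALLELISM = 8;
>
>   public static void main(String[] args) throws Exception {
>     // Metric names are prefixed with the subtask index, e.g. "0.currentInputWatermark".
>     StringBuilder get = new StringBuilder();
>     for (int i = 0; i < PARALLELISM; i++) {
>       if (i > 0) get.append(',');
>       get.append(i).append(".currentInputWatermark");
>     }
>     URL url = new URL(BASE + "/jobs/" + JOB_ID + "/vertices/" + VERTEX_ID
>         + "/metrics?get=" + get);
>     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
>     try (BufferedReader in =
>         new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
>       // The response is a JSON array of {"id": ..., "value": ...} pairs; the
>       // values are event-time watermarks in millis, to be plotted against
>       // wall clock time per subtask.
>       String line;
>       while ((line = in.readLine()) != null) {
>         System.out.println(line);
>       }
>     }
>   }
> }{code}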
>
> *My findings*
> If state is accumulated because the watermark lags behind on some operator
> slots (Flink specific), then introducing *early firings* should help ... and
> it indeed helped. I can also see the effect in the low watermark of the JDBC
> task (where I write to a database).
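>
> A minimal sketch of the aggregate trigger with early firings added
> ({{earlyFiringsPeriod}} is a placeholder Duration to be tuned against the
> acceptable output latency):
> {code:java}
> input.apply(
>         WithTimestamps.of(extractTimestamp).withAllowedTimestampSkew(standardDays(7)))
>      .apply(Window.<RDotRecord>into(FixedWindows.of(WINDOW_SIZE))
>          .triggering(
>              AfterWatermark.pastEndOfWindow()
>                  // Emit speculative panes before the slowest slot's watermark
>                  // reaches the end of the window, so buffered elements are
>                  // flushed downstream early instead of piling up in state.
>                  .withEarlyFirings(
>                      AfterProcessingTime.pastFirstElementInPane()
>                          .plusDelayOf(earlyFiringsPeriod))
>                  .withLateFirings(
>                      AfterProcessingTime.pastFirstElementInPane()
>                          .plusDelayOf(lateFiringsPeriod)))
>          .withAllowedLateness(standardMinutes(30), FIRE_ALWAYS)
>          .accumulatingFiredPanes());{code}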
>
> Setup:
> * Job reads records from 32 Kinesis shards.
> * Job parallelism: 8
> * Running on Beam 2.7 and Flink 1.5
> * *Hardware:*
> ** Master: 1 x m5.xlarge
> ** Core instances: 5 x r4.2xlarge
> * *YARN session configuration:*
> {code:bash}
> /usr/bin/flink run --class streaming.Main -m yarn-cluster --yarnstreaming \
>   --yarnjobManagerMemory 6272 --yarntaskManagerMemory 26000 \
>   -yD classloader.resolve-order=parent-first \
>   -yD parallelism.default=8 \
>   -yD containerized.heap-cutoff-ratio=0.15 \
>   -yD state.backend=filesystem \
>   -yD yarn.maximum-failed-containers=-1 \
>   -yD jobmanager.web.checkpoints.history=1000 \
>   -yD akka.ask.timeout=60s \
>   -XX:GCLogFileSize=20M -XX:NumberOfGCLogFiles=2 \
>   -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution \
>   -XX:+PrintGCCause -XX:+PrintGCDateStamps -XX:+UseG1GC \
>   /home/hadoop/job.jar \
>   --runner=FlinkRunner --awsRegion=eu-west-1 --appName=XXX \
>   --input=kinesis://XXX --outputFileSystemType=S3 --outputFileSystemRoot=XXX \
>   --outputDirectory=structured-streaming --externalizedCheckpointsEnabled=true \
>   --checkpointingInterval=300000 --checkpointTimeoutMillis=360000 \
>   --failOnCheckpointingErrors=false --minPauseBetweenCheckpoints=60000 \
>   --parallelism=8{code}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)