[ https://issues.apache.org/jira/browse/BEAM-6465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pawel Bartoszek updated BEAM-6465:
----------------------------------
    Description: 
This ticket captures my findings when restoring a BEAM job from a savepoint on 
a Flink runner.

 

*The problem*

When the job is restored from a savepoint taken a few hours earlier, we see the 
checkpoint size start to grow ridiculously high, which eventually leads to the job 
failing with an out-of-heap-space error. We use the filesystem state backend, which 
keeps state on the heap.

 

*Job structure*

The job has two paths: the data lake path and the aggregate path.

*Data lake path*

The data lake path is a dumb sink for all records received by the job; the records 
are flushed to S3.

Data lake trigger:
{code:java}
input
    .apply(WithTimestamps.of(extractTimestamp).withAllowedTimestampSkew(standardDays(7)))
    .apply(Window.<RDotRecord>into(FixedWindows.of(standardMinutes(1)))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withLateFirings(
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringsPeriod)))
        .withAllowedLateness(standardMinutes(30), FIRE_ALWAYS)
        .discardingFiredPanes()); // <-- IMPORTANT: discarding mode
{code}
 

*Aggregate path*

The aggregate path has some group-by-key, count, etc. transformations, roughly of the shape sketched below.
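
For illustration only (this is not the actual job code), the aggregation is roughly a per-key count over the windowed input; windowedInput and extractKey are placeholders:
{code:java}
// Illustration only, not the actual job code.
// windowedInput is the windowed PCollection<RDotRecord>;
// extractKey is a placeholder SerializableFunction<RDotRecord, String>.
PCollection<KV<String, Long>> counts =
    windowedInput
        .apply("KeyByDimension",
            WithKeys.of(extractKey).withKeyType(TypeDescriptors.strings()))
        .apply("CountPerKey", Count.perKey());
{code}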

Aggregate trigger:
{code:java}
input
    .apply(WithTimestamps.of(extractTimestamp).withAllowedTimestampSkew(standardDays(7)))
    .apply(Window.<RDotRecord>into(FixedWindows.of(WINDOW_SIZE))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withLateFirings(
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringsPeriod)))
        .withAllowedLateness(standardMinutes(30), FIRE_ALWAYS)
        .accumulatingFiredPanes());{code}
 

*My investigation*

Our team has written a tool to collect input watermarks from the Flink API. It 
turned out to be a common situation that, for pretty much every operator, some 
operator subtasks (each running on a particular Flink slot) run slower, so their 
watermarks fall behind the other slots. Look at the graph below:

!Screen Shot 2019-01-18 at 12.07.03 copy.png!

The Y axis represents event time, the X axis wall-clock time. The graph shows the 
input watermarks for one operator in the job. Each line represents the input 
watermark of a specific slot the operator is running on (the job runs with 
parallelism 8).

At 17:55 the difference between the slowest and the quickest slot is already 20 
minutes. This means that 20 minutes of data will be buffered in memory until the 
slowest slot's watermark crosses the end of the 1-minute window of the buffered 
data.
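
To put that into perspective with a rough back-of-envelope figure (the rate and record size here are purely hypothetical, not measured): at 5,000 records/s per slot and ~200 bytes per record, 20 min x 60 s x 5,000 records/s x 200 B is roughly 1.2 GB of buffered elements sitting on that slot's heap, on top of normal operation.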

Unfortunately it's very hard to tell why some slots are doing better than others 
(I believe the data is properly balanced when it comes to hashing, etc.).
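
The collection tool itself isn't included in this ticket; for anyone reproducing the measurement, a minimal sketch of pulling per-subtask input watermarks from the Flink monitoring REST API could look like the following. The REST address, job/vertex ids and parallelism are placeholders, and the /subtasks/<n>/metrics endpoint and the currentInputWatermark metric name should be double-checked for your Flink version:
{code:java}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/**
 * Sketch only: polls the Flink monitoring REST API for the per-subtask input
 * watermark of a single operator. REST address, job id, vertex id and
 * parallelism are placeholders.
 */
public class WatermarkProbe {

    public static void main(String[] args) throws Exception {
        String restAddress = "http://localhost:8081"; // assumed JobManager REST address
        String jobId = "<job-id>";                    // e.g. from GET /jobs
        String vertexId = "<vertex-id>";              // e.g. from GET /jobs/<job-id>
        int parallelism = 8;

        for (int subtask = 0; subtask < parallelism; subtask++) {
            String url = String.format(
                "%s/jobs/%s/vertices/%s/subtasks/%d/metrics?get=currentInputWatermark",
                restAddress, jobId, vertexId, subtask);
            // Response is a small JSON array, e.g. [{"id":"currentInputWatermark","value":"..."}]
            System.out.printf("subtask %d -> %s%n", subtask, httpGet(url));
        }
    }

    private static String httpGet(String url) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }
}
{code}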

 

*My findings*

If state is accumulating because the watermark falls behind on some operator slots 
(a Flink-specific effect), then introducing *early firings* *with discarding mode* 
should help ... and indeed it helped.
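
For reference, the adjusted data lake trigger looks roughly like this (a sketch; earlyFiringsPeriod is a placeholder Duration whose exact value is workload specific):
{code:java}
input
    .apply(WithTimestamps.of(extractTimestamp).withAllowedTimestampSkew(standardDays(7)))
    .apply(Window.<RDotRecord>into(FixedWindows.of(standardMinutes(1)))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                // Early firings flush buffered elements periodically instead of
                // waiting for the (lagging) watermark to pass the end of the window.
                .withEarlyFirings(
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyFiringsPeriod))
                .withLateFirings(
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringsPeriod)))
        .withAllowedLateness(standardMinutes(30), FIRE_ALWAYS)
        // Discarding mode: elements are dropped from state once a pane fires.
        .discardingFiredPanes());
{code}
With early firings in discarding mode, a slot whose watermark lags still emits (and drops) its buffered panes every earlyFiringsPeriod, so heap usage stays bounded during catch-up.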

 

*My worry is that introducing early firings as a way to avoid OOM errors during 
catch-up seems hacky to me. The other downside is that early firings produce 
speculative results, which might not be acceptable in some cases.*

 

*Setup:*
 * Job reads records from 32 Kinesis shards.
 * Job parallelism: 8
 * Running on Beam 2.7, Flink 1.5
 * *Hardware:*
 ** Master: 1 x m5.xlarge
 ** Core instances: 5 x r4.2xlarge
 * *YARN session configuration:*
{code:bash}
/usr/bin/flink run --class streaming.Main -m yarn-cluster --yarnstreaming \
  --yarnjobManagerMemory 6272 --yarntaskManagerMemory 26000 \
  -yD classloader.resolve-order=parent-first \
  -yD parallelism.default=8 \
  -yD containerized.heap-cutoff-ratio=0.15 \
  -yD state.backend=filesystem \
  -yD yarn.maximum-failed-containers=-1 \
  -yD jobmanager.web.checkpoints.history=1000 \
  -yD akka.ask.timeout=60s \
  -XX:GCLogFileSize=20M -XX:NumberOfGCLogFiles=2 -XX:+PrintGCDetails \
  -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -XX:+PrintGCCause \
  -XX:+PrintGCDateStamps -XX:+UseG1GC \
  /home/hadoop/job.jar \
  --runner=FlinkRunner --awsRegion=eu-west-1 --appName=XXX --input=kinesis://XXX \
  --outputFileSystemType=S3 --outputFileSystemRoot=XXX \
  --outputDirectory=structured-streaming \
  --externalizedCheckpointsEnabled=true \
  --checkpointingInterval=300000 --checkpointTimeoutMillis=360000 \
  --failOnCheckpointingErrors=false --minPauseBetweenCheckpoints=60000 \
  --parallelism=8{code}

 
  

 

> Flink: State accumulation during restoring from a savepoint
> -----------------------------------------------------------
>
>                 Key: BEAM-6465
>                 URL: https://issues.apache.org/jira/browse/BEAM-6465
>             Project: Beam
>          Issue Type: Test
>          Components: beam-model
>    Affects Versions: 2.7.0
>            Reporter: Pawel Bartoszek
>            Assignee: Kenneth Knowles
>            Priority: Major
>         Attachments: Screen Shot 2019-01-18 at 12.07.03 copy.png, Screen Shot 2019-01-18 at 12.07.03.png
>
>

