[ https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15414599#comment-15414599 ]

Scott Kidder commented on FLINK-4341:
-------------------------------------

Unfortunately I don't have the time to develop a test application that 
demonstrates this issue without Kinesis. It might be reproducible with Kafka 
when the number of partitions (the Kafka equivalent of shards) is less than 
the parallelism specified on the job; a rough, untested sketch follows.
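
Something like the following, assuming a topic with only 2 partitions read at 
a parallelism of 4, the memory state backend, and a 10-second checkpoint 
interval (broker address, topic name, and group id are placeholders):

{code}
import java.util.Properties;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class Flink4341KafkaRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);                          // more sub-tasks than source partitions
        env.enableCheckpointing(10000);                 // 10-second checkpoint interval
        env.setStateBackend(new MemoryStateBackend());  // in-memory checkpoints, as in the report

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "flink-4341-repro");        // placeholder group id

        // "test-topic" is assumed to have only 2 partitions, so 2 of the 4
        // source sub-tasks stay idle -- the same shape as the Kinesis job.
        DataStream<String> source = env.addSource(
                new FlinkKafkaConsumer09<String>("test-topic", new SimpleStringSchema(), props));

        source
            .keyBy(new KeySelector<String, String>() {
                @Override
                public String getKey(String value) {
                    return value;
                }
            })
            // Sliding window, roughly the shape of the job described in the issue.
            .timeWindow(Time.minutes(10), Time.seconds(10))
            .reduce(new ReduceFunction<String>() {
                @Override
                public String reduce(String a, String b) {
                    return a;
                }
            })
            .print();

        env.execute("FLINK-4341 Kafka reproduction sketch");
    }
}
{code}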

Also, I noticed that the 1.1.0 binaries available for download were created on 
August 4 and don't include the latest commits on the release-1.1 branch in Git. 
Do you know when they'll be updated? I deploy Flink as a Docker container and 
use the Flink tar-gzip binary to build the Docker image.



> Checkpoint state size grows unbounded when task parallelism not uniform
> -----------------------------------------------------------------------
>
>                 Key: FLINK-4341
>                 URL: https://issues.apache.org/jira/browse/FLINK-4341
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.1.0
>            Reporter: Scott Kidder
>            Priority: Critical
>
> This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I 
> was previously using a 1.1.0 snapshot (commit 18995c8), which performed as 
> expected. The issue was introduced somewhere between those two commits.
> I've got a Flink application that uses the Kinesis Stream Consumer to read 
> from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots 
> each, providing a total of 4 slots. When running the application with a 
> parallelism of 4, the Kinesis consumer runs in 2 parallel sub-tasks (one per 
> Kinesis shard), while the subsequent tasks that process the Kinesis stream 
> data run in all 4 slots. I use the in-memory state backend for checkpoint data.
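> A rough sketch of that setup (the stream name and region value are 
> placeholders, the exact Kinesis property keys may differ between Flink 
> versions, and AWS credential configuration is omitted):
> {code}
> import java.util.Properties;
>
> import org.apache.flink.runtime.state.memory.MemoryStateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>
> public class KinesisJobSketch {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(4);                          // 2 task managers x 2 slots each
>         env.enableCheckpointing(10000);                 // 10-second checkpoint interval
>         env.setStateBackend(new MemoryStateBackend());  // in-memory checkpoint store
>
>         Properties props = new Properties();
>         props.setProperty("aws.region", "us-east-1");   // assumed property key; credentials omitted
>
>         // The stream has 2 shards, so only 2 of the 4 source sub-tasks are assigned a shard.
>         env.addSource(new FlinkKinesisConsumer<String>("my-stream", new SimpleStringSchema(), props))
>            .print(); // stand-in for the real windowed aggregation and InfluxDB sink
>
>         env.execute("Kinesis job sketch");
>     }
> }
> {code}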
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that the checkpoint 
> state size grew without bound when running with a parallelism of 4 and a 
> checkpoint interval of 10 seconds:
> {code}
> ID  State Size
> 1   11.3 MB
> 2   20.9 MB
> 3   30.6 MB
> 4   41.4 MB
> 5   52.6 MB
> 6   62.5 MB
> 7   71.5 MB
> 8   83.3 MB
> 9   93.5 MB
> {code}
> The first 4 checkpoints generally succeed, but then fail with an exception 
> like the following:
> {code}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
>     at org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
>     at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
>     at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
>     at org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
>     ... 8 more
> {code}
> Or:
> {code}
> 2016-08-09 17:44:43,626 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Restoring checkpointed state to task Fold: property_id, player -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@10.55.2.212:6123/user/jobmanager#510517238]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was 10891825 bytes.
> {code}
> This can be worked around by submitting the job with a parallelism of 2. I 
> suspect a regression was introduced in an assumption about the number of 
> sub-tasks associated with a job stage (e.g. assuming 4 rather than a value 
> ranging from 1 to 4). This currently prevents me from using all available 
> Task Manager slots.
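> The first exception itself suggests moving off the memory state backend, and 
> the second shows the checkpoint acknowledgement exceeding the JobManager's 
> default 10 MB Akka frame size (akka.framesize, 10485760 bytes). A minimal, 
> untested sketch of switching to the file-system state backend (the checkpoint 
> URI is a placeholder):
> {code}
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class FsBackendSketch {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.enableCheckpointing(10000); // 10-second checkpoint interval, as above
>         // Checkpoint state is written to the file system instead of being shipped
>         // to the JobManager's heap; "hdfs:///flink/checkpoints" is a placeholder URI.
>         env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
>         // ... build the same pipeline as before, then env.execute(...)
>     }
> }
> {code}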



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
