[
https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15414944#comment-15414944
]
Stephan Ewen commented on FLINK-4341:
-------------------------------------
Scott, we are setting up a test to see whether changes in the Kinesis
connector between the revisions you mentioned are responsible.
To confirm: We will test revision {{18995c8c7b8f9c75035b1a95a00379944c6f2b0c}}
(fast) against revision {{45f7825111232f0dd225068a72d8a092f67d49d0}} (slow)
[~tzulitai] Do you have a suspicion or hunch as to what we should look out for?
Concerning the latest commits on the release-1.1 branch: the release-1.1 branch
tracks the in-progress 1.1.x versions. After the 1.1.0 release candidate was
created, new fixes were added that will go into future 1.1.x releases. The 1.1.1
release fixes only the Maven deployment of the 1.1.0 release (no code changes),
and the 1.1.2 release will contain all the commits on the release-1.1 branch at
that point.
> Checkpoint state size grows unbounded when task parallelism not uniform
> -----------------------------------------------------------------------
>
> Key: FLINK-4341
> URL: https://issues.apache.org/jira/browse/FLINK-4341
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.1.0
> Reporter: Scott Kidder
> Assignee: Robert Metzger
> Priority: Critical
>
> This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I
> was previously using a 1.1.0 snapshot (commit 18995c8), which performed as
> expected. The issue was introduced somewhere between those two commits.
> I've got a Flink application that uses the Kinesis Stream Consumer to read
> from a Kinesis stream with 2 shards. The cluster has 2 task managers with 2
> slots each, providing a total of 4 slots. When running the application with a
> parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard),
> and the subsequent tasks that process the Kinesis stream data use all 4
> slots. I use the in-memory state backend for checkpoint data.
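> For illustration, the setup above corresponds roughly to the following
> sketch (Flink 1.1-era APIs; the stream name, deserialization schema, and the
> omitted AWS properties are placeholders, not details from this report):
> {code}
> // Rough sketch of the setup described above (Flink 1.1-era APIs).
> // Stream name, schema, and AWS properties are placeholders.
> import java.util.Properties;
>
> import org.apache.flink.runtime.state.memory.MemoryStateBackend;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>
> public class KinesisCheckpointJob {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(4);                         // use all 4 task-manager slots
>         env.enableCheckpointing(10000);                // checkpoint every 10 seconds
>         env.setStateBackend(new MemoryStateBackend()); // in-memory checkpoints, 5 MB default limit
>
>         Properties kinesisProps = new Properties();
>         // AWS region and credential properties for the Kinesis consumer go here.
>
>         DataStream<String> events = env.addSource(
>                 new FlinkKinesisConsumer<String>("example-stream", new SimpleStringSchema(), kinesisProps));
>
>         // ... fold / sliding-window aggregation / sinks as in the job described above ...
>         events.print();
>
>         env.execute("kinesis-checkpoint-job");
>     }
> }
> {code}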
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that the checkpoint
> state was growing unbounded when running with a parallelism of 4 and a
> checkpoint interval of 10 seconds:
> {code}
> ID State Size
> 1 11.3 MB
> 2 20.9 MB
> 3 30.6 MB
> 4 41.4 MB
> 5 52.6 MB
> 6 62.5 MB
> 7 71.5 MB
> 8 83.3 MB
> 9 93.5 MB
> {code}
> The first 4 checkpoints generally succeed, but subsequent checkpoints fail
> with an exception like the following:
> {code}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
>     at org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
>     at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
>     at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
>     at org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
>     ... 8 more
> {code}
> Or:
> {code}
> 2016-08-09 17:44:43,626 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Restoring checkpointed state to task Fold: property_id, player -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://[email protected]:6123/user/jobmanager#510517238]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was 10891825 bytes.
> {code}
> This can be worked around by simply submitting the job with a parallelism of
> 2. I suspect a regression was introduced relating to assumptions about the
> number of sub-tasks associated with a job stage (e.g. assuming 4 instead of a
> value ranging from 1 to 4). This is currently preventing me from using all
> available Task Manager slots.
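> As a stopgap for the 5 MB memory-backed state limit seen in the first
> exception above (it does not address the unbounded state growth itself), the
> job could switch to the file-system state backend that the exception message
> suggests; the checkpoint URI below is a placeholder:
> {code}
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
>
> // Placeholder checkpoint URI; any durable filesystem reachable by the cluster works.
> // This sidesteps the 5 MB MemoryStateBackend limit but not the growth in checkpoint size.
> env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
> {code}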