Scott Kidder created FLINK-4341:
-----------------------------------
Summary: Checkpoint state size grows unbounded when task parallelism not uniform
Key: FLINK-4341
URL: https://issues.apache.org/jira/browse/FLINK-4341
Project: Flink
Issue Type: Bug
Components: Core
Affects Versions: 1.1.0
Reporter: Scott Kidder
This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I
was previously using a 1.1.0 snapshot (commit 18995c8), which performed as
expected, so the regression was introduced somewhere between those two commits.
I have a Flink application that uses the Kinesis Stream Consumer to read from
a Kinesis stream with 2 shards. The cluster has 2 task managers with 2 slots
each, for a total of 4 slots. When the application runs with a parallelism of
4, the Kinesis consumer uses 2 slots (one per Kinesis shard), and the
subsequent tasks that process the Kinesis stream data use all 4 slots. I use
the in-memory state backend (MemoryStateBackend) for checkpoint data.
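To make the slot layout concrete, here is a minimal Java sketch of how 2 shards spread over 4 consumer subtasks. It assumes a simple modulo assignment (shard i is read by subtask i % parallelism); the class and method names are hypothetical, and this is not necessarily the exact scheme FlinkKinesisConsumer uses.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: assumes shard i is read by consumer subtask
// (i % parallelism). Not necessarily FlinkKinesisConsumer's exact scheme.
final class ShardAssignmentSketch {

    // Returns, for each consumer subtask index, the shard indices it reads.
    static List<List<Integer>> assign(int numShards, int parallelism) {
        List<List<Integer>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int shard = 0; shard < numShards; shard++) {
            perSubtask.get(shard % parallelism).add(shard);
        }
        return perSubtask;
    }

    // Number of consumer subtasks that were assigned no shard at all.
    static long idleSubtasks(int numShards, int parallelism) {
        return assign(numShards, parallelism).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        System.out.println(assign(2, 4));       // 2 shards, parallelism 4
        System.out.println(idleSubtasks(2, 4)); // subtasks with nothing to read
        System.out.println(idleSubtasks(2, 2)); // parallelism-2 workaround
    }
}
{code}

Under this assumption, a parallelism of 4 leaves 2 consumer subtasks without a shard, while a parallelism of 2 gives every subtask exactly one, so only the parallelism-4 job has a non-uniform distribution of work across subtasks.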
Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that the checkpoint
state size grows unbounded when running with a parallelism of 4 and a
checkpoint interval of 10 seconds:
{code}
ID  State Size
 1  11.3 MB
 2  20.9 MB
 3  30.6 MB
 4  41.4 MB
 5  52.6 MB
 6  62.5 MB
 7  71.5 MB
 8  83.3 MB
 9  93.5 MB
{code}
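The sizes above grow roughly linearly. A small Java sketch (hypothetical class name) that computes the per-checkpoint increase from the reported values:

{code:java}
// Hypothetical helper: analyses the checkpoint sizes reported in the table above.
final class CheckpointGrowth {

    // State sizes in MB for checkpoints 1..9, copied from the report.
    static final double[] SIZES_MB = {11.3, 20.9, 30.6, 41.4, 52.6, 62.5, 71.5, 83.3, 93.5};

    // Increase in state size between each pair of consecutive checkpoints.
    static double[] deltas(double[] sizes) {
        double[] d = new double[sizes.length - 1];
        for (int i = 1; i < sizes.length; i++) {
            d[i - 1] = sizes[i] - sizes[i - 1];
        }
        return d;
    }

    // Average growth per checkpoint: (last - first) / (number of intervals).
    static double meanGrowth(double[] sizes) {
        return (sizes[sizes.length - 1] - sizes[0]) / (sizes.length - 1);
    }

    public static void main(String[] args) {
        for (double d : deltas(SIZES_MB)) {
            System.out.printf("%.1f MB%n", d);
        }
        System.out.printf("mean growth: %.2f MB per checkpoint%n", meanGrowth(SIZES_MB));
    }
}
{code}

For this data, consecutive checkpoints differ by between 9.0 and 11.8 MB, and the average growth is (93.5 - 11.3) / 8, about 10.3 MB per 10-second checkpoint interval.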
The first 4 checkpoints generally succeed, but subsequent checkpoints fail with
an exception like the following:
{code}
java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
	at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
	at org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
	at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
	at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
	at org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
	... 8 more
{code}
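The "Caused by" line shows the memory-backed state check rejecting a 12105407-byte snapshot against a 5242880-byte (5 MiB) limit. Below is a simplified Java stand-in for that comparison, assuming the limit applies to each individual state snapshot (as the Flink documentation describes for MemoryStateBackend); the class and methods are hypothetical and are not Flink's implementation.

{code:java}
import java.io.IOException;

// Simplified, hypothetical stand-in for the memory-backed state size check;
// not Flink's actual implementation.
final class MemoryStateSizeCheck {

    // maxSize from the exception above: 5242880 bytes = 5 MiB.
    static final int MAX_STATE_BYTES = 5 * 1024 * 1024;

    // True if a single snapshot of `size` bytes is within the limit.
    static boolean fits(long size) {
        return size <= MAX_STATE_BYTES;
    }

    // Throws, mirroring the logged message, when a snapshot exceeds maxSize.
    static void checkSize(long size, int maxSize) throws IOException {
        if (size > maxSize) {
            throw new IOException("Size of the state is larger than the maximum permitted "
                    + "memory-backed state. Size=" + size + " , maxSize=" + maxSize);
        }
    }

    public static void main(String[] args) {
        try {
            checkSize(12_105_407L, MAX_STATE_BYTES); // snapshot size from the log
            System.out.println("snapshot accepted");
        } catch (IOException e) {
            System.out.println("snapshot rejected: " + e.getMessage());
        }
    }
}
{code}

The rejected snapshot (about 11.5 MiB) is more than twice the 5 MiB limit, which is why the exception message suggests switching to the File System State backend.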
Or:
{code}
2016-08-09 17:44:43,626 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Restoring checkpointed state to task Fold: property_id, player -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://[email protected]:6123/user/jobmanager#510517238]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was 10891825 bytes.
{code}
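This second failure hits a different limit: the encoded AcknowledgeCheckpoint message (10891825 bytes) exceeds the 10485760-byte (10 MiB) maximum Akka payload. A Java sketch, assuming (as the Flink documentation describes for MemoryStateBackend) that memory-backed snapshots are sent to the JobManager inside the checkpoint acknowledgement, so the payload is approximated here by the sum of a subtask's snapshot sizes with serialization overhead ignored. The class, methods, and the snapshot sizes in main are hypothetical.

{code:java}
// Hypothetical sketch: approximates an AcknowledgeCheckpoint payload as the sum
// of the subtask's memory-backed snapshots (serialization overhead ignored).
final class AckFrameSizeCheck {

    // Maximum Akka payload from the log above: 10485760 bytes = 10 MiB.
    static final long MAX_FRAME_BYTES = 10L * 1024 * 1024;

    // Sum of the snapshot sizes carried by one acknowledgement.
    static long payloadBytes(long[] snapshotSizes) {
        long total = 0;
        for (long s : snapshotSizes) {
            total += s;
        }
        return total;
    }

    static boolean fitsInFrame(long bytes) {
        return bytes <= MAX_FRAME_BYTES;
    }

    public static void main(String[] args) {
        long reported = 10_891_825L; // encoded AcknowledgeCheckpoint size from the log
        System.out.println("reported payload over limit by "
                + (reported - MAX_FRAME_BYTES) + " bytes");

        // Hypothetical sizes: each is below the 5 MiB per-snapshot limit,
        // but together they exceed the frame size.
        long[] snapshots = {4_000_000L, 4_000_000L, 3_000_000L};
        System.out.println("fits in frame: " + fitsInFrame(payloadBytes(snapshots)));
    }
}
{code}

The reported message overshoots the limit by 406065 bytes, and the hypothetical example shows how snapshots that each pass the 5 MiB per-state check can still, together, exceed the frame size.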
Submitting the job with a parallelism of 2 works around the problem. I
suspect a regression related to assumptions about the number of subtasks in a
job stage (e.g. assuming 4 rather than a value between 1 and 4). This
currently prevents me from using all available Task Manager slots.