[
https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15441113#comment-15441113
]
ASF GitHub Bot commented on FLINK-4341:
---------------------------------------
Github user tzulitai commented on the issue:
https://github.com/apache/flink/pull/2414
To cover the missing case @rmetzger mentioned, the fix turns out to be more
complicated than I expected: the state after every reshard needs to be determined
correctly, which requires a bit of rework on our current shard discovery
mechanism to get right.
Heads-up that this will probably need a re-review. Sorry for the delay; I'm
still working on it and hope to update the PR by the end of today ;)
> Kinesis connector does not emit maximum watermark properly
> ----------------------------------------------------------
>
> Key: FLINK-4341
> URL: https://issues.apache.org/jira/browse/FLINK-4341
> Project: Flink
> Issue Type: Bug
> Components: Streaming Connectors
> Affects Versions: 1.1.0, 1.1.1
> Reporter: Scott Kidder
> Assignee: Robert Metzger
> Priority: Blocker
> Fix For: 1.2.0, 1.1.2
>
>
> **Previously reported as "Checkpoint state size grows unbounded when task
> parallelism not uniform"**
> This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I
> was previously using a 1.1.0 snapshot (commit 18995c8) which performed as
> expected. This issue was introduced somewhere between those commits.
> I've got a Flink application that uses the Kinesis Stream Consumer to read
> from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots
> each, providing a total of 4 slots. When running the application with a
> parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard)
> and 4 slots for subsequent tasks that process the Kinesis stream data. I use
> an in-memory store for checkpoint data.
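> For reference, a minimal sketch of a job wired up roughly as described above might
> look like the following. The stream name, region value, and property key are
> placeholders rather than details from this report, and credentials configuration is
> left out entirely, so treat it as an assumption-laden outline rather than the actual job:
> {code}
> import java.util.Properties;
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>
> public class KinesisJobSketch {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(4);           // use all 4 task slots
>         env.enableCheckpointing(10000);  // 10-second checkpoint interval, default memory-backed state
>
>         // Placeholder consumer configuration; a real job also needs AWS credentials here.
>         Properties props = new Properties();
>         props.setProperty("aws.region", "us-east-1");
>
>         // With a 2-shard stream, only 2 of the 4 source subtasks actually receive
>         // shards; downstream operators still run with the job parallelism of 4.
>         env.addSource(new FlinkKinesisConsumer<String>("example-stream", new SimpleStringSchema(), props))
>            .print();
>
>         env.execute("Kinesis consumer sketch");
>     }
> }
> {code}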
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint
> state sizes were growing unbounded when running with a parallelism of 4 and a
> checkpoint interval of 10 seconds:
> {code}
> ID State Size
> 1 11.3 MB
> 2 20.9 MB
> 3 30.6 MB
> 4 41.4 MB
> 5 52.6 MB
> 6 62.5 MB
> 7 71.5 MB
> 8 83.3 MB
> 9 93.5 MB
> {code}
> The first 4 checkpoints generally succeed, but subsequent checkpoints fail with
> an exception like the following:
> {code}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>     at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
>     at org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
>     at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
>     at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
>     at org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
>     ... 8 more
> {code}
> Or:
> {code}
> 2016-08-09 17:44:43,626 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Restoring checkpointed state to task Fold: property_id, player -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://[email protected]:6123/user/jobmanager#510517238]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was 10891825 bytes.
> {code}
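> The first exception explicitly suggests a different state backend. Purely for
> illustration, switching to the file-system state backend it mentions might look
> roughly like this (the checkpoint URI is a placeholder), though that only raises
> the size ceiling and does not address the unbounded growth itself:
> {code}
> import org.apache.flink.runtime.state.filesystem.FsStateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class FsBackendSketch {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // Write checkpoint data to a file system instead of keeping it in memory.
>         env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
>         // ... source, operators, and env.execute() would follow as usual
>     }
> }
> {code}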
> This can be worked around by simply submitting the job with a parallelism of 2.
> I suspect a regression was introduced relating to assumptions about the number
> of sub-tasks associated with a job stage (e.g. assuming 4 instead of a value
> ranging from 1 to 4). This is currently preventing me from using all
> available Task Manager slots.
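> Purely as an illustration of that workaround, the setup sketch earlier would simply
> cap the job parallelism at the shard count (again, names are placeholders):
> {code}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class ParallelismWorkaroundSketch {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // Cap the job parallelism at the Kinesis shard count (2) so no source
>         // subtask is left without a shard to read from.
>         env.setParallelism(2);
>         // ... same source and operators as before, then env.execute()
>     }
> }
> {code}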
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)