[
https://issues.apache.org/jira/browse/FLINK-34325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alexis Sarda-Espinosa updated FLINK-34325:
------------------------------------------
Description:
I have a job that uses broadcast state to maintain a cache of required
metadata. I am currently evaluating memory requirements of my specific use
case, and I ran into a weird situation that seems worrisome.
All sources in my job are Kafka sources. I wrote a large amount of messages in
Kafka to force the broadcast state's cache to grow. At some point, this caused
an "{{java.lang.OutOfMemoryError: Java heap space}}" error in the -Job- Task
Manager. I would have expected the whole java process of the TM to crash, but
the job was simply restarted. What's worrisome is that, after 2 checkpoint
failures ^1^, the job restarted and subsequently resumed from the latest
successful checkpoint and completely ignored all the events I wrote to Kafka,
which I can verify because I have a custom metric that exposes the approximate
size of this cache, and the fact that the job didn't crashloop at this point
after reading all the messages from Kafka over and over again.
I'm attaching an excerpt of the Job Manager's logs. My main concerns are:
# It seems the memory error somehow prevented the Kafka offsets from being
"rolled back", so eventually the Kafka events that should have ended in the
broadcast state's cache were ignored.
# -Is it normal that the state is somehow "materialized" in the JM and is thus
affected by the size of the JM's heap? Is this something particular due to the
use of broadcast state? I found this very surprising.- See comments
^1^ Two failures are expected since
{{execution.checkpointing.tolerable-failed-checkpoints=1}}
was:
I have a job that uses broadcast state to maintain a cache of required
metadata. I am currently evaluating memory requirements of my specific use
case, and I ran into a weird situation that seems worrisome.
All sources in my job are Kafka sources. I wrote a large amount of messages in
Kafka to force the broadcast state's cache to grow. At some point, this caused
an "{{java.lang.OutOfMemoryError: Java heap space}}" error in the -Job- Task
Manager. I would have expected the whole java process of the TM to crash, but
the job was simply restarted. What's worrisome is that, after 2 checkpoint
failures ^1^, the job restarted and subsequently resumed from the latest
successful checkpoint and completely ignored all the events I wrote to Kafka,
which I can verify because I have a custom metric that exposes the approximate
size of this cache, and the fact that the job didn't crashloop at this point
after reading all the messages from Kafka over and over again.
I'm attaching an excerpt of the Job Manager's logs. My main concerns are:
# It seems the memory error from the JM didn't prevent the Kafka offsets from
being "rolled back", so eventually the Kafka events that should have ended in
the broadcast state's cache were ignored.
# -Is it normal that the state is somehow "materialized" in the JM and is thus
affected by the size of the JM's heap? Is this something particular due to the
use of broadcast state? I found this very surprising.- See comments
^1^ Two failures are expected since
{{execution.checkpointing.tolerable-failed-checkpoints=1}}
> Inconsistent state with data loss after OutOfMemoryError
> --------------------------------------------------------
>
> Key: FLINK-34325
> URL: https://issues.apache.org/jira/browse/FLINK-34325
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.17.1
> Environment: Flink on Kubernetes with HA, RocksDB with incremental
> checkpoints on Azure
> Reporter: Alexis Sarda-Espinosa
> Priority: Major
> Attachments: jobmanager_log.txt
>
>
> I have a job that uses broadcast state to maintain a cache of required
> metadata. I am currently evaluating memory requirements of my specific use
> case, and I ran into a weird situation that seems worrisome.
> All sources in my job are Kafka sources. I wrote a large amount of messages
> in Kafka to force the broadcast state's cache to grow. At some point, this
> caused an "{{java.lang.OutOfMemoryError: Java heap space}}" error in the
> -Job- Task Manager. I would have expected the whole java process of the TM to
> crash, but the job was simply restarted. What's worrisome is that, after 2
> checkpoint failures ^1^, the job restarted and subsequently resumed from the
> latest successful checkpoint and completely ignored all the events I wrote to
> Kafka, which I can verify because I have a custom metric that exposes the
> approximate size of this cache, and the fact that the job didn't crashloop at
> this point after reading all the messages from Kafka over and over again.
> I'm attaching an excerpt of the Job Manager's logs. My main concerns are:
> # It seems the memory error somehow prevented the Kafka offsets from being
> "rolled back", so eventually the Kafka events that should have ended in the
> broadcast state's cache were ignored.
> # -Is it normal that the state is somehow "materialized" in the JM and is
> thus affected by the size of the JM's heap? Is this something particular due
> to the use of broadcast state? I found this very surprising.- See comments
> ^1^ Two failures are expected since
> {{execution.checkpointing.tolerable-failed-checkpoints=1}}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)