Hi!
This looks like a potential memory leak, caused either by your code or by Beam itself.
Would you be able to supply a heap dump from one of the task managers?
That would greatly help debugging this issue.
-Max
On 07.08.20 00:19, David Gogokhiya wrote:
Hi,
We recently started using Apache Beam 2.20.0 running on Flink 1.9,
deployed on Kubernetes, to process unbounded streams of data.
However, we noticed that the memory consumed by our stateful Beam
pipeline increases steadily over time and never drops, regardless of
the current data rate. We were wondering whether this is expected and,
if not, what the best way to resolve it would be.
More Context
We have the following pipeline, which consumes messages from an
unbounded stream of data. We then deduplicate the messages by their
unique message ID using the deduplicate function
<https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey>.
Since we are on Beam 2.20.0, we copied the source code of that function
from version 2.22.0. After that, we unpack the tuple, extract the
necessary data from the message payload, and write it to the log.
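To illustrate why the expiry of per-key state matters for memory in a deduplication step like this, here is a minimal plain-Python model. It is not Beam's implementation: the class `ExpiringDeduplicator`, its `ttl` parameter and the injectable `clock` are hypothetical names chosen for this sketch. It is only loosely analogous to the expiry duration that `DeduplicatePerKey` takes, where a timer clears each key's "seen" state once the duration has passed.

```python
import time
from typing import Callable, Dict, Hashable


class ExpiringDeduplicator:
    """Hypothetical model of per-key deduplication with expiring state.

    Each unseen key is emitted once and remembered for `ttl` seconds.
    After that its entry is dropped, so memory stays bounded by the
    number of distinct keys seen within one TTL window.
    """

    def __init__(self, ttl: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl
        self.clock = clock
        self._expiry: Dict[Hashable, float] = {}  # key -> time its entry expires

    def _evict_expired(self, now: float) -> None:
        # Plays the role of an expiry timer firing and clearing per-key state.
        expired = [k for k, t in self._expiry.items() if t <= now]
        for k in expired:
            del self._expiry[k]

    def process(self, key: Hashable) -> bool:
        """Return True if `key` should be emitted (first sighting within the TTL)."""
        now = self.clock()
        self._evict_expired(now)
        if key in self._expiry:
            return False  # duplicate within the TTL window: drop it
        # Expiry is set on the first sighting only; duplicates do not extend it.
        self._expiry[key] = now + self.ttl
        return True

    def state_size(self) -> int:
        return len(self._expiry)


if __name__ == "__main__":
    now = [0.0]  # fake clock so the example is deterministic
    dedup = ExpiringDeduplicator(ttl=10.0, clock=lambda: now[0])
    print([m for m in ["a", "b", "a"] if dedup.process(m)])  # ['a', 'b']
    now[0] = 11.0
    dedup.process("c")          # entries for "a" and "b" expire here
    print(dedup.state_size())   # 1
```

If entries were never evicted (no expiry at all, or an event-time expiry that never fires because the watermark does not advance), `state_size()` would grow with every distinct message ID. That is one plausible, but unconfirmed, source of steadily growing memory of the kind described here.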
Pipeline:
Flink configuration:
As mentioned above, the memory usage of the jobmanager and taskmanager
pods increases steadily and never drops, regardless of the current data
rate. We tried allocating more memory, but no matter how much we
allocate, usage eventually reaches the limit and the pod then tries to
restart itself.
Sincerely, David