[
https://issues.apache.org/jira/browse/FLINK-20663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317706#comment-17317706
]
Xintong Song commented on FLINK-20663:
--------------------------------------
I think I've found the problem. I'm afraid this is a PyFlink bug, which causes
the Python worker not to be terminated and its managed memory not to be
released.
Python UDFs are executed in Python VM processes. A Python VM is started only
when there are Python UDFs to execute, and it is expected to be terminated when
the UDFs finish, releasing the managed memory for other executions. If the VM
is not terminated, newly arriving tasks will not be able to use that memory.
That is the error we observed.
When there are multiple Python UDFs within the same slot, they share the same
VM. Consider the Python VM a shared resource: when it is first requested, the
resource is created (the VM is started); subsequent requests get the same
resource without creating a new one. The resource is disposed (the VM is
terminated) only when it has been released as many times as it was requested.
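Roughly, the scheme looks like the following minimal sketch (a hypothetical class for illustration only, not the actual Flink {{SharedResources}} API):
{code:java}
import java.util.function.Consumer;
import java.util.function.Supplier;

// Minimal reference-counted shared resource: created on the first acquire,
// shared by later acquires, disposed only when the releases match the acquires.
public final class RefCountedResource<T> {

    private final Supplier<T> initializer; // e.g. start the Python VM
    private final Consumer<T> disposer;    // e.g. terminate the Python VM

    private T resource;
    private int refCount;

    public RefCountedResource(Supplier<T> initializer, Consumer<T> disposer) {
        this.initializer = initializer;
        this.disposer = disposer;
    }

    public synchronized T acquire() {
        if (refCount == 0) {
            resource = initializer.get(); // first request: create the resource
        }
        refCount++;
        return resource;                  // later requests: share the same instance
    }

    public synchronized void release() {
        if (--refCount == 0) {
            disposer.accept(resource);    // last release: dispose the resource
            resource = null;
        }
    }
}
{code}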
The bug is that this reference-counting logic is implemented twice, in
{{SharedResource.release()}} and in {{BeamPythonFunctionRunner.close()}}.
Consequently, when there are multiple Python UDFs in the slot,
{{BeamPythonFunctionRunner}} requests the {{SharedResource}} multiple times but
releases it only once, while the {{SharedResource}} expects to be released as
many times as it was requested.
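In terms of the sketch above, the mismatch looks roughly like this (the {{startPythonVm}} helper is hypothetical, for illustration only):
{code:java}
RefCountedResource<Process> pythonVm =
        new RefCountedResource<>(() -> startPythonVm(), Process::destroy);

Process vm1 = pythonVm.acquire(); // first Python UDF in the slot
Process vm2 = pythonVm.acquire(); // second Python UDF shares the same VM

pythonVm.release();               // released only once -> refCount stays at 1,
                                  // so the VM is never terminated and its
                                  // managed memory is never given back
{code}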
This bug is triggered when 1) there are multiple Python UDFs in the same slot,
and 2) another operator tries to allocate non-Python managed memory afterwards.
If only 1) holds without 2), there is still a Python process leak, which can
easily be overlooked.
[~zhou_yb], the above bug matches the observations from your job.
- There are 2 Python UDFs in the same slot. This can be seen from the operator
name {{(BatchExecPythonCalc, BatchExecPythonCalc)}}, and from the fact that the
log message "Obtained shared Python process ..." is printed twice as many times
as there are slots.
- The error happens on the same TM after the Python UDFs are executed.
- Changing the SQL code probably changed the number of underlying Python UDFs,
so the problem is no longer triggered.
[~dianfu], [~hxbks2ks], could you confirm this issue? And until the bug is
fixed, is there any way to help the user work around the problem? E.g., is
there any way to prevent multiple Python UDFs from ending up in the same slot?
> Managed memory may not be released in time when operators use managed memory frequently
> ----------------------------------------------------------------------------------------
>
> Key: FLINK-20663
> URL: https://issues.apache.org/jira/browse/FLINK-20663
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.12.0
> Reporter: Caizhi Weng
> Assignee: Xintong Song
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.12.2, 1.13.0
>
> Attachments: exception, summary.py, taskmanager.log
>
>
> Some batch operators (like sort merge join or hash aggregate) use managed
> memory frequently. When these operators are chained together and the cluster
> load is a bit heavy, it is very likely that the following exception occurs:
> {code:java}
> 2020-12-18 10:04:32
> java.lang.RuntimeException: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 512 pages
>     at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:85)
>     at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
>     at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:297)
>     at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:103)
>     at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:90)
>     at LocalHashAggregateWithKeys$209161.open(Unknown Source)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:506)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>     at java.lang.Thread.run(Thread.java:834)
>     Suppressed: java.lang.NullPointerException
>         at LocalHashAggregateWithKeys$209161.close(Unknown Source)
>         at org.apache.flink.table.runtime.operators.TableStreamOperator.dispose(TableStreamOperator.java:46)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:739)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:719)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:642)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:551)
>         ... 3 more
>     Suppressed: java.lang.NullPointerException
>         at LocalHashAggregateWithKeys$209766.close(Unknown Source)
>         ... 8 more
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 512 pages
>     at org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:231)
>     at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:83)
>     ... 13 more
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 16777216 bytes, only 9961487 bytes are remaining. This usually indicates that you are requesting more memory than you have reserved. However, when running an old JVM version it can also be caused by slow garbage collection. Try to upgrade to Java 8u72 or higher if running on an old Java version.
>     at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:164)
>     at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:80)
>     at org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:229)
>     ... 14 more
> {code}
> It seems that this is caused by relying on GC to release managed memory, as
> {{System.gc()}} may not trigger GC in time. See {{UnsafeMemoryBudget.java}}.
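> To make that concrete, here is a simplified sketch of the GC-dependent reservation pattern (an assumed shape for illustration, not the actual {{UnsafeMemoryBudget}} code): memory is given back to the budget only when GC-triggered cleaners run, so a reservation retries after {{System.gc()}} and still fails if the collector does not release memory in time.
> {code:java}
> // Simplified, hypothetical budget: release() is driven by GC-triggered
> // cleaners, so reserve() may fail even though segments are logically free.
> final class GcDependentBudget {
>     private long available;
>     private static final int MAX_ATTEMPTS = 3;
>
>     GcDependentBudget(long totalBytes) {
>         this.available = totalBytes;
>     }
>
>     // Called by a cleaner once a previously allocated segment is garbage collected.
>     synchronized void release(long bytes) {
>         available += bytes;
>     }
>
>     boolean reserve(long bytes) throws InterruptedException {
>         for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
>             synchronized (this) {
>                 if (available >= bytes) {
>                     available -= bytes;
>                     return true;   // enough memory has already been released
>                 }
>             }
>             System.gc();           // ask the JVM to run the cleaners
>             Thread.sleep(50);      // give the collector a moment to release memory
>         }
>         return false;              // GC did not free memory in time -> allocation fails
>     }
> }
> {code}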