[ https://issues.apache.org/jira/browse/FLINK-20663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317706#comment-17317706 ]

Xintong Song commented on FLINK-20663:
--------------------------------------

I think I've found the problem. I'm afraid this is a PyFlink bug, which causes 
the Python worker not to be terminated and the managed memory not to be 
released.

Python UDFs are executed in Python VM processes. A Python VM is only started 
when there are Python UDFs to execute, and it is expected to terminate when the 
UDFs finish, releasing the managed memory for other executions. If the VM is 
not terminated, newly arriving tasks will not be able to use that memory. That 
is the error we ran into.

When there are multiple Python UDFs within the same slot, they share the same 
VM. Consider the Python VM a resource: when it is first requested, the resource 
is created (the VM is started), and a second request gets the same resource 
without creating a new one. The resource is disposed of (the VM is terminated) 
only when it has been released as many times as it was requested.
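
To make those semantics concrete, here is a minimal sketch of such a 
reference-counted resource. It is purely illustrative: the class and method 
names are made up, and this is not the actual Flink or Beam implementation.

{code:java}
// Illustrative sketch only -- not the actual Flink classes.
// The resource (think: the Python VM and its managed memory) is created on the
// first acquire and disposed only when release() has been called as many times
// as acquire().
import java.util.function.Consumer;
import java.util.function.Supplier;

public final class RefCountedResource<T> {

    private final Supplier<T> factory;   // e.g. start the Python VM
    private final Consumer<T> disposer;  // e.g. terminate the VM, free managed memory

    private T resource;
    private int refCount;

    public RefCountedResource(Supplier<T> factory, Consumer<T> disposer) {
        this.factory = factory;
        this.disposer = disposer;
    }

    public synchronized T acquire() {
        if (refCount == 0) {
            resource = factory.get();   // first request: create the resource
        }
        refCount++;
        return resource;
    }

    public synchronized void release() {
        if (refCount == 0) {
            throw new IllegalStateException("released more often than acquired");
        }
        refCount--;
        if (refCount == 0) {
            disposer.accept(resource);  // last release: dispose the resource
            resource = null;
        }
    }
}
{code}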

The bug is that this reference counting logic is implemented twice, in 
{{SharedResource.release()}} and in {{BeamPythonFunctionRunner.close()}}. 
Consequently, when there are multiple Python UDFs in the slot, 
{{BeamPythonFunctionRunner}} requests the {{SharedResource}} multiple times but 
releases it only once, while the {{SharedResource}} expects to be released as 
many times as it was requested.
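
In terms of the sketch above, the mismatch looks roughly like this (again with 
made-up names, not the actual code):

{code:java}
// Illustrative only: two runners in the same slot acquire the shared VM, but
// the close path releases it only once, so the reference count never reaches 0
// and the disposer (terminating the VM, freeing its managed memory) never runs.
public class LeakScenario {
    public static void main(String[] args) {
        RefCountedResource<String> sharedVm = new RefCountedResource<>(
                () -> "python-vm",                              // stands in for starting the VM
                vm -> System.out.println("terminated " + vm));  // never printed in this scenario

        String vmForUdf1 = sharedVm.acquire();  // refCount = 1
        String vmForUdf2 = sharedVm.acquire();  // refCount = 2

        sharedVm.release();  // refCount = 1 -> the VM is never terminated
    }
}
{code}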

This bug can be triggered when 1) there are multiple Python UDFs in the same 
slot, and 2) another operator tries to allocate non-Python managed memory 
afterwards. With only 1) and not 2), the Python processes still leak, but the 
leak can easily go unnoticed.

[~zhou_yb], the above bug matches the observations from your job.
- There are 2 Python UDFs in the same slot. That can be seen from the operator 
name {{(BatchExecPythonCalc, BatchExecPythonCalc)}}, and from the fact that the 
log line "Obtained shared Python process ..." is printed twice as many times as 
there are slots.
- The error happens on the same TM after the Python UDFs have been executed.
- Changing the SQL code probably changed the number of underlying Python UDFs, 
so the problem is no longer triggered.

[~dianfu], [~hxbks2ks], could you confirm this issue? And before the bug is 
fixed, is there any way to help the user work around the problem? E.g., is 
there any way to prevent multiple Python UDFs from ending up in the same slot?

> Managed memory may not be released in time when operators use managed memory 
> frequently
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-20663
>                 URL: https://issues.apache.org/jira/browse/FLINK-20663
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.12.0
>            Reporter: Caizhi Weng
>            Assignee: Xintong Song
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.12.2, 1.13.0
>
>         Attachments: exception, summary.py, taskmanager.log
>
>
> Some batch operators (like sort merge join or hash aggregate) use managed 
> memory frequently. When these operators are chained together and the cluster 
> load is a bit heavy, it is very likely that the following exception occurs:
> {code:java}
> 2020-12-18 10:04:32
> java.lang.RuntimeException: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 512 pages
>       at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:85)
>       at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
>       at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:297)
>       at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:103)
>       at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:90)
>       at LocalHashAggregateWithKeys$209161.open(Unknown Source)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:506)
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>       at java.lang.Thread.run(Thread.java:834)
>       Suppressed: java.lang.NullPointerException
>               at LocalHashAggregateWithKeys$209161.close(Unknown Source)
>               at org.apache.flink.table.runtime.operators.TableStreamOperator.dispose(TableStreamOperator.java:46)
>               at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:739)
>               at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:719)
>               at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:642)
>               at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:551)
>               ... 3 more
>               Suppressed: java.lang.NullPointerException
>                       at LocalHashAggregateWithKeys$209766.close(Unknown Source)
>                       ... 8 more
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 512 pages
>       at org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:231)
>       at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:83)
>       ... 13 more
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 16777216 bytes, only 9961487 bytes are remaining. This usually indicates that you are requesting more memory than you have reserved. However, when running an old JVM version it can also be caused by slow garbage collection. Try to upgrade to Java 8u72 or higher if running on an old Java version.
>       at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:164)
>       at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:80)
>       at org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:229)
>       ... 14 more
> {code}
> It seems that this is caused by relying on GC to release managed memory, as 
> {{System.gc()}} may not trigger GC in time. See {{UnsafeMemoryBudget.java}}.
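
As a side note on the last point of the quoted description: the general pattern 
of a reservation that depends on GC having actually run (triggering 
{{System.gc()}} and retrying before giving up) can be sketched as below. This 
is a simplified illustration and not the actual {{UnsafeMemoryBudget}} code.

{code:java}
// Simplified illustration of a GC-dependent memory reservation, not the actual
// UnsafeMemoryBudget implementation. In the real code, released segments only
// become reservable again once GC has run their cleaners; this only shows the
// "trigger GC and retry" shape of the reserve path.
import java.util.concurrent.atomic.AtomicLong;

final class GcDependentBudget {

    private final AtomicLong available;

    GcDependentBudget(long totalBytes) {
        this.available = new AtomicLong(totalBytes);
    }

    void reserve(long bytes) throws InterruptedException {
        for (int attempt = 0; attempt < 3; attempt++) {
            long remaining = available.get();
            if (remaining >= bytes && available.compareAndSet(remaining, remaining - bytes)) {
                return;  // enough memory has been returned to the budget
            }
            // Memory may still be held by objects that are unreachable but not
            // yet collected, so request a GC and retry after a short pause.
            System.gc();
            Thread.sleep(50);
        }
        throw new IllegalStateException("Could not reserve " + bytes + " bytes");
    }
}
{code}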


