[
https://issues.apache.org/jira/browse/FLINK-20663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17276823#comment-17276823
]
Xintong Song edited comment on FLINK-20663 at 2/2/21, 3:15 AM:
---------------------------------------------------------------
[~ykt836],
I don't think that works.
{code:java}
ByteBuffer bb1 = segment.wrap(offset, size);
ByteBuffer bb2 = bb1.duplicate();
{code}
In the above example, {{bb2}} will not be added to the recording list, and thus
can still access the memory. With our own {{WrappedByteBuffer}} implementation,
we can make sure that calling {{duplicate()}} on it also returns a
{{WrappedByteBuffer}}.
Also, I don't think {{DirectByteBuffer}} is thread safe, so changes to its
{{limit}} or {{address}} will not be visible to other threads immediately.
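For illustration, a minimal sketch of such a wrapper. The names here ({{WrappedByteBuffer}}, {{recordingList}}) are hypothetical, not Flink's actual implementation; since {{java.nio.ByteBuffer}} cannot be subclassed (its constructors are package-private), the sketch delegates instead of extending:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only -- not Flink's actual API. ByteBuffer cannot be
// subclassed, so the wrapper delegates to a ByteBuffer instead of extending it.
class WrappedByteBuffer {
    private final ByteBuffer delegate;
    private final List<WrappedByteBuffer> recordingList;

    WrappedByteBuffer(ByteBuffer delegate, List<WrappedByteBuffer> recordingList) {
        this.delegate = delegate;
        this.recordingList = recordingList;
        recordingList.add(this); // every wrapper registers itself on creation
    }

    // Unlike ByteBuffer.duplicate(), this returns another tracked wrapper,
    // so the segment can also invalidate the copy when memory is released.
    WrappedByteBuffer duplicate() {
        return new WrappedByteBuffer(delegate.duplicate(), recordingList);
    }

    ByteBuffer unwrap() {
        return delegate;
    }
}
```

With this, a {{duplicate()}} call can no longer produce an untracked view of the segment's memory.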
> Managed memory may not be released in time when operators use managed memory
> frequently
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-20663
> URL: https://issues.apache.org/jira/browse/FLINK-20663
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.12.0
> Reporter: Caizhi Weng
> Priority: Critical
> Fix For: 1.12.2
>
>
> Some batch operators (like sort merge join or hash aggregate) use managed
> memory frequently. When these operators are chained together and the cluster
> load is a bit heavy, it is very likely that the following exception occurs:
> {code:java}
> 2020-12-18 10:04:32
> java.lang.RuntimeException: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 512 pages
> at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:85)
> at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
> at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:297)
> at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:103)
> at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:90)
> at LocalHashAggregateWithKeys$209161.open(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:506)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:834)
> Suppressed: java.lang.NullPointerException
> at LocalHashAggregateWithKeys$209161.close(Unknown Source)
> at org.apache.flink.table.runtime.operators.TableStreamOperator.dispose(TableStreamOperator.java:46)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:739)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:719)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:642)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:551)
> ... 3 more
> Suppressed: java.lang.NullPointerException
> at LocalHashAggregateWithKeys$209766.close(Unknown Source)
> ... 8 more
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 512 pages
> at org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:231)
> at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:83)
> ... 13 more
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 16777216 bytes, only 9961487 bytes are remaining. This usually indicates that you are requesting more memory than you have reserved. However, when running an old JVM version it can also be caused by slow garbage collection. Try to upgrade to Java 8u72 or higher if running on an old Java version.
> at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:164)
> at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:80)
> at org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:229)
> ... 14 more
> {code}
> It seems that this is caused by relying on GC to release managed memory, as
> {{System.gc()}} may not trigger GC in time. See {{UnsafeMemoryBudget.java}}.
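The failure mode the report describes (a reservation failing because released memory only becomes reservable again after GC runs) can be illustrated with a simplified sketch. The names below ({{GcRetryBudget}}, {{reserveWithGcRetries}}) are hypothetical, not Flink's actual {{UnsafeMemoryBudget}} code:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not Flink's actual implementation: a budget that
// retries a reservation, hinting GC between attempts, because freed memory
// only shows up in the budget once GC has run the buffers' cleaners.
public class GcRetryBudget {

    static boolean reserveWithGcRetries(AtomicLong availableBytes, long bytes, int maxRetries)
            throws InterruptedException {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            long current;
            while ((current = availableBytes.get()) >= bytes) {
                if (availableBytes.compareAndSet(current, current - bytes)) {
                    return true; // reservation succeeded
                }
            }
            // Memory may still be held by unreachable buffers whose cleaners
            // have not run yet: hint the JVM to collect, then retry. If GC
            // does not run in time, the reservation ultimately fails, which
            // is the exception seen in the stack trace above.
            System.gc();
            Thread.sleep(10L);
        }
        return false;
    }
}
```

The key point is that {{System.gc()}} is only a hint, so all retries can elapse before the memory is actually reclaimed.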
--
This message was sent by Atlassian Jira
(v8.3.4#803005)