[ https://issues.apache.org/jira/browse/FLINK-18668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee closed FLINK-18668.
--------------------------------
      Assignee: Caizhi Weng
    Resolution: Fixed

master: 2658510ff08056d0123835976d3f8f13f226e3ed

> BytesHashMap#growAndRehash should release newly allocated segments before throwing the exception
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-18668
>                 URL: https://issues.apache.org/jira/browse/FLINK-18668
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.0
>            Reporter: Caizhi Weng
>            Assignee: Caizhi Weng
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
>
> In {{BytesHashMap#growAndRehash}} we have the following code.
> {code:java}
> List<MemorySegment> newBucketSegments = new ArrayList<>(required);
> try {
>       int numAllocatedSegments = required - memoryPool.freePages();
>       if (numAllocatedSegments > 0) {
>               throw new MemoryAllocationException();
>       }
>       int needNumFromFreeSegments = required - newBucketSegments.size();
>       for (int end = needNumFromFreeSegments; end > 0; end--) {
>               newBucketSegments.add(memoryPool.nextSegment());
>       }
>       setBucketVariables(newBucketSegments);
> } catch (MemoryAllocationException e) {
>       LOG.warn("BytesHashMap can't allocate {} pages, and now used {} pages",
>                       required, reservedNumBuffers);
>       throw new EOFException();
> }
> {code}
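> Newly allocated memory segments are temporarily stored in
> {{newBucketSegments}} before being handed to the hash table. But if a
> {{MemoryAllocationException}} occurs while this list is being filled, the
> segments already in it are never returned to the memory pool. When the
> operator is later closed, the pool detects the unreturned pages, which
> produces the following exception stack trace.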
> {code}
> java.lang.RuntimeException: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 512 pages
>         at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:84) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.growAndRehash(BytesHashMap.java:393) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.append(BytesHashMap.java:313) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at HashAggregateWithKeys$360.processElement(Unknown Source) ~[?:?]
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:560) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
>         Suppressed: java.lang.RuntimeException: Should return all used memory before clean, page used: 2814
>                 at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.close(LazyMemorySegmentPool.java:99) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>                 at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.free(BytesHashMap.java:486) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>                 at org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.free(BytesHashMap.java:475) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>                 at HashAggregateWithKeys$360.close(Unknown Source) ~[?:?]
>                 at org.apache.flink.table.runtime.operators.TableStreamOperator.dispose(TableStreamOperator.java:44) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:707) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:687) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:626) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>                 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:542) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>                 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>                 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>                 at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 512 pages
>         at org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:229) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:82) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         ... 15 more
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 16777216 bytes, only 0 bytes are remaining
>         at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:159) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:85) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:227) ~[flink-dist_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         at org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:82) ~[flink-table-blink_2.11-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>         ... 15 more
> {code}
> We should return these segments to the memory pool before throwing the
> exception.
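A minimal Java sketch of the proposed fix: release a partially filled segment list before reporting the allocation failure. It is not the code from the commit above. `GrowAndRehashSketch`, `ToyPagePool`, and `allocateBuckets` are hypothetical names. Pages are plain `byte[]` arrays instead of Flink's `MemorySegment`, and the toy pool only loosely imitates `LazyMemorySegmentPool`: `nextSegment()` fails with an unchecked exception, as in the trace above, and `close()` performs a leak check similar to the one behind the suppressed exception. Because the trace shows the failure escaping `nextSegment()` as a `RuntimeException`, the sketch catches that type.

```java
import java.io.EOFException;
import java.util.ArrayList;
import java.util.List;

public class GrowAndRehashSketch {

    /** Hypothetical page pool (not a Flink class); pages are plain byte arrays. */
    public static final class ToyPagePool {
        private final int pageSize;
        private final int capacity; // total pages this pool may hand out
        private int outstanding;    // pages handed out and not yet returned

        public ToyPagePool(int pageSize, int capacity) {
            this.pageSize = pageSize;
            this.capacity = capacity;
        }

        public int outstanding() {
            return outstanding;
        }

        /** Fails with an unchecked exception once the budget is exhausted. */
        public byte[] nextSegment() {
            if (outstanding >= capacity) {
                throw new RuntimeException("Could not allocate a page");
            }
            outstanding++;
            return new byte[pageSize];
        }

        /** Takes back every page in the given list. */
        public void returnAll(List<byte[]> pages) {
            outstanding -= pages.size();
        }

        /** Leak check on close, analogous to the suppressed exception in the trace. */
        public void close() {
            if (outstanding != 0) {
                throw new IllegalStateException(
                        "Should return all used memory before clean, page used: " + outstanding);
            }
        }
    }

    /**
     * Allocates {@code required} pages for a new bucket area. If allocation fails
     * part-way, the pages already taken are returned before the failure is reported.
     */
    public static List<byte[]> allocateBuckets(ToyPagePool pool, int required)
            throws EOFException {
        List<byte[]> newBucketSegments = new ArrayList<>(required);
        try {
            for (int i = 0; i < required; i++) {
                newBucketSegments.add(pool.nextSegment());
            }
            return newBucketSegments;
        } catch (RuntimeException e) {
            // The fix: hand the partially filled list back instead of leaking it.
            pool.returnAll(newBucketSegments);
            newBucketSegments.clear();
            EOFException eof = new EOFException("can't allocate " + required + " pages");
            eof.initCause(e);
            throw eof;
        }
    }

    public static void main(String[] args) {
        ToyPagePool pool = new ToyPagePool(1024, 4);
        try {
            allocateBuckets(pool, 8); // asks for more pages than the pool holds
        } catch (EOFException e) {
            System.out.println(e.getMessage());
        }
        System.out.println("outstanding after failure: " + pool.outstanding());
        pool.close(); // nothing leaked, so the leak check passes
    }
}
```

Running `main` prints the `EOFException` message and an outstanding count of 0. If the `returnAll` call is removed, 4 pages stay outstanding and `close()` throws, which mirrors the "page used" failure in the trace. An equivalent design is a `try`/`finally` with a success flag, which also covers exception types the `catch` clause does not name.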



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
