[ 
https://issues.apache.org/jira/browse/FLINK-20663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17316264#comment-17316264
 ] 

zhou commented on FLINK-20663:
------------------------------

 here is script (the data set amount is small ) :


 /space/airflow/dags/chloe # /space/flink/bin/flink run -m yarn-cluster -yjm 
26096 -ytm 46789 -ynm collocation --pyFiles /space/airflow/dags/ -py 
/space/airflow/dags/chloe/material_collocation_summary.py --date=2021-04-01 
--env=pro
{quote}

File "/space/airflow/dags/chloe/material_collocation_summary.py", line 181, in 
<module>
 main(date)
 File "/space/airflow/dags/chloe/material_collocation_summary.py", line 129, in 
main
 result.execute().print()
 File "/space/flink/opt/python/pyflink.zip/pyflink/table/table_result.py", line 
219, in print
 File "/space/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", 
line 1286, in __call__
 File "/space/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 
147, in deco
 File "/space/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 
328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o1428.print.
: java.lang.RuntimeException: Failed to fetch next result
 at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
 at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 at 
org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117)
 at 
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
 at 
org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:149)
 at 
org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:154)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
 at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
 at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to fetch job execution result
 at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:169)
 at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)
 at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 ... 16 more
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
ef1a60be8f725a192a72b12cbcc2769c)
 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
 at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
 at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:167)
 ... 18 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: ef1a60be8f725a192a72b12cbcc2769c)
 at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
 at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
 at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
 at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
 at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
 at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
 at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
 at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
 at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 ... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
 at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
 at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
 ... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
 at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
 at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
 at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
 at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
 at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
 at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
 at sun.reflect.GeneratedMethodAccessor36.invoke(Unknown Source)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
 at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
 at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
 at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
 at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
 at akka.actor.Actor.aroundReceive(Actor.scala:517)
 at akka.actor.Actor.aroundReceive$(Actor.scala:515)
 at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
 at akka.actor.ActorCell.invoke(ActorCell.scala:561)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
 at akka.dispatch.Mailbox.run(Mailbox.scala:225)
 at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
 at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: 
org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 
512 pages
 at 
org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:84)
 at 
org.apache.flink.table.runtime.hashtable.BaseHybridHashTable.getNextBuffer(BaseHybridHashTable.java:254)
 at 
org.apache.flink.table.runtime.hashtable.BaseHybridHashTable.nextSegment(BaseHybridHashTable.java:313)
 at 
org.apache.flink.table.runtime.hashtable.LongHashPartition.<init>(LongHashPartition.java:166)
 at 
org.apache.flink.table.runtime.hashtable.LongHashPartition.<init>(LongHashPartition.java:136)
 at 
org.apache.flink.table.runtime.hashtable.LongHybridHashTable.createPartitions(LongHybridHashTable.java:276)
 at 
org.apache.flink.table.runtime.hashtable.LongHybridHashTable.<init>(LongHybridHashTable.java:89)
 at LongHashJoinOperator$893$LongHashTable$877.<init>(Unknown Source)
 at LongHashJoinOperator$893.open(Unknown Source)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:428)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not 
allocate 512 pages
 at 
org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:235)
 at 
org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:82)
 ... 16 more
Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could 
not allocate 16777216 bytes, only 0 bytes are remaining. This usually indicates 
that you are requesting more memory than you have reserved. However, when 
running an old JVM version it can also be caused by slow garbage collection. 
Try to upgrade to Java 8u72 or higher if running on an old Java version.
 at 
org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:170)
 at 
org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:84)
 at 
org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:232)
 ... 17 more

org.apache.flink.client.program.ProgramAbortException
 at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
 at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
 at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
 at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Exception in thread "Thread-9" java.lang.IllegalStateException: Trying to 
access closed classloader. Please check if you store classloaders directly or 
indirectly in static fields. If the stacktrace suggests that the leak occurs in 
a third party library and cannot be fixed immediately, you can disable this 
check with the configuration 'classloader.check-leaked-classloader'.
 at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
 at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:183)
 at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2803)
 at 
org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3059)
 at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:3018)
 at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2991)
 at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2871)
 at org.apache.hadoop.conf.Configuration.get(Configuration.java:1223)
 at 
org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1835)
 at 
org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1812)
 at 
org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
 at 
org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
 at 
org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
 at 
org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)
{quote}
 

 

> Managed memory may not be released in time when operators use managed memory 
> frequently
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-20663
>                 URL: https://issues.apache.org/jira/browse/FLINK-20663
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.12.0
>            Reporter: Caizhi Weng
>            Assignee: Xintong Song
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.12.2, 1.13.0
>
>
> Some batch operators (like sort merge join or hash aggregate) use managed 
> memory frequently. When these operators are chained together and the cluster 
> load is a bit heavy, it is very likely that the following exception occurs:
> {code:java}
> 2020-12-18 10:04:32
> java.lang.RuntimeException: 
> org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate 
> 512 pages
>       at 
> org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:85)
>       at 
> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
>       at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:297)
>       at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:103)
>       at 
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:90)
>       at LocalHashAggregateWithKeys$209161.open(Unknown Source)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:506)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>       at java.lang.Thread.run(Thread.java:834)
>       Suppressed: java.lang.NullPointerException
>               at LocalHashAggregateWithKeys$209161.close(Unknown Source)
>               at 
> org.apache.flink.table.runtime.operators.TableStreamOperator.dispose(TableStreamOperator.java:46)
>               at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:739)
>               at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:719)
>               at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:642)
>               at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:551)
>               ... 3 more
>               Suppressed: java.lang.NullPointerException
>                       at LocalHashAggregateWithKeys$209766.close(Unknown 
> Source)
>                       ... 8 more
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could 
> not allocate 512 pages
>       at 
> org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:231)
>       at 
> org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:83)
>       ... 13 more
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could 
> not allocate 16777216 bytes, only 9961487 bytes are remaining. This usually 
> indicates that you are requesting more memory than you have reserved. 
> However, when running an old JVM version it can also be caused by slow 
> garbage collection. Try to upgrade to Java 8u72 or higher if running on an 
> old Java version.
>       at 
> org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:164)
>       at 
> org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:80)
>       at 
> org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:229)
>       ... 14 more
> {code}
> It seems that this is caused by relying on GC to release managed memory, as 
> {{System.gc()}} may not trigger GC in time. See {{UnsafeMemoryBudget.java}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to