[
https://issues.apache.org/jira/browse/FLINK-20663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17316264#comment-17316264
]
zhou commented on FLINK-20663:
------------------------------
here is script (the data set amount is small ) :
/space/airflow/dags/chloe # /space/flink/bin/flink run -m yarn-cluster -yjm
26096 -ytm 46789 -ynm collocation --pyFiles /space/airflow/dags/ -py
/space/airflow/dags/chloe/material_collocation_summary.py --date=2021-04-01
--env=pro
{quote}
File "/space/airflow/dags/chloe/material_collocation_summary.py", line 181, in
<module>
main(date)
File "/space/airflow/dags/chloe/material_collocation_summary.py", line 129, in
main
result.execute().print()
File "/space/flink/opt/python/pyflink.zip/pyflink/table/table_result.py", line
219, in print
File "/space/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
line 1286, in __call__
File "/space/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line
147, in deco
File "/space/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line
328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o1428.print.
: java.lang.RuntimeException: Failed to fetch next result
at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
at
org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117)
at
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
at
org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:149)
at
org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:154)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to fetch job execution result
at
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:169)
at
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)
at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
... 16 more
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID:
ef1a60be8f725a192a72b12cbcc2769c)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:167)
... 18 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: ef1a60be8f725a192a72b12cbcc2769c)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$22(RestClusterClient.java:665)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution
failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
at
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
at sun.reflect.GeneratedMethodAccessor36.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException:
org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate
512 pages
at
org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:84)
at
org.apache.flink.table.runtime.hashtable.BaseHybridHashTable.getNextBuffer(BaseHybridHashTable.java:254)
at
org.apache.flink.table.runtime.hashtable.BaseHybridHashTable.nextSegment(BaseHybridHashTable.java:313)
at
org.apache.flink.table.runtime.hashtable.LongHashPartition.<init>(LongHashPartition.java:166)
at
org.apache.flink.table.runtime.hashtable.LongHashPartition.<init>(LongHashPartition.java:136)
at
org.apache.flink.table.runtime.hashtable.LongHybridHashTable.createPartitions(LongHybridHashTable.java:276)
at
org.apache.flink.table.runtime.hashtable.LongHybridHashTable.<init>(LongHybridHashTable.java:89)
at LongHashJoinOperator$893$LongHashTable$877.<init>(Unknown Source)
at LongHashJoinOperator$893.open(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:428)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not
allocate 512 pages
at
org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:235)
at
org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:82)
... 16 more
Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could
not allocate 16777216 bytes, only 0 bytes are remaining. This usually indicates
that you are requesting more memory than you have reserved. However, when
running an old JVM version it can also be caused by slow garbage collection.
Try to upgrade to Java 8u72 or higher if running on an old Java version.
at
org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:170)
at
org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:84)
at
org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:232)
... 17 more
org.apache.flink.client.program.ProgramAbortException
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Exception in thread "Thread-9" java.lang.IllegalStateException: Trying to
access closed classloader. Please check if you store classloaders directly or
indirectly in static fields. If the stacktrace suggests that the leak occurs in
a third party library and cannot be fixed immediately, you can disable this
check with the configuration 'classloader.check-leaked-classloader'.
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:183)
at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2803)
at
org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3059)
at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:3018)
at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2991)
at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2871)
at org.apache.hadoop.conf.Configuration.get(Configuration.java:1223)
at
org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1835)
at
org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1812)
at
org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
at
org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
at
org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
at
org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)
{quote}
> Managed memory may not be released in time when operators use managed memory
> frequently
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-20663
> URL: https://issues.apache.org/jira/browse/FLINK-20663
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 1.12.0
> Reporter: Caizhi Weng
> Assignee: Xintong Song
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.12.2, 1.13.0
>
>
> Some batch operators (like sort merge join or hash aggregate) use managed
> memory frequently. When these operators are chained together and the cluster
> load is a bit heavy, it is very likely that the following exception occurs:
> {code:java}
> 2020-12-18 10:04:32
> java.lang.RuntimeException:
> org.apache.flink.runtime.memory.MemoryAllocationException: Could not allocate
> 512 pages
> at
> org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:85)
> at
> org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:49)
> at
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap$RecordArea.<init>(BytesHashMap.java:297)
> at
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:103)
> at
> org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.<init>(BytesHashMap.java:90)
> at LocalHashAggregateWithKeys$209161.open(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:401)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:506)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:834)
> Suppressed: java.lang.NullPointerException
> at LocalHashAggregateWithKeys$209161.close(Unknown Source)
> at
> org.apache.flink.table.runtime.operators.TableStreamOperator.dispose(TableStreamOperator.java:46)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:739)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:719)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:642)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:551)
> ... 3 more
> Suppressed: java.lang.NullPointerException
> at LocalHashAggregateWithKeys$209766.close(Unknown
> Source)
> ... 8 more
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could
> not allocate 512 pages
> at
> org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:231)
> at
> org.apache.flink.table.runtime.util.LazyMemorySegmentPool.nextSegment(LazyMemorySegmentPool.java:83)
> ... 13 more
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could
> not allocate 16777216 bytes, only 9961487 bytes are remaining. This usually
> indicates that you are requesting more memory than you have reserved.
> However, when running an old JVM version it can also be caused by slow
> garbage collection. Try to upgrade to Java 8u72 or higher if running on an
> old Java version.
> at
> org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:164)
> at
> org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:80)
> at
> org.apache.flink.runtime.memory.MemoryManager.allocatePages(MemoryManager.java:229)
> ... 14 more
> {code}
> It seems that this is caused by relying on GC to release managed memory, as
> {{System.gc()}} may not trigger GC in time. See {{UnsafeMemoryBudget.java}}.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)