[ https://issues.apache.org/jira/browse/FLINK-2763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14933239#comment-14933239 ]

Stephan Ewen commented on FLINK-2763:
-------------------------------------

If the join tries to spill a partition with only one buffer, then all partitions have only one buffer. All other buffers must already be in use by the hash table and the overflow segments.

I initially assumed that such a situation could never occur (because of the way segments are distributed), but I may have been mistaken there. When it does occur (every partition holds exactly one buffer), the only space left to free is the overflow buffers.

So we should select the partition to spill by counting both its partition buffers and its overflow buffers. With that selection, we should not run out of buffers before at least one partition has accumulated two buffers.
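
For illustration, here is a minimal sketch of that selection rule, assuming a simplified model of the partitions. This is not the actual {{MutableHashTable}}/{{HashPartition}} code; the {{Partition}} class and its fields are hypothetical stand-ins for the real bookkeeping.

{noformat}
import java.util.List;

// Minimal sketch, not Flink code: choose the spill candidate by the total
// number of buffers (data buffers + overflow segments) it would release.
public final class SpillCandidateSelector {

    /** Hypothetical, simplified view of an in-memory hash join partition. */
    public static final class Partition {
        final int numDataBuffers;       // memory segments holding this partition's records
        final int numOverflowSegments;  // overflow bucket segments owned by this partition

        Partition(int numDataBuffers, int numOverflowSegments) {
            this.numDataBuffers = numDataBuffers;
            this.numOverflowSegments = numOverflowSegments;
        }

        /** Memory that spilling this partition would actually give back. */
        int totalBuffers() {
            return numDataBuffers + numOverflowSegments;
        }
    }

    /**
     * Picks the partition that releases the most memory, counting both data
     * buffers and overflow segments rather than data buffers alone.
     * Returns -1 if no partition would release at least two buffers.
     */
    public static int pickPartitionToSpill(List<Partition> partitions) {
        int best = -1;
        int bestTotal = 1; // a spill must free at least two buffers to be useful
        for (int i = 0; i < partitions.size(); i++) {
            int total = partitions.get(i).totalBuffers();
            if (total > bestTotal) {
                bestTotal = total;
                best = i;
            }
        }
        return best;
    }
}
{noformat}

With a rule like this, a partition is only picked once spilling it would free at least two buffers, which should avoid the "Request to spill a partition with less than two buffers" condition reported below.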

> Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-2763
>                 URL: https://issues.apache.org/jira/browse/FLINK-2763
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Runtime
>    Affects Versions: master
>            Reporter: Greg Hogan
>
> The following exception is thrown when running the triangle listing example with an unmodified master build (4cadc3d6).
> {noformat}
> ./bin/flink run ~/flink-examples/flink-java-examples/target/flink-java-examples-0.10-SNAPSHOT-EnumTrianglesOpt.jar ~/rmat/undirected/s19_e8.ssv output
> {noformat}
> The only changes to {{flink-conf.yaml}} are {{taskmanager.numberOfTaskSlots: 8}} and {{parallelism.default: 8}}.
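> For reference, as a {{flink-conf.yaml}} fragment these two (and only these) changes look like:
> {noformat}
> taskmanager.numberOfTaskSlots: 8
> parallelism.default: 8
> {noformat}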
> I have confirmed the error with input files [s19_e8.ssv|https://drive.google.com/file/d/0B6TrSsnHj2HxR2lnMHR4amdyTnM/view?usp=sharing] (40 MB) and [s20_e8.ssv|https://drive.google.com/file/d/0B6TrSsnHj2HxNi1HbmptU29MTm8/view?usp=sharing] (83 MB). On a second machine, only the larger file caused the exception.
> {noformat}
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>       at org.apache.flink.client.program.Client.runBlocking(Client.java:407)
>       at org.apache.flink.client.program.Client.runBlocking(Client.java:386)
>       at org.apache.flink.client.program.Client.runBlocking(Client.java:353)
>       at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:64)
>       at org.apache.flink.examples.java.graph.EnumTrianglesOpt.main(EnumTrianglesOpt.java:125)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:434)
>       at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:350)
>       at org.apache.flink.client.program.Client.runBlocking(Client.java:290)
>       at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
>       at org.apache.flink.client.CliFrontend.run(CliFrontend.java:324)
>       at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
>       at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>       at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:425)
>       at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>       at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>       at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>       at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>       at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>       at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>       at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>       at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>       at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>       at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>       at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>       at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:107)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
>       at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:288)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1108)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:934)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:859)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:819)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:517)
>       at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:556)
>       at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
>       at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:208)
>       at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
>       at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}



