[ https://issues.apache.org/jira/browse/FLINK-1930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14511203#comment-14511203 ]
ASF GitHub Bot commented on FLINK-1930:
---------------------------------------

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/624

    [FLINK-1930] Separate output buffer pool and result partition life cycle

The problem: when a pipelined result is only consumed partially, the buffer pool associated with the result partition is destroyed too early. If a pipelined producer is still producing data for this partition, it runs into an IllegalStateException.

The solution: by separating the life cycle of the result partition from that of the associated buffer pool, this can no longer happen. The result buffer pool is only destroyed after the producing task has finished, independent of the state of the result partition.

Furthermore, this commit squashes the following commits:

- [FLINK-1930] [tests] Add test for FLINK-1930
- [tests] Move iterative tests to correct package

Thanks to @vasia for reporting the original issue and providing the program and data to reproduce the problem.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/incubator-flink tooearly1930

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/624.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #624

----

commit 6b7f14471ef34a18d284a42bfe5fc53bbf756585
Author: Ufuk Celebi <u...@apache.org>
Date:   2015-04-24T14:58:27Z

    [FLINK-1930] Separate output buffer pool and result partition life cycle

    The problem: when a pipelined result is only consumed partially, the buffer pool associated with the result partition is destroyed too early. If a pipelined producer is still producing data for this partition, it runs into an IllegalStateException.

    The solution: by separating the life cycle of the result partition from that of the associated buffer pool, this can no longer happen. The result buffer pool is only destroyed after the producing task has finished, independent of the state of the result partition.

    Furthermore, this commit squashes the following commits:

    - [FLINK-1930] [tests] Add test for FLINK-1930
    - [tests] Move iterative tests to correct package

----

> NullPointerException in vertex-centric iteration
> ------------------------------------------------
>
>                 Key: FLINK-1930
>                 URL: https://issues.apache.org/jira/browse/FLINK-1930
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 0.9
>            Reporter: Vasia Kalavri
>            Assignee: Ufuk Celebi
>
> Hello to my Squirrels,
> I came across this exception when having a vertex-centric iteration output followed by a group by.
> I'm not sure what is causing it, since I saw this error in a rather large pipeline, but I managed to reproduce it with [this code example | https://github.com/vasia/flink/commit/1b7bbca1a6130fbcfe98b4b9b43967eb4c61f309] and a sufficiently large dataset, e.g. [this one | http://snap.stanford.edu/data/com-DBLP.html] (I'm running this locally).
> It seems to be caused by a null Buffer in the RecordWriter.
> The exception message is the following:
>
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 	at org.apache.flink.runtime.jobmanager.JobManager$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:319)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> 	at org.apache.flink.runtime.ActorLogMessages$anon$1.apply(ActorLogMessages.scala:37)
> 	at org.apache.flink.runtime.ActorLogMessages$anon$1.apply(ActorLogMessages.scala:30)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> 	at org.apache.flink.runtime.ActorLogMessages$anon$1.applyOrElse(ActorLogMessages.scala:30)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> 	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NullPointerException
> 	at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.setNextBuffer(SpanningRecordSerializer.java:93)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
> 	at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> 	at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.streamSolutionSetToFinalOutput(IterationHeadPactTask.java:405)
> 	at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:365)
> 	at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
> 	at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:221)
> 	at java.lang.Thread.run(Thread.java:745)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
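The life-cycle separation described in the pull request can be sketched as a minimal, self-contained model. All names below (BufferPool, ResultPartition, Flink1930Sketch) are hypothetical stand-ins for illustration, not the actual Flink runtime classes; the point is only the ownership change: the buffer pool is destroyed when the producing task finishes, not when the (possibly partially consumed) result partition is released.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in for a pool of network buffers.
class BufferPool {
    private final AtomicBoolean destroyed = new AtomicBoolean(false);

    Object requestBuffer() {
        if (destroyed.get()) {
            // This models the IllegalStateException the producer hit when
            // the pool was torn down together with the partition.
            throw new IllegalStateException("Buffer pool has been destroyed.");
        }
        return new Object(); // stands in for a network buffer
    }

    void destroy() {
        destroyed.set(true);
    }
}

// Stand-in for a pipelined result partition that writes into the pool.
class ResultPartition {
    private final BufferPool pool;
    private boolean released;

    ResultPartition(BufferPool pool) {
        this.pool = pool;
    }

    Object writeRecord() {
        return pool.requestBuffer();
    }

    // Before the fix, releasing the partition also destroyed the pool;
    // after the fix it only marks the partition as released.
    void release() {
        released = true;
    }

    boolean isReleased() {
        return released;
    }
}

public class Flink1930Sketch {
    public static void main(String[] args) {
        BufferPool pool = new BufferPool();
        ResultPartition partition = new ResultPartition(pool);

        // The consumer stops early and the partition is released...
        partition.release();

        // ...but the producer can still obtain buffers, because the pool's
        // life cycle is now bound to the producing task, not the partition.
        partition.writeRecord(); // no IllegalStateException

        // Only when the producing task finishes is the pool destroyed.
        pool.destroy();
        boolean threw = false;
        try {
            partition.writeRecord();
        } catch (IllegalStateException expected) {
            threw = true;
        }
        if (!threw) {
            throw new AssertionError("expected IllegalStateException after destroy");
        }
        System.out.println("pool outlived the released partition: ok");
    }
}
```

In this model the decisive design choice is simply who calls destroy(): the task (after it finishes) rather than the partition's release path, which is the essence of the fix described in the commit message above.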