It seems that the stack overflow error in Spark is mostly caused by the 'scheduler' in the master recursively visiting all of the RDDs that belong to an iterative job. I think the error I bumped into when trying to run ALS on Spark was also caused by this. On the other hand, I suspect the chain of RDDs pipelined into each task is much shorter than that. Also, a quick search tells me that the Java call stack is thousands of levels deep by default, which makes it even more unlikely that our Task will run into this error.
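For reference, the default call-stack depth can be probed with a small experiment like the following sketch (illustrative only; the exact number depends on the JVM's -Xss setting and the size of each frame):

```java
public class StackDepthDemo {
    // Counts how many trivial stack frames fit before a StackOverflowError.
    // Only the deepest frame catches the error; outer frames just add 1
    // while the stack unwinds.
    static int measureDepth() {
        try {
            return 1 + measureDepth();
        } catch (StackOverflowError e) {
            return 0;
        }
    }

    public static void main(String[] args) {
        // With default JVM settings this typically prints a number in the
        // thousands to tens of thousands.
        System.out.println("Approximate max recursion depth: " + measureDepth());
    }
}
```

Real frames in a task executor are larger than this trivial one, so the practical limit would be lower, but likely still well above the length of a pipelined vertex chain.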
So as a first implementation goal, it may be better to keep the recursive structure and just do a bit of refactoring to handle the other issues. I'd be happy to work on those. We can handle stack overflow errors once they really become a problem for our users.

John

On Sun, Jun 3, 2018, 1:43 PM John Yang <[email protected]> wrote:

> Hi Nemo devs,
>
> I'd like to get your feedback on some ideas I have regarding TaskExecutor
> (formerly TaskGroupExecutor).
>
> At the moment TaskExecutor executes the IRVertex DAG of a task like the
> following pseudocode. The 'rootVertex' here is either a source vertex, or a
> vertex that fetches data from a parent stage.
>
> prepareDataStructuresForIRVertexDataHandlers(irVertexDAG)
> for each rootVertex {
>   for each rootVertex's incoming element {
>     recursivelyExecute(childIRVertexDataHandler(rootVertex), element)
>   }
> }
>
> While this code has served us well so far, I feel there's room for
> improvement.
>
> Most importantly, this code runs into stack overflow errors when handling
> a long chain of vertices, which appear to be used quite frequently in user
> applications. A quick search for "stackoverflowerror spark" on Google
> returns quite a few bug reports from users trying to run long RDD lineages
> on Spark, which also employs a recursive execution model. It'd be nice if
> Nemo avoided this error.
>
> Another issue I see is that we are consuming all of a root vertex's data
> before moving on to the next root vertex. I feel this is unfair to root
> vertices that are chosen later. In the case of stream-joining two active
> data streams, this effectively results in consuming only one of the streams
> and never looking at the other. It'd be better to give a fair chance to
> every root vertex.
>
> Finally, if we're to do away with recursion, it'd be nice to refactor
> IRVertexDataHandler and related data structures a bit.
> With recursion, the data structures are used to recursively reference
> child vertices. Without recursion, we may want to loop through the
> vertices in topological order and reference the OutputCollectors of each
> vertex's parents to consume data, if any exist. Of course, all per-element
> overheads remain small with quick pointer referencing and boolean
> operations.
>
> So, a sketch of what I want to do is something like this:
>
> (rootHandlers, nonRootHandlers) = topoSortAndSetUpHandlers(irVertexDAG)
> while (!allDone) {
>   // Fair chance across roots
>   for rootHandler {
>     readElement(rootHandler)
>   }
>   // Trickle down the element(s) to the leaf vertices in topological order
>   while (!nonRootsDoneForTheseRootElements) {
>     for nonRootHandler {
>       execute(nonRootHandler)
>     }
>   }
> }
>
> Here, we can also wrap the root vertices in a handler with an
> OutputCollector. This makes things consistent, and also sets things up to
> handle the case of a streaming join where only one of the data streams is
> active: we can simply choose not to add an element to the OutputCollector
> for the inactive stream, without blocking on it. (Note that we would also
> need to change the iterator logic a bit to make this work.)
>
> Do you think it will work? Looking forward to hearing your thoughts. :)
>
> Thanks,
> John
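For concreteness, the non-recursive loop sketched in the quoted email could look roughly like the Java below. All names here (VertexHandler, executeOnePass, the Integer element type) are hypothetical stand-ins, not Nemo's actual API; the point is only that each handler drains its parents' OutputCollectors in topological order, so no recursion and no deep stack is involved.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical handler: one per vertex, holding its per-element logic,
// its OutputCollector, and references to its parents' handlers.
class VertexHandler {
    final Function<Integer, Integer> transform;
    final Queue<Integer> outputCollector = new ArrayDeque<>();
    final List<VertexHandler> parents;

    VertexHandler(Function<Integer, Integer> transform, List<VertexHandler> parents) {
        this.transform = transform;
        this.parents = parents;
    }

    // Drain each parent's OutputCollector into our own; returns whether
    // any element was processed. No recursive calls anywhere.
    boolean execute() {
        boolean didWork = false;
        for (VertexHandler parent : parents) {
            Integer element;
            while ((element = parent.outputCollector.poll()) != null) {
                outputCollector.add(transform.apply(element));
                didWork = true;
            }
        }
        return didWork;
    }
}

public class NonRecursiveExecutor {
    // One pass of the outer loop: read one element per root (fair chance
    // across roots), then trickle elements down. nonRoots must be given
    // in topological order.
    static void executeOnePass(List<VertexHandler> roots,
                               List<VertexHandler> nonRoots,
                               List<Iterator<Integer>> rootInputs) {
        for (int i = 0; i < roots.size(); i++) {
            if (rootInputs.get(i).hasNext()) {
                roots.get(i).outputCollector.add(rootInputs.get(i).next());
            }
        }
        boolean progress = true;
        while (progress) {
            progress = false;
            for (VertexHandler handler : nonRoots) {
                progress |= handler.execute();
            }
        }
    }
}
```

Since the roots are themselves wrapped in handlers with OutputCollectors, an inactive stream simply contributes nothing in a pass, and the inner loop drains whatever the active streams produced.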
