On Tue, Oct 29, 2019 at 10:04 AM Ryan Skraba <r...@skraba.com> wrote:
> I didn't get a chance to try this out -- it sounds like a bug with the
> SparkRunner, if you've tested it with the FlinkRunner and it succeeded.
>
> From your description, it should be reproducible by reading any large
> database table with the SparkRunner where the entire dataset is
> greater than the memory available to a single executor? Do you have
> any other tips to reproduce?

Yes, that is what I do.

> Especially worrisome is "as past JDBC load job runs fine with 4GB
> heap" -- did this happen with the same volumes of data and a different
> version of Beam? Or the same version and a pipeline with different
> characteristics? This does sound like a regression, so details would
> help to confirm and track it down!

Eh, my English, sorry :) What I meant to say is that if I provide this data
e.g. via a file dump, then the whole job runs OK with a 4GB executor heap.
The run is about 400 cores for 1 hour, so tripling the heap size everywhere
just for one initial load on one executor is inefficient. I am not aware of
any regression.

> All my best, Ryan
>
> On Tue, Oct 29, 2019 at 9:48 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >
> > I cannot find anything in the docs about the expected behavior of a DoFn
> > emitting an arbitrarily large number of elements from one processElement().
> >
> > I wonder whether the Spark Runner behavior is a bug, or just an execution
> > difference (and a disadvantage in this case) of the kind covered by the
> > runner capability matrix.
> >
> > Also, in such cases, what is the opinion on BoundedSource vs. DoFn as a
> > source? What is the recommendation to an IO developer who wants to achieve
> > equivalent execution scalability across runners?
> >
> > On Sun, Oct 27, 2019 at 6:02 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >>
> >> Typo in my previous message. I meant to say => JDBC is `not` the main
> >> data set, just metadata.
> >>
> >> On Sun, Oct 27, 2019 at 6:00 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >>>
> >>> The result of my query can fit in memory if I use a 12GB heap per Spark
> >>> executor. This makes the job quite inefficient, as the past JDBC load job
> >>> runs fine with a 4GB heap to do the main heavy lifting - JDBC is the main
> >>> data set, just metadata.
> >>>
> >>> I just ran the same JdbcIO read code on the Spark and Flink runners.
> >>> Flink did not blow up on memory. So it seems like this is a limitation
> >>> of the SparkRunner.
> >>>
> >>> On Fri, Oct 25, 2019 at 5:28 PM Ryan Skraba <r...@skraba.com> wrote:
> >>>>
> >>>> One more thing to try -- depending on your pipeline, you can disable
> >>>> the "auto-reshuffle" of JdbcIO.Read by setting
> >>>> withOutputParallelization(false)
> >>>>
> >>>> This is particularly useful if (1) you do aggressive and cheap
> >>>> filtering immediately after the read or (2) you do your own
> >>>> repartitioning action like GroupByKey after the read.
> >>>>
> >>>> Given your investigation into the heap, I doubt this will help! I'll
> >>>> take a closer look at the DoFnOutputManager. In the meantime, is
> >>>> there anything in particular about your job that might help
> >>>> investigate?
> >>>>
> >>>> All my best, Ryan
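For reference, disabling that auto-reshuffle on a JdbcIO read looks roughly
like the sketch below. This is not my actual job - the driver, URL, query,
row mapper and fetch size are placeholders - it only shows where the
`withFetchSize` and `withOutputParallelization` knobs discussed in this
thread go:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class JdbcReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> rows =
        pipeline.apply(
            JdbcIO.<String>read()
                .withDataSourceConfiguration(
                    JdbcIO.DataSourceConfiguration.create(
                        "org.postgresql.Driver",              // placeholder driver
                        "jdbc:postgresql://db-host/db"))      // placeholder URL
                .withQuery("SELECT id FROM big_table")        // placeholder query
                .withRowMapper(rs -> rs.getString(1))         // placeholder row mapper
                .withCoder(StringUtf8Coder.of())
                .withFetchSize(1000)                // small JDBC cursor fetch size
                .withOutputParallelization(false)); // skip JdbcIO's internal reshuffle

    pipeline.run().waitUntilFinish();
  }
}
```

With `withOutputParallelization(false)` the read's own reshuffle is skipped,
so any repartitioning after the read (e.g. a GroupByKey) is up to the
pipeline, as Ryan notes above.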
> >>>> On Fri, Oct 25, 2019 at 2:47 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >>>> >
> >>>> > I agree I may have been too quick to claim that DoFn output needs to
> >>>> > fit in memory. Actually, I am not sure what the Beam model says on
> >>>> > this matter, or what the output managers of particular runners do
> >>>> > about it.
> >>>> >
> >>>> > But the SparkRunner definitely has an issue here. I did try setting a
> >>>> > small `fetchSize` for JdbcIO, as well as changing `storageLevel` to
> >>>> > MEMORY_AND_DISK. All of it fails with OOM.
> >>>> > When looking at the heap, most of it is used by the linked-list
> >>>> > multimap of the DoFnOutputManager here:
> >>>> > https://github.com/apache/beam/blob/v2.15.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L234
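And for completeness, the `storageLevel` change mentioned above is just the
Spark runner's pipeline option - a sketch, assuming the remaining options come
from the command line (equivalent to passing --storageLevel=MEMORY_AND_DISK):

```java
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class SparkStorageLevelSketch {
  public static void main(String[] args) {
    SparkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    // Let the runner spill cached datasets to disk instead of keeping them
    // only in memory (the default storage level is MEMORY_ONLY). This controls
    // how RDDs are cached; it would not be expected to change the in-memory
    // output buffering linked above, which matches what I saw: it did not
    // avoid the OOM.
    options.setStorageLevel("MEMORY_AND_DISK");
    // ... build and run the pipeline with these options ...
  }
}
```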