On Tue, Oct 29, 2019 at 10:04 AM Ryan Skraba <r...@skraba.com> wrote:

> I didn't get a chance to try this out -- it sounds like a bug with the
> SparkRunner, if you've tested it with FlinkRunner and it succeeded.
>
> From your description, it should be reproducible by reading any large
> database table with the SparkRunner where the entire dataset is
> greater than the memory available to a single executor?  Do you have
> any other tips to reproduce?
>

Yes, that is what I do.
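
Roughly, the read looks like this (a simplified sketch, not the actual job -
the driver, connection settings, query and types below are placeholders):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.KV;

    public class JdbcReadOnSpark {
      public static void main(String[] args) {
        // submitted with --runner=SparkRunner and an executor heap smaller
        // than the full result set of the query
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(JdbcIO.<KV<String, String>>read()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(
                    "org.postgresql.Driver", "jdbc:postgresql://db-host/db")
                    .withUsername("user")
                    .withPassword("secret"))
            .withQuery("SELECT id, payload FROM big_table")
            .withRowMapper(rs -> KV.of(rs.getString(1), rs.getString(2)))
            .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));
        p.run().waitUntilFinish();
      }
    }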


> Especially worrisome is "as past JDBC load job runs fine with 4GB
> heap" -- did this happen with the same volumes of data and a different
> version of Beam?  Or the same version and a pipeline with different
> characteristics? This does sound like a regression, so details would
> help to confirm and track it down!
>

Eh, my English, sorry :) What I meant to say is that if I provide this
data e.g. via a file dump, then the whole job runs fine with a 4GB executor
heap. The run uses about 400 cores for 1 hour, so tripling the heap size for
all executors just because of one initial load on one executor is inefficient.
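(For a rough illustration only: assuming, say, 5 cores per executor - a
made-up figure - 400 cores means about 80 executors, so going from a 4GB to
a 12GB heap reserves on the order of 640GB of extra memory across the
cluster just so one executor can get through the initial load.)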
I am not aware of any regression.


>
> All my best, Ryan
>
>
>
>
> On Tue, Oct 29, 2019 at 9:48 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
> >
> > I cannot find anything in the docs about the expected behavior of a DoFn
> > emitting an arbitrarily large number of elements from one processElement().
> >
> > I wonder whether the SparkRunner behavior is a bug or just an execution
> > difference (and a disadvantage in this case), more along the lines of
> > runner capability matrix differences.
> >
> > Also, in such cases, what is the opinion on BoundedSource vs. DoFn as a
> > source? What would you recommend to an IO developer who wants to achieve
> > equivalent execution scalability across runners?
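> >
> > To make the pattern concrete, here is a hypothetical sketch of what I mean
> > by "DoFn as a source" (the names and the fetchRows() helper are made up):
> >
> >     import java.util.Collections;
> >     import org.apache.beam.sdk.transforms.DoFn;
> >
> >     // A single processElement() call may emit an arbitrarily large number
> >     // of elements; whether the runner buffers or streams these outputs is
> >     // exactly what I am asking about.
> >     class QueryAsSourceFn extends DoFn<String, String> {
> >       @ProcessElement
> >       public void processElement(@Element String query,
> >                                  OutputReceiver<String> out) {
> >         for (String row : fetchRows(query)) {
> >           out.output(row);
> >         }
> >       }
> >
> >       // placeholder for e.g. lazily iterating a JDBC ResultSet
> >       private Iterable<String> fetchRows(String query) {
> >         return Collections.emptyList();
> >       }
> >     }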
> >
> >
> > On Sun, Oct 27, 2019 at 6:02 PM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
> >>
> >> Typo in my previous message - I meant to say => JDBC is `not` the main
> >> data set, just metadata.
> >>
> >> On Sun, Oct 27, 2019 at 6:00 PM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
> >>>
> >>> Result of my query can fit the memory if I use 12GB heap per spark
> executor. This makes the job quite inefficient as past JDBC load job runs
> fine with 4GB heap to do the main heavy lifting - JDBC is the main data
> set, just metadata.
> >>>
> >>> I just ran the same JdbcIO read code on the Spark and Flink runners.
> >>> Flink did not blow up on memory, so this looks like a limitation of the
> >>> SparkRunner.
> >>>
> >>> On Fri, Oct 25, 2019 at 5:28 PM Ryan Skraba <r...@skraba.com> wrote:
> >>>>
> >>>> One more thing to try -- depending on your pipeline, you can disable
> >>>> the "auto-reshuffle" of JdbcIO.Read by setting
> >>>> withOutputParallelization(false)
> >>>>
> >>>> This is particularly useful if (1) you do aggressive and cheap
> >>>> filtering immediately after the read or (2) you do your own
> >>>> repartitioning action like GroupByKey after the read.
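> >>>>
> >>>> Something like this (sketch only - the connection settings and query
> >>>> are placeholders):
> >>>>
> >>>>     JdbcIO.DataSourceConfiguration config =
> >>>>         JdbcIO.DataSourceConfiguration.create(
> >>>>             "org.postgresql.Driver", "jdbc:postgresql://db-host/db");
> >>>>
> >>>>     JdbcIO.<String>read()
> >>>>         .withDataSourceConfiguration(config)
> >>>>         .withQuery("SELECT id FROM big_table")
> >>>>         .withRowMapper(rs -> rs.getString(1))
> >>>>         .withOutputParallelization(false);  // disables the auto-reshuffle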
> >>>>
> >>>> Given your investigation into the heap, I doubt this will help!  I'll
> >>>> take a closer look at the DoFnOutputManager.  In the meantime, is
> >>>> there anything particular about your job that might help
> >>>> investigate?
> >>>>
> >>>> All my best, Ryan
> >>>>
> >>>> On Fri, Oct 25, 2019 at 2:47 PM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
> >>>> >
> >>>> > I agree I might have been too quick to claim that DoFn output needs to
> >>>> > fit in memory. Actually, I am not sure what the Beam model says on this
> >>>> > matter and what the output managers of particular runners do about it.
> >>>> >
> >>>> > But the SparkRunner definitely has an issue here. I did try setting a
> >>>> > small `fetchSize` for JdbcIO as well as changing `storageLevel` to
> >>>> > MEMORY_AND_DISK. Both fail with OOM.
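> >>>> >
> >>>> > For reference, this is roughly where I set those two knobs (a sketch;
> >>>> > the values and connection details are just examples):
> >>>> >
> >>>> >     // Spark runner option: cache RDDs to disk as well as memory
> >>>> >     SparkPipelineOptions options =
> >>>> >         PipelineOptionsFactory.as(SparkPipelineOptions.class);
> >>>> >     options.setStorageLevel("MEMORY_AND_DISK");
> >>>> >
> >>>> >     // JdbcIO option: small JDBC fetch size
> >>>> >     JdbcIO.DataSourceConfiguration config =
> >>>> >         JdbcIO.DataSourceConfiguration.create(
> >>>> >             "org.postgresql.Driver", "jdbc:postgresql://db-host/db");
> >>>> >     JdbcIO.Read<String> read = JdbcIO.<String>read()
> >>>> >         .withDataSourceConfiguration(config)
> >>>> >         .withQuery("SELECT id, payload FROM big_table")
> >>>> >         .withRowMapper(rs -> rs.getString(1))
> >>>> >         .withFetchSize(1000);
> >>>> >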
> >>>> > When looking at the heap, most of it is used by the linked-list
> >>>> > multimap of the DoFnOutputManager here:
> >>>> >
> https://github.com/apache/beam/blob/v2.15.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L234
> >>>> >
> >>>> >
>
