[ 
https://issues.apache.org/jira/browse/BEAM-1841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15949552#comment-15949552
 ] 

Eugene Kirpichov commented on BEAM-1841:
----------------------------------------

Dataflow runner kinda sorta partially addresses this, but not enough. Example 
redacted log message:

Splitting source StaticValueProvider{value=(redacted)} into bundles of 
estimated size 1 bytes produced 100 bundles, which have total serialized size 
533398549 bytes. As this is too large for the Google Cloud Dataflow API, 
retrying splitting once with increased desiredBundleSizeBytes 50 to reduce the 
number of splits.

I.e. it reduces the number of bundles only enough to fit in the Dataflow API 
limit, but in fact a bundle size of 50 bytes is still nowhere near appropriate 
in this case.

> FileBasedSource should better handle when set of files grows while job is 
> running
> ---------------------------------------------------------------------------------
>
>                 Key: BEAM-1841
>                 URL: https://issues.apache.org/jira/browse/BEAM-1841
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow, sdk-java-core
>            Reporter: Eugene Kirpichov
>            Assignee: Eugene Kirpichov
>
> In some cases people run pipelines over directories where the set of files in 
> the directory grows while the job runs. This may lead to a situation like 
> this, in particular with Dataflow runner:
> At job submission time, the FileBasedSource estimates the current size of the 
> filepattern, and ends up with a small number. Dataflow runner chooses thus a 
> small desiredBundleSizeBytes to pass to .splitIntoBundles(). However, at the 
> time splitIntoBundles() runs, the set of files has greatly grown, and we 
> produce many more, unnecessarily small bundles, than anticipated.
> I see a few things we could do:
> - In splitIntoBundles(), compute the actual size and detect when the desired 
> size is unreasonably small for it; e.g. set an upper threshold on how many 
> bundles we produce in total.
> - Somehow remember, at submission time, what was the estimated size. Then, in 
> splitIntoBundles(), compute the actual current size, and scale 
> desiredBundleSizeBytes accordingly to get approximately the intended number 
> of bundles. Caveat: files may still change between the moment size is 
> estimated and the moment splitting happens.
> - (much larger in scope) Change the whole protocol to use number of bundles 
> instead of bundle size bytes. This probably won't happen with BoundedSource, 
> but it is going to be the case with Splittable DoFn.
> Option 1 seems by far the simplest.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to