[ https://issues.apache.org/jira/browse/BEAM-5694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16739588#comment-16739588 ]

Chamikara Jayalath commented on BEAM-5694:
------------------------------------------

R: [~reuvenlax]

> OOMs on Pub/Sub to BigQuery via FILE_LOADS
> ------------------------------------------
>
>                 Key: BEAM-5694
>                 URL: https://issues.apache.org/jira/browse/BEAM-5694
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>    Affects Versions: 2.7.0
>         Environment: Google Dataflow
>            Reporter: Coda Hale
>            Assignee: Chamikara Jayalath
>            Priority: Major
>
> I've got a streaming Dataflow job that reads from a Pub/Sub subscription and 
> writes to a single BigQuery table. I'm experimenting with moving it to batch 
> loads via BigQueryIO.Write.Method.FILE_LOADS, but the only way I can get the 
> job to run successfully is by increasing worker memory from 15 GB to 52 GB, 
> which seems like a lot.
> I haven't been able to get a heap dump, but watching the job I can see ~5 GB 
> of records accumulate in GroupByDestination before the trigger duration 
> elapses and WriteGroupedRecords processes them, at which point 
> WriteGroupedRecords throws OOM errors:
> {code:java}
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.OutOfMemoryError: Java heap space
>         org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>         org.apache.beam.sdk.io.gcp.bigquery.WriteGroupedRecordsToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)
> {code}
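> For reference on the missing heap dump: Dataflow's pipeline debug options can 
> capture one when a worker OOMs. A minimal sketch (the GCS path is a 
> placeholder, and setSaveHeapDumpsToGcsPath may require a newer SDK than 
> 2.7.0; setDumpHeapOnOOM alone writes the dump to the worker's local disk):
> {code:java}
> import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> 
> DataflowPipelineDebugOptions debugOptions =
>     PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineDebugOptions.class);
> // Dump the heap whenever a worker throws OutOfMemoryError.
> debugOptions.setDumpHeapOnOOM(true);
> // Hypothetical bucket: copies each dump to GCS for offline analysis.
> debugOptions.setSaveHeapDumpsToGcsPath("gs://my-bucket/heap-dumps");
> {code}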
> As I said, I can work around this by running the job on n1-highmem-8 
> machines, but that seems odd. The job explicitly shards data to keep 
> per-worker memory requirements low, yet there's still a per-worker bottleneck 
> roughly the size of the entire dataset. Increasing numFileShards doesn't seem 
> to affect this either: going from 100 to 1,000 to 10,000 changed the number 
> of files but not the OOMs.
> The pipeline is fairly standard, but here's the code, edited for 
> confidentiality:
> {code:java}
> pipeline
>     .apply("Read", PubsubIO.readMessages().fromSubscription(subscription))
>     .apply("Transform", ParDo.of(new MtoNFunction()))
>     .apply(
>         "Write",
>         BigQueryIO.<TableRow>write()
>             .withFormatFunction(a -> a)
>             .to(tableRef)
>             .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>             .withTriggeringFrequency(Duration.standardMinutes(5))
>             .withNumFileShards(100_000)
>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>             .withJsonSchema("redacted")
>             .withCustomGcsTempLocation(
>                 ValueProvider.StaticValueProvider.of(opts.getGcpTempLocation())));
> {code}
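> One variable worth isolating, assuming the OOMs track bytes buffered per pane 
> rather than file count: a shorter triggering frequency bounds how much data 
> each GroupByDestination pane accumulates before WriteGroupedRecords runs, at 
> the cost of more frequent load jobs (which count against BigQuery's daily 
> load-job quota). A sketch of the same write with only those knobs changed:
> {code:java}
> BigQueryIO.<TableRow>write()
>     .withFormatFunction(a -> a)
>     .to(tableRef)
>     .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>     // 1-minute panes should buffer roughly 1/5 the data of 5-minute panes.
>     .withTriggeringFrequency(Duration.standardMinutes(1))
>     // Shard count didn't move the OOMs above, so keep it modest.
>     .withNumFileShards(100)
>     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>     .withJsonSchema("redacted")
>     .withCustomGcsTempLocation(
>         ValueProvider.StaticValueProvider.of(opts.getGcpTempLocation()));
> {code}
> If the OOMs persist even with small panes, that would point at per-destination 
> or per-bundle state rather than pane volume.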


