kennknowles opened a new issue, #19159:
URL: https://github.com/apache/beam/issues/19159

   I've got a streaming Dataflow job which streams data from a Pub/Sub 
subscription to a single BigQuery table that I'm experimenting with moving to 
batch loads via BigQueryIO.Method.FILE_LOADS, but the only way I can get the 
job to successfully run is by increasing worker memory from 15GB to 52GB, which 
seems like a lot.
   
   I haven't been able to get a heap dump, but observing the job I can see ~5GB 
of records accumulate in GroupByDestination before the trigger duration elapses 
and WriteGroupedRecords processes those, at which point I see OOM errors in 
WriteGroupedRecords:
   
   `Caused by: org.apache.beam.sdk.util.UserCodeException: 
java.lang.OutOfMemoryError: Java heap space``        
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)``    
    
org.apache.beam.sdk.io.gcp.bigquery.WriteGroupedRecordsToFiles$DoFnInvoker.invokeProcessElement(Unknown
 Source)`
   
   Like I said, I can resolve this by running the job with n1-highmem-8 
machines, but this seems odd. The job is explicitly sharding data to keep 
per-worker requirements low but there's still a per-worker bottleneck about the 
size of the entire dataset. Increasing numFileShards doesn't seem to affect 
this, either — increasing from 100 to 1,000 to 10,000 changed the number of 
files but not the OOMs.
   
   The pipeline is fairly standard, but here's the code edited for 
confidentiality:
   
   ```
   
       pipeline
           .apply("Read", 
PubsubIO.readMessages().fromSubscription(subscription))
       
      .apply("Transform", ParDo.of(new MtoNFunction()))
           .apply(
               "Write",
         
        BigQueryIO.<TableRow>write()
                   .withFormatFunction(a -> a)
                   .to(tableRef)
   
                  .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                   .withTriggeringFrequency(Duration.standardMinutes(5))
   
                  .withNumFileShards(100_000)
                   
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
   
                  
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
              
       .withJsonSchema("redacted")
                   
.withCustomGcsTempLocation(ValueProvider.StaticValueProvider.of(opts.getGcpTempLocation())));
   
   ```
   
   
   
   Imported from Jira 
[BEAM-5694](https://issues.apache.org/jira/browse/BEAM-5694). Original Jira may 
contain additional context.
   Reported by: mc-coda.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to