kennknowles opened a new issue, #19159:
URL: https://github.com/apache/beam/issues/19159
I have a streaming Dataflow job that streams data from a Pub/Sub
subscription to a single BigQuery table. I'm experimenting with moving it to
batch loads via BigQueryIO.Write.Method.FILE_LOADS, but the only way I can get
the job to run successfully is by increasing worker memory from 15GB to 52GB,
which seems like a lot.
I haven't been able to get a heap dump, but observing the job I can see ~5GB
of records accumulate in GroupByDestination before the trigger duration elapses
and WriteGroupedRecords processes those, at which point I see OOM errors in
WriteGroupedRecords:
`Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.OutOfMemoryError: Java heap space`
`org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)`
`org.apache.beam.sdk.io.gcp.bigquery.WriteGroupedRecordsToFiles$DoFnInvoker.invokeProcessElement(Unknown Source)`
As I said, I can resolve this by running the job on n1-highmem-8
machines, but this seems odd. The job explicitly shards data to keep
per-worker requirements low, yet there is still a per-worker bottleneck roughly
the size of the entire dataset. Increasing numFileShards doesn't seem to affect
this either: going from 100 to 1,000 to 10,000 changed the number of
files but not the OOMs.
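As a rough illustration of why this is surprising, here is a back-of-the-envelope sketch of the per-shard size one would expect. It assumes the ~5GB buffered per trigger interval is spread evenly across shards, which may well not match what the job actually does. The class and method names are hypothetical and not part of Beam.

```java
public class ShardSizeEstimate {

    /**
     * Expected bytes per shard, ASSUMING records are spread evenly
     * across shards (an assumption, not observed behavior).
     */
    public static long bytesPerShard(long bufferedBytes, int numFileShards) {
        return bufferedBytes / numFileShards;
    }

    public static void main(String[] args) {
        // ~5GB observed accumulating before the trigger fires.
        long bufferedBytes = 5L * 1024 * 1024 * 1024;

        // Shard counts tried in the report.
        int[] shardCounts = {100, 1_000, 10_000};

        for (int shards : shardCounts) {
            long perShardKiB = bytesPerShard(bufferedBytes, shards) / 1024;
            System.out.printf("%d shards: about %d KiB per shard%n", shards, perShardKiB);
        }
    }
}
```

Under that even-spread assumption, each shard should hold tens of megabytes or less, so OOMs that persist regardless of shard count suggest memory use is not scaling with per-shard size.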
The pipeline is fairly standard, but here's the code edited for
confidentiality:
```
pipeline
    .apply("Read",
        PubsubIO.readMessages().fromSubscription(subscription))
    .apply("Transform", ParDo.of(new MtoNFunction()))
    .apply(
        "Write",
        BigQueryIO.<TableRow>write()
            .withFormatFunction(a -> a)
            .to(tableRef)
            .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
            .withTriggeringFrequency(Duration.standardMinutes(5))
            .withNumFileShards(100_000)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withJsonSchema("redacted")
            .withCustomGcsTempLocation(
                ValueProvider.StaticValueProvider.of(opts.getGcpTempLocation())));
```
Imported from Jira
[BEAM-5694](https://issues.apache.org/jira/browse/BEAM-5694). Original Jira may
contain additional context.
Reported by: mc-coda.