Neville Li created BEAM-3234:
--------------------------------

             Summary: PubsubIO batch size should be configurable
                 Key: BEAM-3234
                 URL: https://issues.apache.org/jira/browse/BEAM-3234
             Project: Beam
          Issue Type: Bug
          Components: sdk-java-gcp
    Affects Versions: 2.1.0
            Reporter: Neville Li
            Priority: Minor

Looks like there's a payload size limit in Pubsub, and PubsubIO has a hard-coded batch size that may cause this limit to be exceeded in some cases. The following pipeline reproduces the problem:

{{
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Test {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Emit 100 elements (0 to 99).
        .apply(GenerateSequence.from(0).to(100))
        .apply(MapElements
            .into(TypeDescriptor.of(String.class))
            .via(x -> {
              // Replace each element with a 10,000,000-character string.
              StringBuilder b = new StringBuilder();
              for (int i = 0; i < 10000000; i++) {
                b.append("x");
              }
              return b.toString();
            }))
        // Publish the large strings; the batch size used here is not configurable.
        .apply(PubsubIO
            .writeStrings()
            .to("projects/scio-playground/topics/payload-test"));
    pipeline.run().waitUntilFinish();
  }
}
}}

The above code throws the following error:

{{
[error] Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
[error] {
[error]   "code" : 400,
[error]   "errors" : [ {
[error]     "domain" : "global",
[error]     "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]     "reason" : "badRequest"
[error]   } ],
[error]   "message" : "Request payload size exceeds the limit: 10485760 bytes.",
[error]   "status" : "INVALID_ARGUMENT"
}}
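
To illustrate what a configurable, size-aware batch could look like, here is a minimal sketch in plain Java. It is not PubsubIO's implementation and does not use any Beam API: the class `SizeBoundedBatcher`, its `batch` method, and the `maxMessages` and `maxBytes` parameters are hypothetical names chosen for this example. It also counts only raw payload bytes and assumes that attributes and request encoding overhead are ignored, so a real limit would need some headroom below 10485760 bytes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Beam): groups payloads into bounded batches. */
class SizeBoundedBatcher {

  /**
   * Splits payloads into batches so that no batch holds more than maxMessages
   * entries, and no batch with more than one entry exceeds maxBytes in total.
   * A single payload larger than maxBytes still gets a batch of its own;
   * deciding how to handle it is left to the caller.
   */
  static List<List<byte[]>> batch(List<byte[]> payloads, int maxMessages, long maxBytes) {
    List<List<byte[]>> batches = new ArrayList<>();
    List<byte[]> current = new ArrayList<>();
    long currentBytes = 0;

    for (byte[] payload : payloads) {
      // The current batch is "full" if adding this payload would break either bound.
      boolean full = current.size() >= maxMessages
          || currentBytes + payload.length > maxBytes;
      if (!current.isEmpty() && full) {
        // Close the current batch and start a new one.
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(payload);
      currentBytes += payload.length;
    }

    // Emit the trailing, partially filled batch.
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```

With `maxBytes` set to 10485760, the 10,000,000-byte payloads from the reproduction would each land in their own batch, while small messages would still be grouped up to `maxMessages` per batch.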