Beam team,
I’m currently using the Beam Go SDK to build a pipeline that generates large text data files and writes them to GCS.
The pipeline is relatively simple: prepare a range of source account numbers, transform each one into a fully populated block of account data, and write the result to GCS with the textio package.
package main

import (
	"context"
	"flag"
	"fmt"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/go/pkg/beam/log"
	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)

func main() {
	flag.Parse()
	ctx := context.Background()
	beam.Init()

	p := beam.NewPipeline()
	s := p.Root()

	// Get a PCollection of account numbers.
	accNumCol := beam.CreateList(s, makeRange(*accountStart, *accountEnd))
	// Transform the PCollection of account numbers into a collection of
	// strings that each represent a block of account data.
	accountCol := beam.ParDo(s, accountNumToAccountWithRows, accNumCol)
	// Write all account blocks to a single file in GCS.
	textio.Write(s, fmt.Sprintf("gs://files/CC%d-ACC%d.txt", *accountStart, *accountEnd), accountCol)

	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
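For reference, the helpers and flags used above look roughly like the following. These are simplified sketches living in the same file; the flag names and the row contents are placeholders, and the real accountNumToAccountWithRows produces a much larger block of rows per account:

var (
	accountStart = flag.Int("account_start", 0, "First account number to generate")
	accountEnd   = flag.Int("account_end", 0, "Last account number to generate (inclusive)")
)

// makeRange returns the account numbers in [start, end].
func makeRange(start, end int) []int {
	nums := make([]int, 0, end-start+1)
	for n := start; n <= end; n++ {
		nums = append(nums, n)
	}
	return nums
}

// accountNumToAccountWithRows expands a single account number into the
// multi-row text block that represents that account in the output file.
// (Placeholder body; the real implementation generates the full data set.)
func accountNumToAccountWithRows(accountNum int) string {
	return fmt.Sprintf("ACC%d\nrow-1\nrow-2\nrow-3\n", accountNum)
}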
Currently, for smaller files (e.g., 13 MB, or about 30,000 lines of text) we are not experiencing any issues with the pipeline when it is executed on Dataflow. When preparing larger datasets that generate medium-sized files of approx. 130 MB, the write-to-GCS step of the Dataflow pipeline fails consistently with errors like the following:
Error message from worker: process bundle failed for instruction
process_bundle-3 using plan process-bundle-descriptor-47 : while executing
Process for Plan[process-bundle-descriptor-47]:
2: ParDo[textio.writeFileFn] Out:[]
1: DataSource[S[ptransform-46@localhost:12371], 0]
Coder:W;coder-63<CoGBK;coder-64<int[varintz;c2];coder-65,string;coder-67[string]>>!GWC
Out:2
caused by:
source failed
caused by:
rpc error: code = ResourceExhausted desc = grpc: received message larger than
max (104858536 vs. 52428800)
and, from the Dataflow worker logs:
"jsonPayload": {
"worker": "go-job-1-1599375664191334-09060001-zizk-harness-ptm6",
"job": "2020-09-06_00_01_25-2750019264422346896",
"work": "process_bundle-1",
"message": "DataChannel.read localhost:12371 bad: rpc error: code =
ResourceExhausted desc = grpc: received message larger than max (104938332 vs.
52428800)",
"logger":
<project>/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/datamgr.go:261",
"portability_worker_id": "1"
}
When executing the pipeline locally without the GCS integration (i.e. writing to example.txt instead of gs://example.txt), no issues are apparent and files of arbitrary size can be generated. Running the pipeline locally with the GCS integration also seems to work for files of any size, though the upload can be quite slow. It is only when executing the pipeline on Dataflow that writing the files to GCS fails; it seems odd that we would encounter issues uploading files of this relatively medium size to GCS with such a simple pipeline.
The errors seem to indicate that something is receiving a message larger than 52,428,800 bytes (i.e. 50 MiB), but I am unable to determine what that something is, or where this magic number comes from.
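One thing I can try on our side is logging the size of each generated account block before the write, to see how close individual elements get to that limit. A minimal sketch, assuming the blocks are plain strings as in the pipeline above (sizeCheckFn is just a name I made up for this, and it uses the same beam log package already imported):

// sizeCheckFn logs the byte size of each account block so we can see how
// close individual elements get to the ~50 MiB limit in the error above.
func sizeCheckFn(ctx context.Context, block string) string {
	log.Infof(ctx, "account block size: %d bytes", len(block))
	return block
}

// In main, the check would sit between the ParDo and the write:
// sizedCol := beam.ParDo(s, sizeCheckFn, accountCol)
// textio.Write(s, fmt.Sprintf("gs://files/CC%d-ACC%d.txt", *accountStart, *accountEnd), sizedCol)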
I would appreciate any insight into this issue with our pipeline’s integration with GCS. Has anyone seen this before?
Regards,
Patrick Mitchell
"This e-mail and any attachments to it (the "Communication") is, unless
otherwise stated, confidential, may contain copyright material and is for the
use only of the intended recipient. If you receive the Communication in error,
please notify the sender immediately by return e-mail, delete the Communication
and the return e-mail, and do not read, copy, retransmit or otherwise deal with
it. Any views expressed in the Communication are those of the individual sender
only, unless expressly stated to be those of Australia and New Zealand Banking
Group Limited ABN 11 005 357 522, or any of its related entities including ANZ
Bank New Zealand Limited (together "ANZ"). ANZ does not accept liability in
connection with the integrity of or errors in the Communication, computer
virus, data corruption, interference or delay arising from or in respect of the
Communication."