Hi,

I have a PCollection of objects that I want to write to disk. I want to
shard the objects; each shard in the output should contain roughly 80 MB of
data.

I have written the code to write the objects to disk, but I cannot find an
efficient way to control shard size. I have tried some options:

1. Call a ParDo directly on the PCollection and 'batch' by using
start_bundle() and finish_bundle(). This does not work well because you
cannot control the size of each bundle.
2. I tried using the BatchElements
<https://beam.apache.org/documentation/patterns/batch-elements> primitive,
but it seems that it only supports 'batch across bundles' in the streaming
setting, using `max_batch_duration_secs`. This also transforms the
PCollection into a windowed PCollection, which does not work well with
downstream operations.
3. Using GroupIntoBatches or something similar is also not doable, because
you can only specify the number of elements, not the batch size in
megabytes. And since this requires an int input, not a PValue, I cannot
precompute and pass an 'average batch size'.
4. WriteToFiles and FileBasedSink are not the right abstractions because
they don't expose any way to control shard size. Also it might not work
well with my actual data format [0].

So what is the right way to do this? I could do something like this:
1. Compute the average bytes per element and determine the total number of
shards
2. Assign key random.randint(0, num_shards - 1) to each element
3. Group by key
4. Create a shard from the list[elements] in input

However this involves writing a lot of custom logic, a GroupByKey (which
effectively doubles the data we pass around and isn't particularly
efficient), etc.
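
For concreteness, here is a rough sketch of steps 1 to 3. It is untested against a real runner and rests on a few assumptions: the elements are `bytes` (so `len` gives their size), the shard count comes from the total byte count rather than an explicit average (which should give the same number), and the count is passed to the keying step as a side input. The names `compute_num_shards`, `assign_shard_key` and `build_sharded` are made up for illustration.

```python
import math
import random


def compute_num_shards(total_bytes, target_shard_bytes=80 * 1024 * 1024):
    """Number of shards needed so that each holds roughly target_shard_bytes."""
    return max(1, math.ceil(total_bytes / target_shard_bytes))


def assign_shard_key(element, num_shards):
    """Pair an element with a uniformly random shard index in [0, num_shards)."""
    return random.randrange(num_shards), element


def build_sharded(pcoll, size_fn=len):
    """Steps 1-3 as a Beam graph: yields (shard_id, iterable of elements).

    size_fn is an assumption: it must return the serialized size of one
    element in bytes; len only works if the elements are bytes objects.
    """
    # Imported here so the two helpers above can be used without Beam.
    import apache_beam as beam

    # Step 1: total payload size, reduced to a single-element PCollection.
    total_bytes = (
        pcoll
        | "ElementSize" >> beam.Map(size_fn)
        | "TotalBytes" >> beam.CombineGlobally(sum)
    )
    num_shards = total_bytes | "NumShards" >> beam.Map(compute_num_shards)

    # Steps 2 and 3: random key per element, then group by that key.
    return (
        pcoll
        | "AssignKey" >> beam.Map(
            assign_shard_key, num_shards=beam.pvalue.AsSingleton(num_shards))
        | "GroupIntoShards" >> beam.GroupByKey()
    )
```

Step 4 would then be a ParDo over the grouped output that opens one writer per key. Random keys only balance the shards on average, so individual shards can land somewhat above or below 80 MB.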

Is there a better way to achieve this that I am missing?

I would appreciate any help. Thank you!

[0] My data format is a MosaicML shard. You write like this:

with MDSWriter(...) as writer:
  for record in records:
    writer.write(record)

The output contains 1 or more `.mds` binary files and one `index.json` file
which gives you information about the size and location of each shard. I
don't think this plays well with the `FileBasedSink`, does it?

Best,
Antonio
