[ 
https://issues.apache.org/jira/browse/BEAM-12279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17516548#comment-17516548
 ] 

Beam JIRA Bot commented on BEAM-12279:
--------------------------------------

This issue was marked "stale-P2" and has not received a public comment in 14 
days. It is now automatically moved to P3. If you are still affected by it, you 
can comment and move it back to P2.

> Implement destination-dependent sharding in FileIO.writeDynamic
> ---------------------------------------------------------------
>
>                 Key: BEAM-12279
>                 URL: https://issues.apache.org/jira/browse/BEAM-12279
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-files, sdk-java-core
>    Affects Versions: 2.28.0
>            Reporter: Inessa Yakubov
>            Priority: P3
>
> Destination-dependent sharding is needed to keep file sizes and file counts 
> manageable in Google Cloud Storage, especially when data volumes are very 
> large.
> The current implementation does not allow this, per the documentation:
> [https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.html]
>  
> _*Note that currently sharding can not be destination-dependent: every 
> window/pane for every destination will use the same number of shards 
> specified via 
> [{{FileIO.Write.withNumShards(int)}}|https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withNumShards-int-]
> or 
> [{{FileIO.Write.withSharding(org.apache.beam.sdk.transforms.PTransform<org.apache.beam.sdk.values.PCollection<UserT>, org.apache.beam.sdk.values.PCollectionView<java.lang.Integer>>)}}|https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withSharding-org.apache.beam.sdk.transforms.PTransform-].*_
>  
> We use it as follows and end up with either very small or very large files 
> per destination in the same window. The large files cannot be opened or 
> processed, and the small files clutter the bucket.
> {code}
> Pipeline pipeline = Pipeline.create(options);
> pipeline
>     .apply("Read PubSub Events",
>         PubsubIO.readMessagesWithAttributes()
>             .fromSubscription(options.getInputSubscription()))
>     .apply(options.getWindowDuration() + " Window",
>         Window.<PubsubMessage>into(
>                 FixedWindows.of(parseDuration(options.getWindowDuration())))
>             .triggering(AfterWatermark.pastEndOfWindow())
>             .discardingFiredPanes()
>             .withAllowedLateness(parseDuration("24h"))
>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())))
>     .apply(FileIO.<String, PubsubMessage>writeDynamic()
>         .by(new datePartition(options.getOutputFilenamePrefix(),
>             options.getTimestampName()))
>         .via(Contextful.fn(
>                 (SerializableFunction<PubsubMessage, String>) inputMsg ->
>                     new String(inputMsg.getPayload(), StandardCharsets.UTF_8)),
>             TextIO.sink())
>         .withDestinationCoder(StringUtf8Coder.of())
>         .to(options.getOutputDirectory())
>         .withNaming(type -> new CrowdStrikeFileNaming(type))
>         .withNumShards(options.getNumShards())
>         .withTempDirectory(options.getTempLocation()));
> pipeline.run();
> {code}
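
Until destination-dependent sharding exists, one possible workaround is to fold a shard index into the destination key itself, so that busy destinations are split across more keys than quiet ones. The following is only a minimal sketch of that idea in plain Java, not a Beam feature and not the reporter's code. The class DestinationSharding, its method names, the "#" separator, and the per-destination shard map are all hypothetical, and choosing the shard from the payload hash is an assumption that only spreads data evenly when payloads vary.

```java
import java.util.Map;

/** Hypothetical helper (not part of Beam): encodes a per-destination shard in the key. */
final class DestinationSharding {
  private DestinationSharding() {}

  /** Shard count for a destination; unknown destinations use defaultShards (at least 1). */
  static int shardsFor(
      String destination, Map<String, Integer> shardsPerDestination, int defaultShards) {
    int shards = shardsPerDestination.getOrDefault(destination, defaultShards);
    return Math.max(1, shards);
  }

  /** Returns "destination#shard", picking the shard deterministically from the payload. */
  static String shardedKey(
      String destination,
      String payload,
      Map<String, Integer> shardsPerDestination,
      int defaultShards) {
    int shards = shardsFor(destination, shardsPerDestination, defaultShards);
    // floorMod keeps the index in [0, shards) even when hashCode() is negative.
    int shard = Math.floorMod(payload.hashCode(), shards);
    return destination + "#" + shard;
  }

  /** Strips the "#shard" suffix again, e.g. to build the output file name. */
  static String baseDestination(String shardedKey) {
    int idx = shardedKey.lastIndexOf('#');
    return idx < 0 ? shardedKey : shardedKey.substring(0, idx);
  }
}
```

In a pipeline like the one quoted above, the function passed to by(...) could presumably return such a composite key, combined with withNumShards(1), so that each composite key produces one file per window/pane, and the naming function could call baseDestination to recover the partition. This has not been verified against Beam, and the shard counts per destination would still have to be known or estimated ahead of time.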



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
