damccorm opened a new issue, #20952:
URL: https://github.com/apache/beam/issues/20952

   Destination-dependent sharding is badly needed to keep file sizes and file 
counts manageable in Google Cloud Storage, especially when data volumes are 
very large.
   
   The current implementation does not allow this, per the documentation:
[https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.html](https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.html)
   
    
   
   _Note that currently sharding can not be destination-dependent: every 
window/pane for every destination will use the same number of shards specified 
via 
[`FileIO.Write.withNumShards(int)`](https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withNumShards-int-)
 or 
[`FileIO.Write.withSharding(org.apache.beam.sdk.transforms.PTransform<org.apache.beam.sdk.values.PCollection<UserT>, org.apache.beam.sdk.values.PCollectionView<java.lang.Integer>>)`](https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withSharding-org.apache.beam.sdk.transforms.PTransform-)._
   
    
   
   We use it as follows and end up with either very small or very large files 
per destination in the same window. The large files cannot be opened or 
processed, and the small files clutter the bucket.
   
   ```
   Pipeline pipeline = Pipeline.create(options);
   pipeline
       .apply("Read PubSub Events",
           PubsubIO.readMessagesWithAttributes()
               .fromSubscription(options.getInputSubscription()))
       .apply(options.getWindowDuration() + " Window",
           Window.<PubsubMessage>into(
                   FixedWindows.of(parseDuration(options.getWindowDuration())))
               .triggering(AfterWatermark.pastEndOfWindow())
               .discardingFiredPanes()
               .withAllowedLateness(parseDuration("24h"))
               .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())))
       .apply(FileIO.<String, PubsubMessage>writeDynamic()
           .by(new datePartition(options.getOutputFilenamePrefix(),
               options.getTimestampName()))
           .via(
               Contextful.fn(
                   (SerializableFunction<PubsubMessage, String>) inputMsg ->
                       new String(inputMsg.getPayload(), StandardCharsets.UTF_8)),
               TextIO.sink())
           .withDestinationCoder(StringUtf8Coder.of())
           .to(options.getOutputDirectory())
           .withNaming(type -> new CrowdStrikeFileNaming(type))
           .withNumShards(options.getNumShards())
           .withTempDirectory(options.getTempLocation()));

   pipeline.run();
   ```
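
To make the request concrete, here is a minimal sketch of the kind of per-destination decision that a single `withNumShards(int)` value cannot express. It is plain Java and does not use any Beam API. The `ShardPlanner` class, its method names, the destination names, and the 128 MiB target file size are all hypothetical and chosen only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not part of Beam): derives a shard count per destination. */
final class ShardPlanner {
    private ShardPlanner() {}

    /**
     * Returns how many files are needed so that each file holds at most
     * targetFileBytes. Always returns at least 1, even for an empty destination.
     */
    static int shardsFor(long destinationBytes, long targetFileBytes) {
        if (targetFileBytes <= 0) {
            throw new IllegalArgumentException("targetFileBytes must be positive");
        }
        if (destinationBytes <= 0) {
            return 1;
        }
        // Ceiling division written so that it cannot overflow a long.
        long shards = destinationBytes / targetFileBytes
                + (destinationBytes % targetFileBytes == 0 ? 0 : 1);
        return (int) Math.min(shards, (long) Integer.MAX_VALUE);
    }

    /** Applies shardsFor to every destination, preserving the input order. */
    static Map<String, Integer> plan(Map<String, Long> bytesPerDestination,
                                     long targetFileBytes) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : bytesPerDestination.entrySet()) {
            result.put(entry.getKey(), shardsFor(entry.getValue(), targetFileBytes));
        }
        return result;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        // Assumed per-window volumes for two made-up date partitions.
        Map<String, Long> volumes = new LinkedHashMap<>();
        volumes.put("dt=2021-05-01", 10 * mib);   // small destination
        volumes.put("dt=2021-05-02", 5000 * mib); // large destination
        System.out.println(plan(volumes, 128 * mib));
    }
}
```

With one fixed shard count, the small destination above would be split into many tiny files while the large one would be packed into a few huge files. Choosing the count per destination is what would avoid both extremes.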
     
   
   Imported from Jira 
[BEAM-12279](https://issues.apache.org/jira/browse/BEAM-12279). Original Jira 
may contain additional context.
   Reported by: [email protected].

