[
https://issues.apache.org/jira/browse/BEAM-12279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17509482#comment-17509482
]
Beam JIRA Bot commented on BEAM-12279:
--------------------------------------
This issue is P2 but has been unassigned without any comment for 60 days so it
has been labeled "stale-P2". If this issue is still affecting you, we care!
Please comment and remove the label. Otherwise, in 14 days the issue will be
moved to P3.
Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed
explanation of what these priorities mean.
> Implement destination-dependent sharding in FileIO.writeDynamic
> ---------------------------------------------------------------
>
> Key: BEAM-12279
> URL: https://issues.apache.org/jira/browse/BEAM-12279
> Project: Beam
> Issue Type: Improvement
> Components: io-java-files, sdk-java-core
> Affects Versions: 2.28.0
> Reporter: Inessa Yakubov
> Priority: P2
> Labels: stale-P2
>
> Destination-dependent sharding is very much needed to keep file sizes and
> file counts manageable in Google Cloud Storage, especially when data volumes
> are very large.
> The current implementation doesn't allow that (per the documentation):
> [https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.html]
>
> _*Note that currently sharding can not be destination-dependent: every
> window/pane for every destination will use the same number of shards
> specified via
> [{{FileIO.Write.withNumShards(int)}}|https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withNumShards-int-]
> or
> [{{FileIO.Write.withSharding(org.apache.beam.sdk.transforms.PTransform<org.apache.beam.sdk.values.PCollection<UserT>, org.apache.beam.sdk.values.PCollectionView<java.lang.Integer>>)}}|https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withSharding-org.apache.beam.sdk.transforms.PTransform-].*_
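>
> For illustration, a minimal sketch of what {{withSharding}} accepts (the
> step names and the ~100k-records-per-shard heuristic are assumptions, not
> from this issue): even a data-driven shard count is a single
> {{PCollectionView<Integer>}} per window, applied to every destination alike.
> {code}
> .withSharding(new PTransform<PCollection<PubsubMessage>, PCollectionView<Integer>>() {
>     @Override
>     public PCollectionView<Integer> expand(PCollection<PubsubMessage> input) {
>         return input
>             // One element count per window/pane.
>             .apply("CountPerWindow",
>                 Combine.globally(Count.<PubsubMessage>combineFn()).withoutDefaults())
>             // Heuristic: aim for roughly 100k records per shard (assumption).
>             .apply("ToShardCount", MapElements
>                 .into(TypeDescriptors.integers())
>                 .via((Long n) -> (int) Math.max(1, n / 100_000)))
>             // A single global value -- not a per-destination one.
>             .apply("AsView", View.asSingleton());
>     }
> })
> {code}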
>
> We use it as follows and end up with either very small or very large files
> per destination in the same window. The large files cannot be opened or
> processed, and the small files clutter the bucket.
> {code}
> Pipeline pipeline = Pipeline.create(options);
> pipeline
>     .apply("Read PubSub Events",
>         PubsubIO.readMessagesWithAttributes()
>             .fromSubscription(options.getInputSubscription()))
>     .apply(options.getWindowDuration() + " Window",
>         Window.<PubsubMessage>into(
>                 FixedWindows.of(parseDuration(options.getWindowDuration())))
>             // Allow up to 24h of late data, re-firing on the watermark.
>             .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
>             .withAllowedLateness(parseDuration("24h"))
>             .discardingFiredPanes())
>     .apply(FileIO.<String, PubsubMessage>writeDynamic()
>         // Destination key: date partition derived from each message.
>         .by(new datePartition(options.getOutputFilenamePrefix(),
>             options.getTimestampName()))
>         .via(Contextful.fn(
>                 (SerializableFunction<PubsubMessage, String>) msg ->
>                     new String(msg.getPayload(), StandardCharsets.UTF_8)),
>             TextIO.sink())
>         .withDestinationCoder(StringUtf8Coder.of())
>         .to(options.getOutputDirectory())
>         .withNaming(type -> new CrowdStrikeFileNaming(type))
>         // Same shard count for every destination -- the limitation at issue.
>         .withNumShards(options.getNumShards())
>         .withTempDirectory(options.getTempLocation()));
> pipeline.run();
> {code}
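>
> A possible interim workaround, sketched below under stated assumptions (the
> extractDate() helper, the shardsFor() lookup, and the "|" separator are
> hypothetical): fold a per-destination shard index into the key returned by
> .by(), so each logical destination fans out over a number of sub-destinations
> sized to its expected volume, and the FileNaming strips the suffix back off.
> {code}
> // Sketch: salt the destination key with a per-destination shard index so
> // that large destinations get more (smaller) files per window than small ones.
> .by((SerializableFunction<PubsubMessage, String>) msg -> {
>     String date = extractDate(msg);                         // hypothetical helper
>     int shards = shardsFor(date);                           // hypothetical lookup, e.g. 1..32
>     int shard = ThreadLocalRandom.current().nextInt(shards);
>     return date + "|" + shard;                              // each sub-key is its own destination
> })
> .withNumShards(1)  // one file per sub-destination per window/pane
> {code}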
--
This message was sent by Atlassian Jira
(v8.20.1#820001)