[
https://issues.apache.org/jira/browse/BEAM-12279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alexey Romanenko updated BEAM-12279:
------------------------------------
Description:
Destination dependent sharding feature is very much needed in order to maintain
manageable files sizes and file counts in google storage especially when data
volumes are very large.
Current implementation doesn't allow that (per documentation )
[https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.html]
_*Note that currently sharding can not be destination-dependent: every
window/pane for every destination will use the same number of shards specified
via
[{{FileIO.Write.withNumShards(int)}}|https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withNumShards-int-]
or
[{{FileIO.Write.withSharding(org.apache.beam.sdk.transforms.PTransform<org.apache.beam.sdk.values.PCollection<UserT>,
org.apache.beam.sdk.values.PCollectionView<java.lang.Integer>>)}}|https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withSharding-org.apache.beam.sdk.transforms.PTransform-].*_
**We use it as follows and end up with either very small or very large files
per destination in the same window. Large files are not possible to
open/process and small files clutter the bucket.
{code}
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
.apply(options.getWindowDuration() + " Window",
Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))
.triggering(AfterWatermark.pastEndOfWindow())
.discardingFiredPanes()
.withAllowedLateness(parseDuration("24h")).triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())))
.apply(FileIO.<String,PubsubMessage>writeDynamic()
.by(new datePartition(options.getOutputFilenamePrefix(),
options.getTimestampName()))
.via(Contextful.fn(
(SerializableFunction<PubsubMessage, String>) inputMsg -> new
String(inputMsg.getPayload(), StandardCharsets.UTF_8)),
TextIO.sink())
.withDestinationCoder(StringUtf8Coder.of())
.to(options.getOutputDirectory())
.withNaming(type -> new CrowdStrikeFileNaming(type))
.withNumShards(options.getNumShards())
.withTempDirectory(options.getTempLocation()));
pipeline.run();
{code}
was:
Destination dependent sharding feature is very much needed in order to maintain
manageable files sizes and file counts in google storage especially when data
volumes are very large.
Current implementation doesn't allow that (per documentation )
[https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.html]
_*Note that currently sharding can not be destination-dependent: every
window/pane for every destination will use the same number of shards specified
via
[{{FileIO.Write.withNumShards(int)}}|https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withNumShards-int-]
or
[{{FileIO.Write.withSharding(org.apache.beam.sdk.transforms.PTransform<org.apache.beam.sdk.values.PCollection<UserT>,
org.apache.beam.sdk.values.PCollectionView<java.lang.Integer>>)}}|https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withSharding-org.apache.beam.sdk.transforms.PTransform-].*_
**We use it as follows and end up with either very small or very large files
per destination in the same window. Large files are not possible to
open/process and small files clutter the bucket.
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
.apply(options.getWindowDuration() + " Window",
Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))
.triggering(AfterWatermark.pastEndOfWindow())
.discardingFiredPanes()
.withAllowedLateness(parseDuration("24h")).triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())))
.apply(FileIO.<String,PubsubMessage>writeDynamic()
.by(new datePartition(options.getOutputFilenamePrefix(),
options.getTimestampName()))
.via(Contextful.fn(
(SerializableFunction<PubsubMessage, String>) inputMsg -> new
String(inputMsg.getPayload(), StandardCharsets.UTF_8)),
TextIO.sink())
.withDestinationCoder(StringUtf8Coder.of())
.to(options.getOutputDirectory())
.withNaming(type -> new CrowdStrikeFileNaming(type))
.withNumShards(options.getNumShards())
.withTempDirectory(options.getTempLocation()));
pipeline.run();
> Implement destination-dependent sharding in FileIO.writeDynamic
> ---------------------------------------------------------------
>
> Key: BEAM-12279
> URL: https://issues.apache.org/jira/browse/BEAM-12279
> Project: Beam
> Issue Type: Improvement
> Components: io-java-files, sdk-java-core
> Affects Versions: 2.28.0
> Reporter: Inessa Yakubov
> Priority: P1
>
> Destination dependent sharding feature is very much needed in order to
> maintain manageable files sizes and file counts in google storage especially
> when data volumes are very large.
> Current implementation doesn't allow that (per documentation )
> [https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.html]
>
> _*Note that currently sharding can not be destination-dependent: every
> window/pane for every destination will use the same number of shards
> specified via
> [{{FileIO.Write.withNumShards(int)}}|https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withNumShards-int-]
> or
> [{{FileIO.Write.withSharding(org.apache.beam.sdk.transforms.PTransform<org.apache.beam.sdk.values.PCollection<UserT>,
>
> org.apache.beam.sdk.values.PCollectionView<java.lang.Integer>>)}}|https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/FileIO.Write.html#withSharding-org.apache.beam.sdk.transforms.PTransform-].*_
>
> **We use it as follows and end up with either very small or very large files
> per destination in the same window. Large files are not possible to
> open/process and small files clutter the bucket.
> {code}
> Pipeline pipeline = Pipeline.create(options);
> pipeline.apply("Read PubSub Events",
> PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
> .apply(options.getWindowDuration() + " Window",
>
> Window.<PubsubMessage>into(FixedWindows.of(parseDuration(options.getWindowDuration())))
> .triggering(AfterWatermark.pastEndOfWindow())
> .discardingFiredPanes()
>
> .withAllowedLateness(parseDuration("24h")).triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow())))
> .apply(FileIO.<String,PubsubMessage>writeDynamic()
> .by(new datePartition(options.getOutputFilenamePrefix(),
> options.getTimestampName()))
> .via(Contextful.fn(
> (SerializableFunction<PubsubMessage, String>) inputMsg -> new
> String(inputMsg.getPayload(), StandardCharsets.UTF_8)),
> TextIO.sink())
> .withDestinationCoder(StringUtf8Coder.of())
> .to(options.getOutputDirectory())
> .withNaming(type -> new CrowdStrikeFileNaming(type))
> .withNumShards(options.getNumShards())
> .withTempDirectory(options.getTempLocation()));
> pipeline.run();
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)