Hi,
maybe a little unrelated, but I think we definitely should not use
random assignment of shard keys (RandomShardingFunction), at least for
bounded workloads (it seems to be fine for streaming workloads). Many batch
runners simply recompute the path in the computation DAG from the failed
node (transform) to the root (source). If there is any non-determinism
involved in that logic, it can result in duplicates (as the 'previous'
attempt might have ended in a DAG path that was not affected by the
failure). That addresses option 2) of what Jozef mentioned.
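To illustrate (a rough sketch only, not the actual WriteFiles code; the
class and method names here are made up):

import java.util.concurrent.ThreadLocalRandom;

// Sketch of the two kinds of shard assignment.
class ShardAssignmentSketch {

  // Non-deterministic: a re-executed element may land in a different shard,
  // which is how a partial recompute can produce duplicates.
  static int randomShard(int numShards) {
    return ThreadLocalRandom.current().nextInt(numShards);
  }

  // Deterministic: the shard is a pure function of the element, so a
  // recomputed element is assigned to the same shard again.
  static int deterministicShard(String elementKey, int numShards) {
    return (elementKey.hashCode() & Integer.MAX_VALUE) % numShards;
  }
}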
Jan
On 6/16/21 1:43 PM, Jozef Vilcek wrote:
On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <k...@apache.org> wrote:
In general, Beam only deals with keys and grouping by key. I think
expanding this idea to some more abstract notion of a sharding
function could make sense.
For FileIO specifically, I wonder if you can use writeDynamic() to
get the behavior you are seeking.
The change I have in mind looks like this:
https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18
Dynamic destinations, in my mind, are more towards the need for
"partitioning" data (destination as a directory level) or cases where one
needs to handle groups of events differently, e.g. write some events in
FormatA and others in FormatB.
Shards are currently used for distributing writes or bucketing events
within a particular destination group. More specifically, each element is
assigned a `ShardedKey<Integer>` [1] before the GBK operation. The sharded
key is a compound of the destination and the assigned shard.
Having said that, I might be able to use dynamic destinations for this,
possibly with a custom FileNaming, and set the number of shards to always
be 1. But it feels less natural than allowing the user to swap the already
present `RandomShardingFunction` [2] for something of their own choosing.
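For illustration, that workaround would look roughly like this (just a
sketch; `Event`, `bucketOf(...)` and `formatAsLine(...)` are placeholders,
and the bucket id doubles as the destination):

import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Contextful;

// 'events' is a PCollection<Event>; bucketOf(...) and formatAsLine(...)
// stand in for the bucket computation and the record formatting.
events.apply(
    FileIO.<Integer, Event>writeDynamic()
        .by((Event e) -> bucketOf(e))                  // bucket id as destination
        .withDestinationCoder(VarIntCoder.of())
        .via(Contextful.fn((Event e) -> formatAsLine(e)), TextIO.sink())
        .to("/path/to/output")
        .withNaming((Integer bucket) ->
            FileIO.Write.defaultNaming("bucket-" + bucket, ".txt"))
        .withNumShards(1));                            // exactly one file per bucket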
[1]
https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
[2]
https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
Kenn
On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <tyso...@google.com> wrote:
Adding sharding to the model may require a wider discussion
than FileIO alone. I'm not entirely sure how wide, or if this
has been proposed before, but IMO it warrants a design doc or
proposal.
A couple of high-level questions I can think of are:
- Which runners support sharding?
  * There will be some work required in Dataflow to support this, but
    I'm not sure how much.
- What does sharding mean for streaming pipelines?
A more nitty-gritty question:
- How can this be achieved performantly? For example, if a shuffle is
required to achieve a particular sharding constraint, should we allow
transforms to declare that they don't modify the sharding property (e.g.
key preserving), which may allow a runner to avoid an additional shuffle
if a preceding shuffle can already guarantee the sharding requirements?
Where X is the shuffle that could be avoided:
input -> shuffle (key sharding fn A) -> transform1 (key preserving) ->
transform2 (key preserving) -> X -> fileio (key sharding fn A)
On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
I would like to extend FileIO with the possibility to specify a custom
sharding function:
https://issues.apache.org/jira/browse/BEAM-12493
I have 2 use cases for this:
1. I need to generate shards which are compatible with Hive bucketing,
   and therefore need to decide the shard assignment based on data
   fields of the input element (a rough sketch is below).
2. When running e.g. on Spark and the job encounters a failure which
   causes a loss of some data from previous stages, Spark issues a
   recompute of the necessary tasks in the necessary stages to recover
   the data. Because the shard assignment function is random by default,
   some data will end up in different shards and cause duplicates in the
   final output.
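To make use case 1 more concrete, the sharding function could look roughly
like this (just a sketch: it assumes the ShardingFunction shape from the
commit linked above, `MyRecord` / `getBucketColumn()` are placeholders, and
the hash is only an approximation of Hive's default bucketing, which would
need to be checked against the Hive version in use):

import org.apache.beam.sdk.values.ShardedKey;

// Sketch only: assumes the ShardingFunction interface proposed in the
// linked commit, i.e. (destination, element, shardCount) -> ShardedKey<Integer>.
// MyRecord and getBucketColumn() are placeholders for the real element type.
class HiveBucketingShardingFunction implements ShardingFunction<MyRecord, Integer> {

  @Override
  public ShardedKey<Integer> assignShardKey(
      Integer destination, MyRecord element, int shardCount) {
    // Approximation of Hive's default bucketing hash; verify against the
    // Hive version in use before relying on it.
    int hash = element.getBucketColumn().hashCode();
    int bucket = (hash & Integer.MAX_VALUE) % shardCount;
    // Exact ShardedKey construction may differ from what is sketched here.
    return ShardedKey.of(destination, bucket);
  }
}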
Please let me know your thoughts, in case you see a reason not to add
such an improvement.
Thanks,
Jozef