I would like this thread to stay focused on sharding in FileIO only. A
possible change to the model is an interesting topic, but one of a much
wider scope.

Yes, I agree that sharding is mostly a physical rather than logical
property of the pipeline. That is why it feels more natural to distinguish
between those two on the API level.
As for handling sharding requirements by adding more sugar to dynamic
destinations + file naming, one has to keep in mind that the results of
dynamic writes can be observed in the form of KV<DestinationT, String>,
i.e. the written files per dynamic destination. Often we do a GBK to
post-process files per destination / logical group. If sharding were
encoded in the destination, it would complicate things either downstream
or inside the sugar part, which would have to put the shard in and then
take it out again later.
From the user perspective I do not see much difference. We would still need
to allow the API to define both behaviors, and they would only be executed
differently by the implementation.
I do not see value in changing the FileIO (WriteFiles) logic to stop using
sharding and use dynamic destinations for both, given that the sharding
function is already there and in use.
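
To make the idea concrete, here is a minimal sketch of the kind of
deterministic function a user might want to plug in. It is plain Java and
does not use Beam's actual ShardingFunction interface; the class name
DeterministicSharding, the method assignShard and the choice of CRC32 are
assumptions for illustration only, and it is not claimed to reproduce
Hive's bucketing hash.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical stand-in for a user-supplied sharding function; not Beam's API.
final class DeterministicSharding {
    private DeterministicSharding() {}

    /** Maps a bucketing field to a shard in [0, numShards) using a JVM-independent hash. */
    static int assignShard(String bucketingField, int numShards) {
        if (numShards <= 0) {
            throw new IllegalArgumentException("numShards must be positive");
        }
        CRC32 crc = new CRC32();
        crc.update(bucketingField.getBytes(StandardCharsets.UTF_8));
        // CRC32.getValue() is a non-negative long, so the remainder is non-negative too.
        return (int) (crc.getValue() % numShards);
    }
}
```

The same field value always maps to the same shard, regardless of bundle
boundaries or retries, which is the property both of my use cases need.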

On the point of external shuffle and non-deterministic user input:
yes, users can create non-deterministic behavior, but they are in control.
Here, Beam internally adds non-deterministic behavior and users cannot
opt out.
All works fine as long as the external shuffle service (which depends on
the runner) holds on to the data and hands it out on retries. However, if
data in the shuffle service is lost for some reason - e.g. disk failure or
a node breaking down - then the pipeline has to recover the data by
recomputing the necessary paths.
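
A toy model of that failure mode, as a sketch only: it assumes shard 0 was
already written by the first attempt, and that the remaining shards are
written after the upstream data was recomputed with a fresh shard
assignment. Nothing here is Beam or runner code; RecomputeSimulation and
its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.ToIntFunction;

// Hypothetical simulation of a partial recompute; not Beam or Spark code.
final class RecomputeSimulation {
    private RecomputeSimulation() {}

    /**
     * Shard 0 is written using the first attempt's assignment; all other shards
     * are written using the second attempt's assignment (after recompute).
     */
    static List<String> writeWithRecompute(
            List<String> input,
            ToIntFunction<String> firstAttempt,
            ToIntFunction<String> secondAttempt) {
        List<String> output = new ArrayList<>();
        for (String element : input) {
            if (firstAttempt.applyAsInt(element) == 0) {
                output.add(element);
            }
        }
        for (String element : input) {
            if (secondAttempt.applyAsInt(element) != 0) {
                output.add(element);
            }
        }
        return output;
    }

    /** Random assignment: a new seed per attempt models a recomputed stage. */
    static ToIntFunction<String> randomSharding(long seed, int numShards) {
        Random random = new Random(seed);
        return element -> random.nextInt(numShards);
    }

    /** Deterministic assignment; String.hashCode() is fixed by the Java specification. */
    static ToIntFunction<String> hashSharding(int numShards) {
        return element -> Math.floorMod(element.hashCode(), numShards);
    }
}
```

With hashSharding passed for both attempts, the output contains every
input element exactly once. With randomSharding and a different seed per
attempt, an element can land in shard 0 on the first attempt and in another
shard on the second (a duplicate), or the other way around (a loss).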

On Thu, Jun 17, 2021 at 7:36 PM Robert Bradshaw <rober...@google.com> wrote:

> Sharding is typically a physical rather than logical property of the
> pipeline, and I'm not convinced it makes sense to add it to Beam in
> general. One can already use keys and GBK/Stateful DoFns if some kind
> of logical grouping is needed, and adding constraints like this can
> prevent opportunities for optimizations (like dynamic sharding and
> fusion).
>
> That being said, file output is one area where it could make sense. I
> would expect that dynamic destinations could cover this use case, and a
> general FileNaming subclass could be provided to make this pattern
> easier (and possibly some syntactic sugar for auto-setting num shards
> to 0). (One downside of this approach is that one couldn't do dynamic
> destinations, and have each sharded with a distinct sharding function
> as well.)
>
> If this doesn't work, we could look into adding ShardingFunction as a
> publicly exposed parameter to FileIO. (I'm actually surprised it
> already exists.)
>
> On Thu, Jun 17, 2021 at 9:39 AM <je...@seznam.cz> wrote:
> >
> > Alright, but what is worth emphasizing is that we talk about batch
> workloads. The typical scenario is that the output is committed once the
> job finishes (e.g., by atomic rename of directory).
> >  Jan
> >
> > On 17. 6. 2021 17:59, Reuven Lax <re...@google.com> wrote:
> >
> > Yes - the problem is that Beam makes no guarantees of determinism
> anywhere in the system. User DoFns might be non-deterministic, and we have
> no way to know (we've discussed providing users with an @IsDeterministic
> annotation, however empirically users often think their functions are
> deterministic when they are in fact not). _Any_ sort of triggered
> aggregation (including watermark based) can always be non-deterministic.
> >
> > Even if everything was deterministic, replaying everything is tricky.
> The output files already exist - should the system delete them and recreate
> them, or leave them as is and delete the temp files? Either decision could
> be problematic.
> >
> > On Wed, Jun 16, 2021 at 11:40 PM Jan Lukavský <je...@seznam.cz> wrote:
> >
> > Correct, by the external shuffle service I pretty much meant "offloading
> the contents of a shuffle phase out of the system". Looks like that is what
> the Spark's checkpoint does as well. On the other hand (if I understand the
> concept correctly), that implies some performance penalty - the data has to
> be moved to external distributed filesystem. Which then feels weird if we
> optimize code to avoid computing random numbers, but are okay with moving
> complete datasets back and forth. I think in this particular case making
> the Pipeline deterministic - idempotent to be precise - (within the limits,
> yes, hashCode of enum is not stable between JVMs) would seem more practical
> to me.
> >
> >  Jan
> >
> > On 6/17/21 7:09 AM, Reuven Lax wrote:
> >
> > I have some thoughts here, as Eugene Kirpichov and I spent a lot of time
> working through these semantics in the past.
> >
> > First - about the problem of duplicates:
> >
> > A "deterministic" sharding - e.g. hashCode based (though Java makes no
> guarantee that hashCode is stable across JVM instances, so this technique
> ends up not being stable) doesn't really help matters in Beam. Unlike other
> systems, Beam makes no assumptions that transforms are idempotent or
> deterministic. What's more, _any_ transform that has any sort of triggered
> grouping (whether the trigger used is watermark based or otherwise) is non
> deterministic.
> >
> > Forcing a hash of every element imposed quite a CPU cost; even
> generating a random number per-element slowed things down too much, which
> is why the current code generates a random number only in startBundle.
> >
> > Any runner that does not implement RequiresStableInput will not properly
> execute FileIO. Dataflow and Flink both support this. I believe that the
> Spark runner implicitly supports it by manually calling checkpoint as Ken
> mentioned (unless someone removed that from the Spark runner, but if so
> that would be a correctness regression). Implementing this has nothing to
> do with external shuffle services - neither Flink, Spark, nor Dataflow
> appliance (classic shuffle) have any problem correctly implementing
> RequiresStableInput.
> >
> > On Wed, Jun 16, 2021 at 11:18 AM Jan Lukavský <je...@seznam.cz> wrote:
> >
> > I think that the support for @RequiresStableInput is rather limited.
> AFAIK it is supported by streaming Flink (where it is not needed in this
> situation) and by Dataflow. Batch runners without external shuffle service
> that works as in the case of Dataflow have IMO no way to implement it
> correctly. In the case of FileIO (which does not use @RequiresStableInput as
> it would not be supported on Spark) the randomness is easily avoidable
> (hashCode of key?) and it seems to me it should be preferred.
> >
> >  Jan
> >
> > On 6/16/21 6:23 PM, Kenneth Knowles wrote:
> >
> >
> > On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský <je...@seznam.cz> wrote:
> >
> > Hi,
> >
> > maybe a little unrelated, but I think we definitely should not use
> random assignment of shard keys (RandomShardingFunction), at least for
> bounded workloads (seems to be fine for streaming workloads). Many batch
> runners simply recompute path in the computation DAG from the failed node
> (transform) to the root (source). In the case there is any non-determinism
> involved in the logic, then it can result in duplicates (as the 'previous'
> attempt might have ended in DAG path that was not affected by the fail).
> That addresses option 2) of what Jozef has mentioned.
> >
> > This is the reason we introduced "@RequiresStableInput".
> >
> > When things were only Dataflow, we knew that each shuffle was a
> checkpoint, so we inserted a Reshuffle after the random numbers were
> generated, freezing them so it was safe for replay. Since other engines do
> not checkpoint at every shuffle, we needed a way to communicate that this
> checkpointing was required for correctness. I think we still have many IOs
> that are written using Reshuffle instead of @RequiresStableInput, and I
> don't know which runners process @RequiresStableInput properly.
> >
> > By the way, I believe the SparkRunner explicitly calls materialize()
> after a GBK specifically so that it gets correct results for IOs that rely
> on Reshuffle. Has that changed?
> >
> > I agree that we should minimize use of RequiresStableInput. It has a
> significant cost, and the cost varies across runners. If we can use a
> deterministic function, we should.
> >
> > Kenn
> >
> >
> >  Jan
> >
> > On 6/16/21 1:43 PM, Jozef Vilcek wrote:
> >
> >
> >
> > On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <k...@apache.org> wrote:
> >
> > In general, Beam only deals with keys and grouping by key. I think
> expanding this idea to some more abstract notion of a sharding function
> could make sense.
> >
> > For FileIO specifically, I wonder if you can use writeDynamic() to get
> the behavior you are seeking.
> >
> >
> > The change in mind looks like this:
> >
> https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18
> >
> > Dynamic Destinations in my mind is more towards the need for
> "partitioning" data (destination as directory level) or if one needs to
> handle groups of events differently, e.g. write some events in FormatA and
> others in FormatB.
> > Shards are now used for distributing writes or bucketing of events
> within a particular destination group. More specifically, currently, each
> element is assigned `ShardedKey<Integer>` [1] before GBK operation. Sharded
> key is a compound of destination and assigned shard.
> >
> > Having said that, I might be able to use dynamic destination for this,
> possibly with the need of custom FileNaming, and set shards to be always 1.
> But it feels less natural than allowing the user to swap already present
> `RandomShardingFunction` [2] with something of his own choosing.
> >
> >
> > [1]
> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
> > [2]
> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
> >
> > Kenn
> >
> > On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <tyso...@google.com>
> wrote:
> >
> > Adding sharding to the model may require a wider discussion than FileIO
> alone. I'm not entirely sure how wide, or if this has been proposed before,
> but IMO it warrants a design doc or proposal.
> >
> > A couple high level questions I can think of are,
> >   - What runners support sharding?
> >       * There will be some work in Dataflow required to support this but
> I'm not sure how much.
> >   - What does sharding mean for streaming pipelines?
> >
> > A more nitty-detail question:
> >   - How can this be achieved performantly? For example, if a shuffle is
> required to achieve a particular sharding constraint, should we allow
> transforms to declare they don't modify the sharding property (e.g. key
> preserving) which may allow a runner to avoid an additional shuffle if a
> preceding shuffle can guarantee the sharding requirements?
> >
> > Where X is the shuffle that could be avoided: input -> shuffle (key
> sharding fn A) -> transform1 (key preserving) -> transform 2 (key
> preserving) -> X -> fileio (key sharding fn A)
> >
> > On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
> >
> > I would like to extend FileIO with the possibility to specify a custom
> sharding function:
> > https://issues.apache.org/jira/browse/BEAM-12493
> >
> > I have 2 use-cases for this:
> >
> > 1. I need to generate shards which are compatible with Hive bucketing and
> therefore need to decide shard assignment based on data fields of the input
> element.
> > 2. When running e.g. on Spark and the job encounters a failure which
> causes a loss of some data from previous stages, Spark issues a recompute
> of the necessary tasks in the necessary stages to recover the data. Because
> the shard assignment function is random by default, some data will end up in
> different shards and cause duplicates in the final output.
> >
> > Please let me know your thoughts in case you see a reason not to add
> such an improvement.
> >
> > Thanks,
> > Jozef
> >
> >
>
