PR for review for this change is here: https://github.com/apache/beam/pull/15051
On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> I would like this thread to stay focused on sharding FileIO only. A possible change to the model is an interesting topic, but of a much different scope.
>
> Yes, I agree that sharding is mostly a physical rather than a logical property of the pipeline. That is why it feels more natural to distinguish between the two at the API level.
> As for handling sharding requirements by adding more sugar to dynamic destinations + file naming, one has to keep in mind that the results of dynamic writes can be observed in the form of KV<DestinationT, String>, i.e. the written files per dynamic destination. Often we do a GBK to post-process files per destination / logical group. If sharding were encoded there, it might complicate things either downstream or inside the sugar part, which would have to put the shard in and then take it out later.
> From the user perspective I do not see much difference. We would still need to allow the API to define both behaviors; they would only be executed differently by the implementation.
> I do not see value in changing the FileIO (WriteFiles) logic to stop using sharding and use dynamic destinations for both, given that the sharding function is already there and in use.
>
> To the point of external shuffle and non-deterministic user input:
> Yes, users can create non-deterministic behaviors, but they are in control. Here, Beam internally adds non-deterministic behavior and users cannot opt out.
> All works fine as long as the external shuffle service (this depends on the runner) holds on to the data and hands it out on retries. However, if data in the shuffle service is lost for some reason - e.g. a disk failure, a node breaks down - then the pipeline has to recover the data by recomputing the necessary paths.
>
> On Thu, Jun 17, 2021 at 7:36 PM Robert Bradshaw <rober...@google.com> wrote:
>
>> Sharding is typically a physical rather than logical property of the pipeline, and I'm not convinced it makes sense to add it to Beam in general. One can already use keys and GBK/stateful DoFns if some kind of logical grouping is needed, and adding constraints like this can prevent opportunities for optimizations (like dynamic sharding and fusion).
>>
>> That being said, file outputs are one area where it could make sense. I would expect that dynamic destinations could cover this use case, and a general FileNaming subclass could be provided to make this pattern easier (and possibly some syntactic sugar for auto-setting num shards to 0). (One downside of this approach is that one couldn't do dynamic destinations and also have each destination sharded with a distinct sharding function.)
>>
>> If this doesn't work, we could look into adding ShardingFunction as a publicly exposed parameter to FileIO. (I'm actually surprised it already exists.)
>>
>> On Thu, Jun 17, 2021 at 9:39 AM <je...@seznam.cz> wrote:
>> >
>> > Alright, but what is worth emphasizing is that we are talking about batch workloads. The typical scenario is that the output is committed once the job finishes (e.g., by an atomic rename of a directory).
>> > Jan
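For concreteness, a minimal sketch of the dynamic-destinations workaround Robert describes, using the existing FileIO.writeDynamic() API: the "shard" is computed as the destination itself, a FileNaming encodes it in the file name, and num shards is pinned so each destination yields one file. Event, bucketFor(), and toLine() are hypothetical names used only for illustration.

    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.transforms.Contextful;

    // events: PCollection<Event>
    events.apply(
        FileIO.<Integer, Event>writeDynamic()
            // Compute the "shard" from the element's data fields;
            // it becomes the dynamic destination.
            .by(event -> bucketFor(event))
            .withDestinationCoder(VarIntCoder.of())
            .via(Contextful.fn(event -> event.toLine()), TextIO.sink())
            .to("/output/events")
            // Encode the computed shard in the file name.
            .withNaming(bucket -> FileIO.Write.defaultNaming("part-" + bucket, ".txt"))
            // One file per destination, so the destination fully determines sharding.
            .withNumShards(1));

The write result's getPerDestinationOutputFilenames() is what surfaces the KV<DestinationT, String> pairs Jozef mentions, which is where an encoded shard would have to be stripped back out downstream.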
>> > On 17. 6. 2021 at 17:59, Reuven Lax <re...@google.com> wrote:
>> >
>> > Yes - the problem is that Beam makes no guarantees of determinism anywhere in the system. User DoFns might be non-deterministic, and we have no way to know (we've discussed providing users with an @IsDeterministic annotation; however, empirically users often think their functions are deterministic when they are in fact not). _Any_ sort of triggered aggregation (including watermark-based) can always be non-deterministic.
>> >
>> > Even if everything were deterministic, replaying everything is tricky. The output files already exist - should the system delete them and recreate them, or leave them as is and delete the temp files? Either decision could be problematic.
>> >
>> > On Wed, Jun 16, 2021 at 11:40 PM Jan Lukavský <je...@seznam.cz> wrote:
>> >
>> > Correct, by the external shuffle service I pretty much meant "offloading the contents of a shuffle phase out of the system". Looks like that is what Spark's checkpoint does as well. On the other hand (if I understand the concept correctly), that implies some performance penalty - the data has to be moved to an external distributed filesystem. It then feels weird if we optimize code to avoid computing random numbers, but are okay with moving complete datasets back and forth. I think in this particular case making the pipeline deterministic - idempotent, to be precise - (within limits, yes, the hashCode of an enum is not stable between JVMs) would seem more practical to me.
>> >
>> > Jan
>> >
>> > On 6/17/21 7:09 AM, Reuven Lax wrote:
>> >
>> > I have some thoughts here, as Eugene Kirpichov and I spent a lot of time working through these semantics in the past.
>> >
>> > First - about the problem of duplicates:
>> >
>> > A "deterministic" sharding - e.g. hashCode-based (though Java makes no guarantee that hashCode is stable across JVM instances, so this technique ends up not being stable) - doesn't really help matters in Beam. Unlike other systems, Beam makes no assumption that transforms are idempotent or deterministic. What's more, _any_ transform that has any sort of triggered grouping (whether the trigger used is watermark-based or otherwise) is non-deterministic.
>> >
>> > Forcing a hash of every element imposes quite a CPU cost; even generating a random number per element slowed things down too much, which is why the current code generates a random number only in startBundle.
>> >
>> > Any runner that does not implement RequiresStableInput will not properly execute FileIO. Dataflow and Flink both support this. I believe that the Spark runner implicitly supports it by manually calling checkpoint, as Kenn mentioned (unless someone removed that from the Spark runner, but if so, that would be a correctness regression). Implementing this has nothing to do with external shuffle services - neither Flink, Spark, nor the Dataflow appliance (classic shuffle) has any problem correctly implementing RequiresStableInput.
>> >
>> > On Wed, Jun 16, 2021 at 11:18 AM Jan Lukavský <je...@seznam.cz> wrote:
>> >
>> > I think the support for @RequiresStableInput is rather limited. AFAIK it is supported by streaming Flink (where it is not needed in this situation) and by Dataflow. Batch runners without an external shuffle service that works as in the case of Dataflow have, IMO, no way to implement it correctly. In the case of FileIO (which does not use @RequiresStableInput, as it would not be supported on Spark) the randomness is easily avoidable (hashCode of the key?), and it seems to me that should be preferred.
>> >
>> > Jan
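For readers following along: @RequiresStableInput is a method-level annotation on a DoFn's @ProcessElement method; a runner that honors it must checkpoint (stabilize) the input before that step, so retries see identical elements. A minimal sketch - FinalizeShardFn and its element type are made up for illustration:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // The side-effecting step (e.g. finalizing temp files) declares that it
    // needs stable input, rather than relying on an implicit Reshuffle checkpoint.
    class FinalizeShardFn extends DoFn<KV<Integer, String>, Void> {
      @ProcessElement
      @RequiresStableInput
      public void process(@Element KV<Integer, String> shardAndTempFile) {
        // Rename/copy the temp file to its final name. This is safe to retry
        // only because the runner guarantees the same element on replay.
      }
    }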
>> > On 6/16/21 6:23 PM, Kenneth Knowles wrote:
>> >
>> > On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský <je...@seznam.cz> wrote:
>> >
>> > Hi,
>> >
>> > maybe a little unrelated, but I think we definitely should not use random assignment of shard keys (RandomShardingFunction), at least for bounded workloads (it seems to be fine for streaming workloads). Many batch runners simply recompute the path in the computation DAG from the failed node (transform) to the root (source). If there is any non-determinism involved in the logic, it can result in duplicates (as the 'previous' attempt might have ended in a DAG path that was not affected by the failure). That addresses option 2) of what Jozef has mentioned.
>> >
>> > This is the reason we introduced "@RequiresStableInput".
>> >
>> > When things were only Dataflow, we knew that each shuffle was a checkpoint, so we inserted a Reshuffle after the random numbers were generated, freezing them so it was safe for replay. Since other engines do not checkpoint at every shuffle, we needed a way to communicate that this checkpointing was required for correctness. I think we still have many IOs that are written using Reshuffle instead of @RequiresStableInput, and I don't know which runners process @RequiresStableInput properly.
>> >
>> > By the way, I believe the SparkRunner explicitly calls materialize() after a GBK specifically so that it gets correct results for IOs that rely on Reshuffle. Has that changed?
>> >
>> > I agree that we should minimize use of RequiresStableInput. It has a significant cost, and the cost varies across runners. If we can use a deterministic function, we should.
>> >
>> > Kenn
>> >
>> > Jan
>> >
>> > On 6/16/21 1:43 PM, Jozef Vilcek wrote:
>> >
>> > On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <k...@apache.org> wrote:
>> >
>> > In general, Beam only deals with keys and grouping by key. I think expanding this idea to some more abstract notion of a sharding function could make sense.
>> >
>> > For FileIO specifically, I wonder if you can use writeDynamic() to get the behavior you are seeking.
>> >
>> > The change I have in mind looks like this:
>> > https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18
>> >
>> > Dynamic destinations, in my mind, are more for the need to "partition" data (destination as a directory level) or to handle groups of events differently, e.g. write some events in FormatA and others in FormatB.
>> > Shards are used for distributing writes or bucketing events within a particular destination group. More specifically, currently each element is assigned a `ShardedKey<Integer>` [1] before the GBK operation; the sharded key is a compound of the destination and the assigned shard.
>> >
>> > Having said that, I might be able to use dynamic destinations for this, possibly with a custom FileNaming, and set shards to always be 1. But it feels less natural than allowing the user to swap the already present `RandomShardingFunction` [2] for something of his own choosing.
>> >
>> > [1] https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
>> > [2] https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
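As an illustration of the swap Jozef describes, a deterministic alternative to RandomShardingFunction, assuming the ShardingFunction interface used by WriteFiles [2] (assignShardKey receives the resolved destination, the element, and the shard count, and returns the compound ShardedKey<Integer>). ElementHashShardingFunction is a made-up name, and per Reuven's caveat, hashCode is only stable across JVMs if the types involved define it deterministically:

    import org.apache.beam.sdk.io.ShardingFunction;
    import org.apache.beam.sdk.values.ShardedKey;

    // Sketch: derive the shard from the element itself, so a recomputed
    // element always lands in the same shard.
    class ElementHashShardingFunction<UserT, DestinationT>
        implements ShardingFunction<UserT, DestinationT> {
      @Override
      public ShardedKey<Integer> assignShardKey(
          DestinationT destination, UserT element, int shardCount) {
        // floorMod keeps the shard non-negative for negative hash codes.
        int shard = Math.floorMod(element.hashCode(), shardCount);
        // Compound key of destination and shard, mirroring what WriteFiles
        // feeds into its GBK.
        return ShardedKey.of(destination.hashCode(), shard);
      }
    }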
>> > >> > >> > [1] >> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java >> > [2] >> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856 >> > >> > Kenn >> > >> > On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <tyso...@google.com> >> wrote: >> > >> > Adding sharding to the model may require a wider discussion than FileIO >> alone. I'm not entirely sure how wide, or if this has been proposed before, >> but IMO it warrants a design doc or proposal. >> > >> > A couple high level questions I can think of are, >> > - What runners support sharding? >> > * There will be some work in Dataflow required to support this >> but I'm not sure how much. >> > - What does sharding mean for streaming pipelines? >> > >> > A more nitty-detail question: >> > - How can this be achieved performantly? For example, if a shuffle is >> required to achieve a particular sharding constraint, should we allow >> transforms to declare they don't modify the sharding property (e.g. key >> preserving) which may allow a runner to avoid an additional shuffle if a >> preceding shuffle can guarantee the sharding requirements? >> > >> > Where X is the shuffle that could be avoided: input -> shuffle (key >> sharding fn A) -> transform1 (key preserving) -> transform 2 (key >> preserving) -> X -> fileio (key sharding fn A) >> > >> > On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <jozo.vil...@gmail.com> >> wrote: >> > >> > I would like to extend FileIO with possibility to specify a custom >> sharding function: >> > https://issues.apache.org/jira/browse/BEAM-12493 >> > >> > I have 2 use-cases for this: >> > >> > I need to generate shards which are compatible with Hive bucketing and >> therefore need to decide shard assignment based on data fields of input >> element >> > When running e.g. on Spark and job encounters kind of failure which >> cause a loss of some data from previous stages, Spark does issue recompute >> of necessary task in necessary stages to recover data. Because the shard >> assignment function is random as default, some data will end up in >> different shards and cause duplicates in the final output. >> > >> > Please let me know your thoughts in case you see a reason to not to add >> such improvement. >> > >> > Thanks, >> > Jozef >> > >> > >> >