On Sat, Jul 3, 2021 at 7:30 PM Reuven Lax <re...@google.com> wrote: > > > On Sat, Jul 3, 2021 at 1:02 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote: > >> I don't think this has anything to do with external shuffle services. >> >> Arbitrarily recomputing data is fundamentally incompatible with Beam, >> since Beam does not restrict transforms to being deterministic. The Spark >> runner works (at least it did last I checked) by checkpointing the RDD. >> Spark will not recompute the DAG past a checkpoint, so this creates stable >> input to a transform. This adds some cost to the Spark runner, but makes >> things correct. You should not have sharding problems due to replay unless >> there is a bug in the current Spark runner. >> >> Beam does not restrict non-determinism, true. If users do add it, then >> they can work around it should they need to address any side effects. But >> should non-determinism be deliberately added by Beam core? Probably can if >> runners can 100% deal with that effectively. To the point of RDD >> `checkpoint`, afaik SparkRunner does use `cache`, not checkpoint. Also, I >> thought cache is invoked on fork points in DAG. I have just a join and map >> and in such cases I thought data is always served out by shuffle service. >> Am I mistaken? >> > > Non determinism is already there in core Beam features. If any sort of > trigger is used anywhere in the pipeline, the result is non deterministic. > We've also found that users tend not to know when their ParDos are non > deterministic, so telling users to works around it tends not to work. > > The Spark runner definitely used to use checkpoint in this case. >
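For illustration only, a minimal plain-Spark (not Beam) sketch of the checkpointing idea discussed above: once an RDD is checkpointed, retries read the materialized data instead of re-running the non-deterministic step. The checkpoint directory and the random map step are made up for the example.

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CheckpointSketch {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[*]", "checkpoint-sketch");
        sc.setCheckpointDir("/tmp/checkpoints");  // hypothetical reliable storage location

        JavaRDD<Integer> shuffled =
            sc.parallelize(java.util.Arrays.asList(1, 2, 3))
              // Non-deterministic step: a random component per element.
              .map(x -> x + new java.util.Random().nextInt(10));

        shuffled.checkpoint();  // materialize; Spark will not recompute the DAG past this point
        shuffled.count();       // an action forces the checkpoint to be written

        // Downstream work replays the checkpointed values on retry instead of
        // re-drawing the random numbers above.
        shuffled.map(x -> x * 2).collect();
      }
    }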
It is beyond my current knowledge of Beam how triggers introduce non-determinism when processing in batch mode, so I will leave that aside. Reuven, would you mind pointing me to the Spark runner code which is supposed to handle this by using checkpoint? I cannot find it myself. > >> >> On Fri, Jul 2, 2021 at 8:23 PM Reuven Lax <re...@google.com> wrote: >> >>> >>> >>> On Tue, Jun 29, 2021 at 1:03 AM Jozef Vilcek <jozo.vil...@gmail.com> >>> wrote: >>> >>>> >>>> How will @RequiresStableInput prevent this situation when running batch >>>>> use case? >>>>> >>>> >>>> So this is handled in combination of @RequiresStableInput and output >>>> file finalization. @RequiresStableInput (or Reshuffle for most runners) >>>> makes sure that the input provided for the write stage does not get >>>> recomputed in the presence of failures. Output finalization makes sure that >>>> we only finalize one run of each bundle and discard the rest. >>>> >>>> So this I believe relies on having robust external service to hold >>>> shuffle data and serve it out when needed so pipeline does not need to >>>> recompute it via non-deterministic function. In Spark however, shuffle >>>> service can not be (if I am not mistaking) deployed in this fashion (HA + >>>> replicated shuffle data). Therefore, if instance of shuffle service holding >>>> a portion of shuffle data fails, spark recovers it by recomputing parts of >>>> a DAG from source to recover lost shuffle results. I am not sure what Beam >>>> can do here to prevent it or make it stable? Will @RequiresStableInput work >>>> as expected? ... Note that this, I believe, concerns only the batch. >>>> >>> >>> I don't think this has anything to do with external shuffle services. >>> >>> Arbitrarily recomputing data is fundamentally incompatible with Beam, >>> since Beam does not restrict transforms to being deterministic. The Spark >>> runner works (at least it did last I checked) by checkpointing the RDD. >>> Spark will not recompute the DAG past a checkpoint, so this creates stable >>> input to a transform. This adds some cost to the Spark runner, but makes >>> things correct. You should not have sharding problems due to replay unless >>> there is a bug in the current Spark runner. >>> >>> >>>> >>>> Also, when withNumShards() truly have to be used, round robin >>>> assignment of elements to shards sounds like the optimal solution (at least >>>> for the vast majority of pipelines) >>>> >>>> I agree. But random seed is my problem here with respect to the >>>> situation mentioned above. >>>> >>>> Right, I would like to know if there are more true use-cases before >>>> adding this. This essentially allows users to map elements to exact output >>>> shards which could change the characteristics of pipelines in very >>>> significant ways without users being aware of it. For example, this could >>>> result in extremely imbalanced workloads for any downstream processors. If >>>> it's just Hive I would rather work around it (even with a perf penalty for >>>> that case). >>>> >>>> User would have to explicitly ask FileIO to use specific sharding. >>>> Documentation can educate about the tradeoffs. But I am open to workaround >>>> alternatives.
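For reference, a small sketch of how @RequiresStableInput is declared in the Beam Java SDK. The class and the write placeholder are illustrative; the point is that a runner honoring the annotation must replay identical bundle contents, which is what lets output finalization keep exactly one run per bundle.

    import org.apache.beam.sdk.transforms.DoFn;

    class StableInputWriteFn extends DoFn<String, Void> {
      @RequiresStableInput
      @ProcessElement
      public void processElement(@Element String element) {
        // Side-effecting write goes here. Because the input is stable, a retried
        // bundle sees identical elements, so finalizing one successful run and
        // discarding the rest yields exactly-once output files.
      }
    }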
>>>> >>>> >>>> On Mon, Jun 28, 2021 at 5:50 PM Chamikara Jayalath < >>>> chamik...@google.com> wrote: >>>> >>>>> >>>>> >>>>> On Mon, Jun 28, 2021 at 2:47 AM Jozef Vilcek <jozo.vil...@gmail.com> >>>>> wrote: >>>>> >>>>>> Hi Cham, thanks for the feedback >>>>>> >>>>>> > Beam has a policy of no knobs (or keeping knobs minimum) to allow >>>>>> runners to optimize better. I think one concern might be that the >>>>>> addition >>>>>> of this option might be going against this. >>>>>> >>>>>> I agree that less knobs is more. But if assignment of a key is >>>>>> specific per user request via API (optional), than runner should not >>>>>> optimize that but need to respect how data is requested to be laid down. >>>>>> It >>>>>> could however optimize number of shards as it is doing right now if >>>>>> numShards is not set explicitly. >>>>>> Anyway FileIO feels fluid here. You can leave shards empty and let >>>>>> runner decide or optimize, but only in batch and not in streaming. Then >>>>>> it allows you to set number of shards but never let you change the logic >>>>>> how it is assigned and defaults to round robin with random seed. It begs >>>>>> question for me why I can manipulate number of shards but not the logic >>>>>> of >>>>>> assignment. Are there plans to (or already case implemented) optimize >>>>>> around and have runner changing that under different circumstances? >>>>>> >>>>> >>>>> I don't know the exact history behind withNumShards() feature for >>>>> batch. I would guess it was originally introduced due to limitations for >>>>> streaming but was made available to batch as well with a strong >>>>> performance >>>>> warning. >>>>> >>>>> https://github.com/apache/beam/blob/90c854e97787c19cd5b94034d37c5319317567a8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L159 >>>>> >>>>> Also, when withNumShards() truly have to be used, round robin >>>>> assignment of elements to shards sounds like the optimal solution (at >>>>> least >>>>> for the vast majority of pipelines) >>>>> >>>>> >>>>>> >>>>>> > Also looking at your original reasoning for adding this. >>>>>> >>>>>> > > I need to generate shards which are compatible with Hive >>>>>> bucketing and therefore need to decide shard assignment based on data >>>>>> fields of input element >>>>>> >>>>>> > This sounds very specific to Hive. Again I think the downside here >>>>>> is that we will be giving up the flexibility for the runner to optimize >>>>>> the >>>>>> pipeline and decide sharding. Do you think it's possible to somehow relax >>>>>> this requirement for > > > Hive or use a secondary process to format the >>>>>> output to a form that is suitable for Hive after being written to an >>>>>> intermediate storage. >>>>>> >>>>>> It is possible. But it is quite suboptimal because of extra IO >>>>>> penalty and I would have to use completely custom file writing as FileIO >>>>>> would not support this. Then naturally the question would be, should I >>>>>> use >>>>>> FileIO at all? I am quite not sure if I am doing something so rare that >>>>>> it >>>>>> is not and can not be supported. Maybe I am. I remember an old discussion >>>>>> from Scio guys when they wanted to introduce Sorted Merge Buckets to >>>>>> FileIO >>>>>> where sharding was also needed to be manipulated (among other things) >>>>>> >>>>> >>>>> Right, I would like to know if there are more true use-cases before >>>>> adding this. 
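For context, a sketch of the existing FileIO surface being discussed: a fixed shard count can be requested, but the element-to-shard assignment itself is not exposed and stays with the randomly seeded round robin. The output path is made up.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.transforms.Create;

    public class FixedShardCountSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply(Create.of("a", "b", "c"))
            .apply(FileIO.<String>write()
                .via(TextIO.sink())
                .to("/tmp/out")          // hypothetical output directory
                .withNumShards(16));     // fixed count; element-to-shard assignment stays internal
        p.run().waitUntilFinish();
      }
    }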
This essentially allows users to map elements to exact output >>>>> shards which could change the characteristics of pipelines in very >>>>> significant ways without users being aware of it. For example, this could >>>>> result in extremely imbalanced workloads for any downstream processors. If >>>>> it's just Hive I would rather work around it (even with a perf penalty for >>>>> that case). >>>>> >>>>> >>>>>> >>>>>> > > When running e.g. on Spark and job encounters kind of failure >>>>>> which cause a loss of some data from previous stages, Spark does issue >>>>>> recompute of necessary task in necessary stages to recover data. Because >>>>>> the shard assignment function is random as default, some data will end up >>>>>> in different shards and cause duplicates in the final output. >>>>>> >>>>>> > I think this was already addressed. The correct solution here is to >>>>>> implement RequiresStableInput for runners that do not already support >>>>>> that >>>>>> and update FileIO to use that. >>>>>> >>>>>> How will @RequiresStableInput prevent this situation when running >>>>>> batch use case? >>>>>> >>>>> >>>>> So this is handled in combination of @RequiresStableInput and output >>>>> file finalization. @RequiresStableInput (or Reshuffle for most runners) >>>>> makes sure that the input provided for the write stage does not get >>>>> recomputed in the presence of failures. Output finalization makes sure >>>>> that >>>>> we only finalize one run of each bundle and discard the rest. >>>>> >>>>> Thanks, >>>>> Cham >>>>> >>>>> >>>>>> >>>>>> >>>>>> On Mon, Jun 28, 2021 at 10:29 AM Chamikara Jayalath < >>>>>> chamik...@google.com> wrote: >>>>>> >>>>>>> >>>>>>> >>>>>>> On Sun, Jun 27, 2021 at 10:48 PM Jozef Vilcek <jozo.vil...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi, >>>>>>>> >>>>>>>> how do we proceed with reviewing MR proposed for this change? >>>>>>>> >>>>>>>> I sense there is a concern exposing existing sharding function to >>>>>>>> the API. But from the discussion here I do not have a clear picture >>>>>>>> about arguments not doing so. >>>>>>>> Only one argument was that dynamic destinations should be able to >>>>>>>> do the same. While this is true, as it is illustrated in previous >>>>>>>> commnet, it is not simple nor convenient to use and requires more >>>>>>>> customization than exposing sharding which is already there. >>>>>>>> Are there more negatives to exposing sharding function? >>>>>>>> >>>>>>> >>>>>>> Seems like currently FlinkStreamingPipelineTranslator is the only >>>>>>> real usage of WriteFiles.withShardingFunction() : >>>>>>> https://github.com/apache/beam/blob/90c854e97787c19cd5b94034d37c5319317567a8/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java#L281 >>>>>>> WriteFiles is not expected to be used by pipeline authors but >>>>>>> exposing this through the FileIO will make this available to pipeline >>>>>>> authors (and some may choose to do so for various reasons). >>>>>>> This will result in sharding being strict and runners being >>>>>>> inflexible when it comes to optimizing execution of pipelines that use >>>>>>> FileIO. >>>>>>> >>>>>>> Beam has a policy of no knobs (or keeping knobs minimum) to allow >>>>>>> runners to optimize better. I think one concern might be that the >>>>>>> addition >>>>>>> of this option might be going against this. >>>>>>> >>>>>>> Also looking at your original reasoning for adding this. 
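A hedged sketch of what a data-driven sharding function could look like, assuming the ShardingFunction interface and int-sharded ShardedKey that WriteFiles uses internally (linked above); the CSV-style element and the bucket-from-first-field logic are illustrative only.

    import org.apache.beam.sdk.io.ShardingFunction;
    import org.apache.beam.sdk.values.ShardedKey;

    class HiveBucketShardingFn implements ShardingFunction<String, Void> {
      @Override
      public ShardedKey<Integer> assignShardKey(Void destination, String element, int shardCount) {
        // Hive-style bucketing: the shard is derived from a data field (first CSV
        // column here) instead of a random per-bundle seed.
        int bucket = Math.floorMod(element.split(",")[0].hashCode(), shardCount);
        return ShardedKey.of(0, bucket);  // single (Void) destination mapped to a constant key
      }
    }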
>>>>>>> >>>>>>> > I need to generate shards which are compatible with Hive bucketing >>>>>>> and therefore need to decide shard assignment based on data fields of >>>>>>> input >>>>>>> element >>>>>>> >>>>>>> This sounds very specific to Hive. Again I think the downside here >>>>>>> is that we will be giving up the flexibility for the runner to optimize >>>>>>> the >>>>>>> pipeline and decide sharding. Do you think it's possible to somehow >>>>>>> relax >>>>>>> this requirement for Hive or use a secondary process to format the >>>>>>> output >>>>>>> to a form that is suitable for Hive after being written to an >>>>>>> intermediate >>>>>>> storage. >>>>>>> >>>>>>> > When running e.g. on Spark and job encounters kind of failure >>>>>>> which cause a loss of some data from previous stages, Spark does issue >>>>>>> recompute of necessary task in necessary stages to recover data. Because >>>>>>> the shard assignment function is random as default, some data will end >>>>>>> up >>>>>>> in different shards and cause duplicates in the final output. >>>>>>> >>>>>>> I think this was already addressed. The correct solution here is to >>>>>>> implement RequiresStableInput for runners that do not already support >>>>>>> that >>>>>>> and update FileIO to use that. >>>>>>> >>>>>>> Thanks, >>>>>>> Cham >>>>>>> >>>>>>> >>>>>>>> >>>>>>>> On Wed, Jun 23, 2021 at 9:36 AM Jozef Vilcek <jozo.vil...@gmail.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> The difference in my opinion is in distinguishing between - as >>>>>>>>> written in this thread - physical vs logical properties of the >>>>>>>>> pipeline. I >>>>>>>>> proposed to keep dynamic destination (logical) and sharding (physical) >>>>>>>>> separate on API level as it is at implementation level. >>>>>>>>> >>>>>>>>> When I reason about using `by()` for my case ... I am using >>>>>>>>> dynamic destination to partition data into hourly folders. So my >>>>>>>>> destination is e.g. `2021-06-23-07`. To add a shard there I assume I >>>>>>>>> will >>>>>>>>> need >>>>>>>>> * encode shard to the destination .. e.g. in form of file prefix >>>>>>>>> `2021-06-23-07/shard-1` >>>>>>>>> * dynamic destination now does not span a group of files but must >>>>>>>>> be exactly one file >>>>>>>>> * to respect above, I have to. "disable" sharding in WriteFiles >>>>>>>>> and make sure to use `.withNumShards(1)` ... I am not sure what >>>>>>>>> `runnner >>>>>>>>> determined` sharding would do, if it would not split destination >>>>>>>>> further >>>>>>>>> into more files >>>>>>>>> * make sure that FileNameing will respect my intent and name files >>>>>>>>> as I expect based on that destination >>>>>>>>> * post process `WriteFilesResult` to turn my destination which >>>>>>>>> targets physical single file (date-with-hour + shard-num) back into >>>>>>>>> the >>>>>>>>> destination with targets logical group of files (date-with-hour) so I >>>>>>>>> hook >>>>>>>>> it up do downstream post-process as usual >>>>>>>>> >>>>>>>>> Am I roughly correct? Or do I miss something more straight forward? >>>>>>>>> If the above is correct then it feels more fragile and less >>>>>>>>> intuitive to me than the option in my MR. >>>>>>>>> >>>>>>>>> >>>>>>>>> On Tue, Jun 22, 2021 at 4:28 PM Reuven Lax <re...@google.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> I'm not sure I understand your PR. How is this PR different than >>>>>>>>>> the by() method in FileIO? 
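A sketch of the workaround steps listed above, with made-up paths and a CSV-ish element layout: the shard is folded into the dynamic destination (hour plus "shard-N"), numShards is pinned to 1 per destination, and the file naming is derived from the compound destination.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.transforms.Create;

    public class ShardInDestinationSketch {
      public static void main(String[] args) {
        final int numBuckets = 8;
        Pipeline p = Pipeline.create();
        p.apply(Create.of("2021-06-23-07,userA,1", "2021-06-23-07,userB,2"))
            .apply(FileIO.<String, String>writeDynamic()
                // Destination = hour + shard computed from a data field.
                .by(line -> line.split(",")[0] + "/shard-"
                    + Math.floorMod(line.split(",")[1].hashCode(), numBuckets))
                .withDestinationCoder(StringUtf8Coder.of())
                .via(TextIO.sink())
                .to("/tmp/out")  // hypothetical base directory
                .withNaming(dest -> FileIO.Write.defaultNaming(dest, ".csv"))
                .withNumShards(1));  // exactly one file per (hour, shard) destination
        p.run().waitUntilFinish();
      }
    }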
>>>>>>>>>> >>>>>>>>>> On Tue, Jun 22, 2021 at 1:22 AM Jozef Vilcek < >>>>>>>>>> jozo.vil...@gmail.com> wrote: >>>>>>>>>> >>>>>>>>>>> MR for review for this change is here: >>>>>>>>>>> https://github.com/apache/beam/pull/15051 >>>>>>>>>>> >>>>>>>>>>> On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek < >>>>>>>>>>> jozo.vil...@gmail.com> wrote: >>>>>>>>>>> >>>>>>>>>>>> I would like this thread to stay focused on sharding FileIO >>>>>>>>>>>> only. Possible change to the model is an interesting topic but of >>>>>>>>>>>> a much >>>>>>>>>>>> different scope. >>>>>>>>>>>> >>>>>>>>>>>> Yes, I agree that sharding is mostly a physical rather than >>>>>>>>>>>> logical property of the pipeline. That is why it feels more >>>>>>>>>>>> natural to >>>>>>>>>>>> distinguish between those two on the API level. >>>>>>>>>>>> As for handling sharding requirements by adding more sugar to >>>>>>>>>>>> dynamic destinations + file naming one has to keep in mind that >>>>>>>>>>>> results of >>>>>>>>>>>> dynamic writes can be observed in the form of KV<DestinationT, >>>>>>>>>>>> String>, >>>>>>>>>>>> so written files per dynamic destination. Often we do GBP to >>>>>>>>>>>> post-process >>>>>>>>>>>> files per destination / logical group. If sharding would be >>>>>>>>>>>> encoded there, >>>>>>>>>>>> then it might complicate things either downstream or inside the >>>>>>>>>>>> sugar part >>>>>>>>>>>> to put shard in and then take it out later. >>>>>>>>>>>> From the user perspective I do not see much difference. We >>>>>>>>>>>> would still need to allow API to define both behaviors and it >>>>>>>>>>>> would only be >>>>>>>>>>>> executed differently by implementation. >>>>>>>>>>>> I do not see a value in changing FileIO (WriteFiles) logic to >>>>>>>>>>>> stop using sharding and use dynamic destination for both given that >>>>>>>>>>>> sharding function is already there and in use. >>>>>>>>>>>> >>>>>>>>>>>> To the point of external shuffle and non-deterministic user >>>>>>>>>>>> input. >>>>>>>>>>>> Yes users can create non-deterministic behaviors but they are >>>>>>>>>>>> in control. Here, Beam internally adds non-deterministic behavior >>>>>>>>>>>> and users >>>>>>>>>>>> can not opt-out. >>>>>>>>>>>> All works fine as long as external shuffle service (depends on >>>>>>>>>>>> Runner) holds to the data and hands it out on retries. However if >>>>>>>>>>>> data in >>>>>>>>>>>> shuffle service is lost for some reason - e.g. disk failure, node >>>>>>>>>>>> breaks >>>>>>>>>>>> down - then pipeline have to recover the data by recomputing >>>>>>>>>>>> necessary >>>>>>>>>>>> paths. >>>>>>>>>>>> >>>>>>>>>>>> On Thu, Jun 17, 2021 at 7:36 PM Robert Bradshaw < >>>>>>>>>>>> rober...@google.com> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Sharding is typically a physical rather than logical property >>>>>>>>>>>>> of the >>>>>>>>>>>>> pipeline, and I'm not convinced it makes sense to add it to >>>>>>>>>>>>> Beam in >>>>>>>>>>>>> general. One can already use keys and GBK/Stateful DoFns if >>>>>>>>>>>>> some kind >>>>>>>>>>>>> of logical grouping is needed, and adding constraints like >>>>>>>>>>>>> this can >>>>>>>>>>>>> prevent opportunities for optimizations (like dynamic sharding >>>>>>>>>>>>> and >>>>>>>>>>>>> fusion). >>>>>>>>>>>>> >>>>>>>>>>>>> That being said, file output are one area where it could make >>>>>>>>>>>>> sense. 
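For reference, a sketch of the downstream pattern referred to above: written files are observable per dynamic destination as KV<DestinationT, String> and are typically grouped per destination for post-processing, so folding the shard into the destination changes that key. Elements and paths are illustrative.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.WriteFilesResult;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.GroupByKey;

    public class PerDestinationPostProcessSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        WriteFilesResult<String> result =
            p.apply(Create.of("2021-06-23-07,a", "2021-06-23-08,b"))
                .apply(FileIO.<String, String>writeDynamic()
                    .by(line -> line.split(",")[0])  // destination = hour
                    .withDestinationCoder(StringUtf8Coder.of())
                    .via(TextIO.sink())
                    .to("/tmp/out")
                    .withNaming(hour -> FileIO.Write.defaultNaming(hour, ".txt")));
        // Filenames arrive as KV<hour, filename>; grouping by destination drives the
        // post-processing step. With the shard folded into the destination, this key
        // would become "hour/shard-N" and the grouping logic would have to strip it.
        result.getPerDestinationOutputFilenames().apply(GroupByKey.create());
        p.run().waitUntilFinish();
      }
    }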
I >>>>>>>>>>>>> would expect that dynamic destinations could cover this >>>>>>>>>>>>> usecase, and a >>>>>>>>>>>>> general FileNaming subclass could be provided to make this >>>>>>>>>>>>> pattern >>>>>>>>>>>>> easier (and possibly some syntactic sugar for auto-setting num >>>>>>>>>>>>> shards >>>>>>>>>>>>> to 0). (One downside of this approach is that one couldn't do >>>>>>>>>>>>> dynamic >>>>>>>>>>>>> destinations, and have each sharded with a distinct sharing >>>>>>>>>>>>> function >>>>>>>>>>>>> as well.) >>>>>>>>>>>>> >>>>>>>>>>>>> If this doesn't work, we could look into adding >>>>>>>>>>>>> ShardingFunction as a >>>>>>>>>>>>> publicly exposed parameter to FileIO. (I'm actually surprised >>>>>>>>>>>>> it >>>>>>>>>>>>> already exists.) >>>>>>>>>>>>> >>>>>>>>>>>>> On Thu, Jun 17, 2021 at 9:39 AM <je...@seznam.cz> wrote: >>>>>>>>>>>>> > >>>>>>>>>>>>> > Alright, but what is worth emphasizing is that we talk about >>>>>>>>>>>>> batch workloads. The typical scenario is that the output is >>>>>>>>>>>>> committed once >>>>>>>>>>>>> the job finishes (e.g., by atomic rename of directory). >>>>>>>>>>>>> > Jan >>>>>>>>>>>>> > >>>>>>>>>>>>> > Dne 17. 6. 2021 17:59 napsal uživatel Reuven Lax < >>>>>>>>>>>>> re...@google.com>: >>>>>>>>>>>>> > >>>>>>>>>>>>> > Yes - the problem is that Beam makes no guarantees of >>>>>>>>>>>>> determinism anywhere in the system. User DoFns might be non >>>>>>>>>>>>> deterministic, >>>>>>>>>>>>> and have no way to know (we've discussed proving users with an >>>>>>>>>>>>> @IsDeterministic annotation, however empirically users often >>>>>>>>>>>>> think their >>>>>>>>>>>>> functions are deterministic when they are in fact not). _Any_ >>>>>>>>>>>>> sort of >>>>>>>>>>>>> triggered aggregation (including watermark based) can always be >>>>>>>>>>>>> non >>>>>>>>>>>>> deterministic. >>>>>>>>>>>>> > >>>>>>>>>>>>> > Even if everything was deterministic, replaying everything >>>>>>>>>>>>> is tricky. The output files already exist - should the system >>>>>>>>>>>>> delete them >>>>>>>>>>>>> and recreate them, or leave them as is and delete the temp files? >>>>>>>>>>>>> Either >>>>>>>>>>>>> decision could be problematic. >>>>>>>>>>>>> > >>>>>>>>>>>>> > On Wed, Jun 16, 2021 at 11:40 PM Jan Lukavský < >>>>>>>>>>>>> je...@seznam.cz> wrote: >>>>>>>>>>>>> > >>>>>>>>>>>>> > Correct, by the external shuffle service I pretty much meant >>>>>>>>>>>>> "offloading the contents of a shuffle phase out of the system". >>>>>>>>>>>>> Looks like >>>>>>>>>>>>> that is what the Spark's checkpoint does as well. On the other >>>>>>>>>>>>> hand (if I >>>>>>>>>>>>> understand the concept correctly), that implies some performance >>>>>>>>>>>>> penalty - >>>>>>>>>>>>> the data has to be moved to external distributed filesystem. >>>>>>>>>>>>> Which then >>>>>>>>>>>>> feels weird if we optimize code to avoid computing random >>>>>>>>>>>>> numbers, but are >>>>>>>>>>>>> okay with moving complete datasets back and forth. I think in this >>>>>>>>>>>>> particular case making the Pipeline deterministic - idempotent to >>>>>>>>>>>>> be >>>>>>>>>>>>> precise - (within the limits, yes, hashCode of enum is not stable >>>>>>>>>>>>> between >>>>>>>>>>>>> JVMs) would seem more practical to me. >>>>>>>>>>>>> > >>>>>>>>>>>>> > Jan >>>>>>>>>>>>> > >>>>>>>>>>>>> > On 6/17/21 7:09 AM, Reuven Lax wrote: >>>>>>>>>>>>> > >>>>>>>>>>>>> > I have some thoughts here, as Eugene Kirpichov and I spent a >>>>>>>>>>>>> lot of time working through these semantics in the past. 
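A hedged sketch of the kind of general FileNaming helper mentioned above, assuming the FileIO.Write.FileNaming interface; the directory argument and name layout are illustrative only.

    import org.apache.beam.sdk.io.Compression;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;

    class BucketFileNaming implements FileIO.Write.FileNaming {
      private final String bucketDir;

      BucketFileNaming(String bucketDir) {
        this.bucketDir = bucketDir;
      }

      @Override
      public String getFilename(BoundedWindow window, PaneInfo pane, int numShards,
          int shardIndex, Compression compression) {
        // Encode the shard index into a Hive-friendly file name under the bucket dir.
        return String.format("%s/part-%05d-of-%05d%s",
            bucketDir, shardIndex, numShards, compression.getSuggestedSuffix());
      }
    }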
>>>>>>>>>>>>> > >>>>>>>>>>>>> > First - about the problem of duplicates: >>>>>>>>>>>>> > >>>>>>>>>>>>> > A "deterministic" sharding - e.g. hashCode based (though >>>>>>>>>>>>> Java makes no guarantee that hashCode is stable across JVM >>>>>>>>>>>>> instances, so >>>>>>>>>>>>> this technique ends up not being stable) doesn't really help >>>>>>>>>>>>> matters in >>>>>>>>>>>>> Beam. Unlike other systems, Beam makes no assumptions that >>>>>>>>>>>>> transforms are >>>>>>>>>>>>> idempotent or deterministic. What's more, _any_ transform that >>>>>>>>>>>>> has any sort >>>>>>>>>>>>> of triggered grouping (whether the trigger used is watermark >>>>>>>>>>>>> based or >>>>>>>>>>>>> otherwise) is non deterministic. >>>>>>>>>>>>> > >>>>>>>>>>>>> > Forcing a hash of every element imposed quite a CPU cost; >>>>>>>>>>>>> even generating a random number per-element slowed things down >>>>>>>>>>>>> too much, >>>>>>>>>>>>> which is why the current code generates a random number only in >>>>>>>>>>>>> startBundle. >>>>>>>>>>>>> > >>>>>>>>>>>>> > Any runner that does not implement RequiresStableInput will >>>>>>>>>>>>> not properly execute FileIO. Dataflow and Flink both support >>>>>>>>>>>>> this. I >>>>>>>>>>>>> believe that the Spark runner implicitly supports it by manually >>>>>>>>>>>>> calling >>>>>>>>>>>>> checkpoint as Ken mentioned (unless someone removed that from the >>>>>>>>>>>>> Spark >>>>>>>>>>>>> runner, but if so that would be a correctness regression). >>>>>>>>>>>>> Implementing >>>>>>>>>>>>> this has nothing to do with external shuffle services - neither >>>>>>>>>>>>> Flink, >>>>>>>>>>>>> Spark, nor Dataflow appliance (classic shuffle) have any problem >>>>>>>>>>>>> correctly >>>>>>>>>>>>> implementing RequiresStableInput. >>>>>>>>>>>>> > >>>>>>>>>>>>> > On Wed, Jun 16, 2021 at 11:18 AM Jan Lukavský < >>>>>>>>>>>>> je...@seznam.cz> wrote: >>>>>>>>>>>>> > >>>>>>>>>>>>> > I think that the support for @RequiresStableInput is rather >>>>>>>>>>>>> limited. AFAIK it is supported by streaming Flink (where it is >>>>>>>>>>>>> not needed >>>>>>>>>>>>> in this situation) and by Dataflow. Batch runners without >>>>>>>>>>>>> external shuffle >>>>>>>>>>>>> service that works as in the case of Dataflow have IMO no way to >>>>>>>>>>>>> implement >>>>>>>>>>>>> it correctly. In the case of FileIO (which do not use >>>>>>>>>>>>> @RequiresStableInput >>>>>>>>>>>>> as it would not be supported on Spark) the randomness is easily >>>>>>>>>>>>> avoidable >>>>>>>>>>>>> (hashCode of key?) and I it seems to me it should be preferred. >>>>>>>>>>>>> > >>>>>>>>>>>>> > Jan >>>>>>>>>>>>> > >>>>>>>>>>>>> > On 6/16/21 6:23 PM, Kenneth Knowles wrote: >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský < >>>>>>>>>>>>> je...@seznam.cz> wrote: >>>>>>>>>>>>> > >>>>>>>>>>>>> > Hi, >>>>>>>>>>>>> > >>>>>>>>>>>>> > maybe a little unrelated, but I think we definitely should >>>>>>>>>>>>> not use random assignment of shard keys (RandomShardingFunction), >>>>>>>>>>>>> at least >>>>>>>>>>>>> for bounded workloads (seems to be fine for streaming workloads). >>>>>>>>>>>>> Many >>>>>>>>>>>>> batch runners simply recompute path in the computation DAG from >>>>>>>>>>>>> the failed >>>>>>>>>>>>> node (transform) to the root (source). In the case there is any >>>>>>>>>>>>> non-determinism involved in the logic, then it can result in >>>>>>>>>>>>> duplicates (as >>>>>>>>>>>>> the 'previous' attempt might have ended in DAG path that was not >>>>>>>>>>>>> affected >>>>>>>>>>>>> by the fail). 
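A sketch of the per-bundle random seed pattern described above: one random draw in @StartBundle, then round robin per element, which avoids hashing or drawing a random number for every element.

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    class RoundRobinShardFn<T> extends DoFn<T, KV<Integer, T>> {
      private final int numShards;
      private transient int nextShard;

      RoundRobinShardFn(int numShards) {
        this.numShards = numShards;
      }

      @StartBundle
      public void startBundle() {
        // Single random draw per bundle, not per element.
        nextShard = ThreadLocalRandom.current().nextInt(numShards);
      }

      @ProcessElement
      public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> out) {
        out.output(KV.of(nextShard, element));
        nextShard = (nextShard + 1) % numShards;
      }
    }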
That addresses the option 2) of what Jozef have >>>>>>>>>>>>> mentioned. >>>>>>>>>>>>> > >>>>>>>>>>>>> > This is the reason we introduced "@RequiresStableInput". >>>>>>>>>>>>> > >>>>>>>>>>>>> > When things were only Dataflow, we knew that each shuffle >>>>>>>>>>>>> was a checkpoint, so we inserted a Reshuffle after the random >>>>>>>>>>>>> numbers were >>>>>>>>>>>>> generated, freezing them so it was safe for replay. Since other >>>>>>>>>>>>> engines do >>>>>>>>>>>>> not checkpoint at every shuffle, we needed a way to communicate >>>>>>>>>>>>> that this >>>>>>>>>>>>> checkpointing was required for correctness. I think we still have >>>>>>>>>>>>> many IOs >>>>>>>>>>>>> that are written using Reshuffle instead of @RequiresStableInput, >>>>>>>>>>>>> and I >>>>>>>>>>>>> don't know which runners process @RequiresStableInput properly. >>>>>>>>>>>>> > >>>>>>>>>>>>> > By the way, I believe the SparkRunner explicitly calls >>>>>>>>>>>>> materialize() after a GBK specifically so that it gets correct >>>>>>>>>>>>> results for >>>>>>>>>>>>> IOs that rely on Reshuffle. Has that changed? >>>>>>>>>>>>> > >>>>>>>>>>>>> > I agree that we should minimize use of RequiresStableInput. >>>>>>>>>>>>> It has a significant cost, and the cost varies across runners. If >>>>>>>>>>>>> we can >>>>>>>>>>>>> use a deterministic function, we should. >>>>>>>>>>>>> > >>>>>>>>>>>>> > Kenn >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > Jan >>>>>>>>>>>>> > >>>>>>>>>>>>> > On 6/16/21 1:43 PM, Jozef Vilcek wrote: >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles < >>>>>>>>>>>>> k...@apache.org> wrote: >>>>>>>>>>>>> > >>>>>>>>>>>>> > In general, Beam only deals with keys and grouping by key. I >>>>>>>>>>>>> think expanding this idea to some more abstract notion of a >>>>>>>>>>>>> sharding >>>>>>>>>>>>> function could make sense. >>>>>>>>>>>>> > >>>>>>>>>>>>> > For FileIO specifically, I wonder if you can use >>>>>>>>>>>>> writeDynamic() to get the behavior you are seeking. >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > The change in mind looks like this: >>>>>>>>>>>>> > >>>>>>>>>>>>> https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18 >>>>>>>>>>>>> > >>>>>>>>>>>>> > Dynamic Destinations in my mind is more towards the need for >>>>>>>>>>>>> "partitioning" data (destination as directory level) or if one >>>>>>>>>>>>> needs to >>>>>>>>>>>>> handle groups of events differently, e.g. write some events in >>>>>>>>>>>>> FormatA and >>>>>>>>>>>>> others in FormatB. >>>>>>>>>>>>> > Shards are now used for distributing writes or bucketing of >>>>>>>>>>>>> events within a particular destination group. More specifically, >>>>>>>>>>>>> currently, >>>>>>>>>>>>> each element is assigned `ShardedKey<Integer>` [1] before GBK >>>>>>>>>>>>> operation. >>>>>>>>>>>>> Sharded key is a compound of destination and assigned shard. >>>>>>>>>>>>> > >>>>>>>>>>>>> > Having said that, I might be able to use dynamic destination >>>>>>>>>>>>> for this, possibly with the need of custom FileNaming, and set >>>>>>>>>>>>> shards to be >>>>>>>>>>>>> always 1. But it feels less natural than allowing the user to >>>>>>>>>>>>> swap already >>>>>>>>>>>>> present `RandomShardingFunction` [2] with something of his own >>>>>>>>>>>>> choosing. 
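For illustration, the "freeze the random values behind a shuffle" pattern described above: on runners where a shuffle acts as a checkpoint, keys drawn before the Reshuffle are replayed on retry rather than re-generated. Element values and the shard count are made up.

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TypeDescriptors;

    public class FreezeRandomKeysSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply(Create.of("a", "b", "c"))
            // Non-deterministic step: assign a random shard key per element.
            .apply(MapElements
                .into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
                .via(e -> KV.of(ThreadLocalRandom.current().nextInt(16), e)))
            // The shuffle is the stable-input point: downstream retries see the keys
            // that were already drawn instead of drawing new ones.
            .apply(Reshuffle.of());
        p.run().waitUntilFinish();
      }
    }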
>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > [1] >>>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java >>>>>>>>>>>>> > [2] >>>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856 >>>>>>>>>>>>> > >>>>>>>>>>>>> > Kenn >>>>>>>>>>>>> > >>>>>>>>>>>>> > On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton < >>>>>>>>>>>>> tyso...@google.com> wrote: >>>>>>>>>>>>> > >>>>>>>>>>>>> > Adding sharding to the model may require a wider discussion >>>>>>>>>>>>> than FileIO alone. I'm not entirely sure how wide, or if this has >>>>>>>>>>>>> been >>>>>>>>>>>>> proposed before, but IMO it warrants a design doc or proposal. >>>>>>>>>>>>> > >>>>>>>>>>>>> > A couple high level questions I can think of are, >>>>>>>>>>>>> > - What runners support sharding? >>>>>>>>>>>>> > * There will be some work in Dataflow required to >>>>>>>>>>>>> support this but I'm not sure how much. >>>>>>>>>>>>> > - What does sharding mean for streaming pipelines? >>>>>>>>>>>>> > >>>>>>>>>>>>> > A more nitty-detail question: >>>>>>>>>>>>> > - How can this be achieved performantly? For example, if a >>>>>>>>>>>>> shuffle is required to achieve a particular sharding constraint, >>>>>>>>>>>>> should we >>>>>>>>>>>>> allow transforms to declare they don't modify the sharding >>>>>>>>>>>>> property (e.g. >>>>>>>>>>>>> key preserving) which may allow a runner to avoid an additional >>>>>>>>>>>>> shuffle if >>>>>>>>>>>>> a preceding shuffle can guarantee the sharding requirements? >>>>>>>>>>>>> > >>>>>>>>>>>>> > Where X is the shuffle that could be avoided: input -> >>>>>>>>>>>>> shuffle (key sharding fn A) -> transform1 (key preserving) -> >>>>>>>>>>>>> transform 2 >>>>>>>>>>>>> (key preserving) -> X -> fileio (key sharding fn A) >>>>>>>>>>>>> > >>>>>>>>>>>>> > On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek < >>>>>>>>>>>>> jozo.vil...@gmail.com> wrote: >>>>>>>>>>>>> > >>>>>>>>>>>>> > I would like to extend FileIO with possibility to specify a >>>>>>>>>>>>> custom sharding function: >>>>>>>>>>>>> > https://issues.apache.org/jira/browse/BEAM-12493 >>>>>>>>>>>>> > >>>>>>>>>>>>> > I have 2 use-cases for this: >>>>>>>>>>>>> > >>>>>>>>>>>>> > I need to generate shards which are compatible with Hive >>>>>>>>>>>>> bucketing and therefore need to decide shard assignment based on >>>>>>>>>>>>> data >>>>>>>>>>>>> fields of input element >>>>>>>>>>>>> > When running e.g. on Spark and job encounters kind of >>>>>>>>>>>>> failure which cause a loss of some data from previous stages, >>>>>>>>>>>>> Spark does >>>>>>>>>>>>> issue recompute of necessary task in necessary stages to recover >>>>>>>>>>>>> data. >>>>>>>>>>>>> Because the shard assignment function is random as default, some >>>>>>>>>>>>> data will >>>>>>>>>>>>> end up in different shards and cause duplicates in the final >>>>>>>>>>>>> output. >>>>>>>>>>>>> > >>>>>>>>>>>>> > Please let me know your thoughts in case you see a reason to >>>>>>>>>>>>> not to add such improvement. >>>>>>>>>>>>> > >>>>>>>>>>>>> > Thanks, >>>>>>>>>>>>> > Jozef >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> >>>>>>>>>>>>
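Finally, a hedged sketch of how the proposed extension might read in user code. The withShardingFunction(...) name is an assumption for illustration only (the linked Jira/MR defines the actual surface), and HiveBucketShardingFn refers to the earlier sketch in this thread.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.transforms.Create;

    public class ProposedCustomShardingSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply(Create.of("userA,1", "userB,2"))
            .apply(FileIO.<String>write()
                .via(TextIO.sink())
                .to("/tmp/out")
                .withNumShards(32)
                // Hypothetical method from the proposal; not part of released FileIO.
                .withShardingFunction(new HiveBucketShardingFn()));
        p.run().waitUntilFinish();
      }
    }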