On Sat, Jul 3, 2021 at 12:55 PM Jan Lukavský <je...@seznam.cz> wrote:
> > I don't think this has anything to do with external shuffle services.
>
> Sorry for stepping into this discussion again, but I don't think this statement is 100% correct. What Spark's checkpoint does is save intermediate data (prior to a shuffle) to external storage so that it can be made available in case it is needed. Now, what is the purpose of an external shuffle service? To persist data externally so it is available when it is needed. In that sense, enforcing a checkpoint prior to each shuffle effectively means we force Spark to shuffle data twice (ok, once-and-a-half times, as the external checkpoint is likely never read) - once using its own internal shuffle and a second time through the external checkpoint. That is not going to be efficient. And if Spark could switch off its internal shuffle and use the external checkpoint for the same purpose, then the checkpoint would play the role of an external shuffle service - which is why this whole discussion does have something to do with external shuffle services.
>
> To move this discussion forward, I think the solution could be that the SparkRunner overrides the default FileIO transform and uses a deterministic sharding function. @Jozef, would that work for you? It would mean that the same would probably have to be done (sooner or later) for Flink batch, and probably for other runners. Maybe there could be a deterministic override ready-made in runners-core. Or, actually, maybe the easier way would be the other way around: Dataflow would use the non-deterministic version, while other runners could use the (more conservative, yet less performant) version.
>
> WDYT?

Yes, that could work for the problem with Beam on Spark producing inconsistent file output when shuffle data is lost. I would still need to address the "bucketing problem", e.g. wanting data from the same user_id and hour to end up in the same file. It feels ok to tackle those separately.

> Jan
>
> On 7/2/21 8:23 PM, Reuven Lax wrote:
>
> On Tue, Jun 29, 2021 at 1:03 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>
>>>> How will @RequiresStableInput prevent this situation when running a batch use case?
>>>
>>> So this is handled by a combination of @RequiresStableInput and output file finalization. @RequiresStableInput (or Reshuffle for most runners) makes sure that the input provided to the write stage does not get recomputed in the presence of failures. Output finalization makes sure that we only finalize one run of each bundle and discard the rest.
>>
>> So this, I believe, relies on having a robust external service to hold shuffle data and serve it out when needed, so that the pipeline does not need to recompute it via a non-deterministic function. In Spark, however, the shuffle service can not (if I am not mistaken) be deployed in this fashion (HA + replicated shuffle data). Therefore, if an instance of the shuffle service holding a portion of the shuffle data fails, Spark recovers by recomputing parts of the DAG from the source to restore the lost shuffle results. I am not sure what Beam can do here to prevent it or make it stable. Will @RequiresStableInput work as expected? ... Note that this, I believe, concerns only batch.
>
> I don't think this has anything to do with external shuffle services.
>
> Arbitrarily recomputing data is fundamentally incompatible with Beam, since Beam does not restrict transforms to being deterministic.
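(As context for readers: the @RequiresStableInput mechanism discussed in the quoted exchange above is requested at the DoFn level. A minimal sketch, assuming the Beam Java SDK annotation; the side-effecting body is invented for illustration:)

    import org.apache.beam.sdk.transforms.DoFn;

    // Sketch: a DoFn asking the runner for stable input. The runner must replay
    // each element identically on retry, which typically forces a checkpoint or
    // materialization of the input before this stage runs.
    class StableWriteFn extends DoFn<String, Void> {
      @RequiresStableInput
      @ProcessElement
      public void processElement(@Element String element) {
        // A side effect that is safe to retry only if `element` is stable,
        // e.g. writing a file whose name is derived from the element.
      }
    }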
> The Spark runner works (at least it did last I checked) by checkpointing the RDD. Spark will not recompute the DAG past a checkpoint, so this creates stable input to a transform. This adds some cost to the Spark runner, but makes things correct. You should not have sharding problems due to replay unless there is a bug in the current Spark runner.
>
>>> Also, when withNumShards() truly has to be used, round robin assignment of elements to shards sounds like the optimal solution (at least for the vast majority of pipelines)
>>
>> I agree. But the random seed is my problem here, with respect to the situation mentioned above.
>>
>>> Right, I would like to know if there are more true use-cases before adding this. This essentially allows users to map elements to exact output shards, which could change the characteristics of pipelines in very significant ways without users being aware of it. For example, this could result in extremely imbalanced workloads for any downstream processors. If it's just Hive I would rather work around it (even with a perf penalty for that case).
>>
>> Users would have to explicitly ask FileIO to use specific sharding. Documentation can educate about the tradeoffs. But I am open to workaround alternatives.
>>
>> On Mon, Jun 28, 2021 at 5:50 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>
>>> On Mon, Jun 28, 2021 at 2:47 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>
>>>> Hi Cham, thanks for the feedback.
>>>>
>>>>> Beam has a policy of no knobs (or keeping knobs to a minimum) to allow runners to optimize better. I think one concern might be that the addition of this option might be going against this.
>>>>
>>>> I agree that less knobs is more. But if the assignment of a key is specified per user request via the API (optionally), then the runner should not optimize it, but needs to respect how the data is requested to be laid down. It could, however, optimize the number of shards as it does right now when numShards is not set explicitly.
>>>>
>>>> Anyway, FileIO feels fluid here. You can leave shards empty and let the runner decide or optimize, but only in batch and not in streaming. Then, it allows you to set the number of shards but never lets you change the logic of how they are assigned, defaulting to round robin with a random seed. It begs the question for me: why can I manipulate the number of shards but not the logic of assignment? Are there plans (or an already implemented case) to optimize around this and have the runner change it under different circumstances?
>>>
>>> I don't know the exact history behind the withNumShards() feature for batch. I would guess it was originally introduced due to limitations for streaming but was made available to batch as well with a strong performance warning.
>>> https://github.com/apache/beam/blob/90c854e97787c19cd5b94034d37c5319317567a8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L159
>>>
>>> Also, when withNumShards() truly has to be used, round robin assignment of elements to shards sounds like the optimal solution (at least for the vast majority of pipelines)
>>>
>>>>> Also looking at your original reasoning for adding this.
>>>>>
>>>>>> I need to generate shards which are compatible with Hive bucketing and therefore need to decide shard assignment based on data fields of the input element
>>>>>
>>>>> This sounds very specific to Hive.
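(For readers unfamiliar with Hive's scheme: Hive's default bucketing assigns a row to bucket (hash & Integer.MAX_VALUE) % numBuckets, with the hash computed from the bucketing column(s). A hypothetical sketch of compatible shard assignment for a string column follows; Event and getUserId are invented names, and Java's hashCode only approximates Hive's own column hashing:)

    // Hypothetical sketch of Hive-compatible bucket assignment for a string
    // bucketing column. Event/getUserId are invented; Hive computes column
    // hashes via its ObjectInspectorUtils, which this only approximates.
    static int hiveBucket(Event event, int numBuckets) {
      int hash = event.getUserId().hashCode();
      return (hash & Integer.MAX_VALUE) % numBuckets;  // Hive's default formula
    }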
>>>>> Again, I think the downside here is that we will be giving up the flexibility for the runner to optimize the pipeline and decide sharding. Do you think it's possible to somehow relax this requirement for Hive, or use a secondary process to format the output into a form that is suitable for Hive after it has been written to intermediate storage?
>>>>
>>>> It is possible. But it is quite suboptimal because of the extra IO penalty, and I would have to use completely custom file writing, as FileIO would not support this. Then naturally the question would be: should I use FileIO at all? I am not quite sure if I am doing something so rare that it is not, and can not be, supported. Maybe I am. I remember an old discussion from the Scio guys when they wanted to introduce Sorted Merge Buckets to FileIO, where sharding also needed to be manipulated (among other things).
>>>
>>> Right, I would like to know if there are more true use-cases before adding this. This essentially allows users to map elements to exact output shards, which could change the characteristics of pipelines in very significant ways without users being aware of it. For example, this could result in extremely imbalanced workloads for any downstream processors. If it's just Hive I would rather work around it (even with a perf penalty for that case).
>>>
>>>>>> When running e.g. on Spark, if the job encounters a kind of failure which causes a loss of some data from previous stages, Spark issues a recompute of the necessary tasks in the necessary stages to recover the data. Because the default shard assignment function is random, some data will end up in different shards and cause duplicates in the final output.
>>>>>
>>>>> I think this was already addressed. The correct solution here is to implement RequiresStableInput for runners that do not already support that and update FileIO to use that.
>>>>
>>>> How will @RequiresStableInput prevent this situation when running a batch use case?
>>>
>>> So this is handled by a combination of @RequiresStableInput and output file finalization. @RequiresStableInput (or Reshuffle for most runners) makes sure that the input provided to the write stage does not get recomputed in the presence of failures. Output finalization makes sure that we only finalize one run of each bundle and discard the rest.
>>>
>>> Thanks,
>>> Cham
>>>
>>>> On Mon, Jun 28, 2021 at 10:29 AM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>
>>>>> On Sun, Jun 27, 2021 at 10:48 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> how do we proceed with reviewing the MR proposed for this change?
>>>>>>
>>>>>> I sense there is a concern about exposing the existing sharding function in the API. But from the discussion here I do not have a clear picture of the arguments for not doing so. Only one argument was that dynamic destinations should be able to do the same. While this is true, as illustrated in a previous comment, it is not simple nor convenient to use, and requires more customization than exposing the sharding which is already there. Are there more negatives to exposing the sharding function?
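(To make the question concrete: a user-provided sharding function would be written against the ShardingFunction interface that WriteFiles already has. A minimal deterministic sketch, assuming the interface shape from the Beam 2.29 sources linked below in this thread; the hashing choices are illustrative only, and note Reuven's caveat elsewhere in the thread that hashCode() stability across JVMs is itself limited:)

    import org.apache.beam.sdk.io.ShardingFunction;
    import org.apache.beam.sdk.values.ShardedKey;

    // Illustrative only: assigns each element a shard from a hash of the
    // element itself, so a replayed bundle reproduces the same assignment.
    class DeterministicShardingFunction<UserT, DestinationT>
        implements ShardingFunction<UserT, DestinationT> {

      @Override
      public ShardedKey<Integer> assignShardKey(
          DestinationT destination, UserT element, int shardCount) {
        // The real WriteFiles keys by a hash of the encoded destination;
        // plain hashCode() is used here only to keep the sketch short.
        int shard = Math.floorMod(element.hashCode(), shardCount);
        return ShardedKey.of(destination.hashCode(), shard);
      }
    }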
>>>>> Seems like currently FlinkStreamingPipelineTranslator is the only real usage of WriteFiles.withShardingFunction():
>>>>> https://github.com/apache/beam/blob/90c854e97787c19cd5b94034d37c5319317567a8/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java#L281
>>>>> WriteFiles is not expected to be used by pipeline authors, but exposing this through FileIO will make it available to pipeline authors (and some may choose to use it for various reasons). This will result in sharding being strict and runners being inflexible when it comes to optimizing the execution of pipelines that use FileIO.
>>>>>
>>>>> Beam has a policy of no knobs (or keeping knobs to a minimum) to allow runners to optimize better. I think one concern might be that the addition of this option might be going against this.
>>>>>
>>>>> Also looking at your original reasoning for adding this.
>>>>>
>>>>>> I need to generate shards which are compatible with Hive bucketing and therefore need to decide shard assignment based on data fields of the input element
>>>>>
>>>>> This sounds very specific to Hive. Again, I think the downside here is that we will be giving up the flexibility for the runner to optimize the pipeline and decide sharding. Do you think it's possible to somehow relax this requirement for Hive, or use a secondary process to format the output into a form that is suitable for Hive after it has been written to intermediate storage?
>>>>>
>>>>>> When running e.g. on Spark, if the job encounters a kind of failure which causes a loss of some data from previous stages, Spark issues a recompute of the necessary tasks in the necessary stages to recover the data. Because the default shard assignment function is random, some data will end up in different shards and cause duplicates in the final output.
>>>>>
>>>>> I think this was already addressed. The correct solution here is to implement RequiresStableInput for runners that do not already support that and update FileIO to use that.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>>> On Wed, Jun 23, 2021 at 9:36 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>>
>>>>>>> The difference in my opinion is in distinguishing between - as written in this thread - physical vs logical properties of the pipeline. I proposed to keep dynamic destinations (logical) and sharding (physical) separate at the API level, as they are at the implementation level.
>>>>>>>
>>>>>>> When I reason about using `by()` for my case ... I am using dynamic destinations to partition data into hourly folders. So my destination is e.g. `2021-06-23-07`. To add a shard there, I assume I will need to:
>>>>>>> * encode the shard into the destination, e.g. in the form of a file prefix `2021-06-23-07/shard-1`
>>>>>>> * accept that a dynamic destination now does not span a group of files but must be exactly one file
>>>>>>> * to respect the above, "disable" sharding in WriteFiles and make sure to use `.withNumShards(1)` ...
>>>>>>>   (I am not sure what `runner determined` sharding would do here, i.e. whether it would split the destination further into more files)
>>>>>>> * make sure that the FileNaming will respect my intent and name files as I expect based on that destination
>>>>>>> * post-process `WriteFilesResult` to turn my destination, which targets a single physical file (date-with-hour + shard-num), back into the destination which targets a logical group of files (date-with-hour), so I can hook it up to downstream post-processing as usual
>>>>>>>
>>>>>>> Am I roughly correct? Or do I miss something more straightforward?
>>>>>>> If the above is correct, then it feels more fragile and less intuitive to me than the option in my MR.
>>>>>>>
>>>>>>> On Tue, Jun 22, 2021 at 4:28 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> I'm not sure I understand your PR. How is this PR different than the by() method in FileIO?
>>>>>>>>
>>>>>>>> On Tue, Jun 22, 2021 at 1:22 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> The MR for review for this change is here: https://github.com/apache/beam/pull/15051
>>>>>>>>>
>>>>>>>>> On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I would like this thread to stay focused on sharding FileIO only. A possible change to the model is an interesting topic, but of a much different scope.
>>>>>>>>>>
>>>>>>>>>> Yes, I agree that sharding is mostly a physical rather than logical property of the pipeline. That is why it feels more natural to distinguish between those two at the API level.
>>>>>>>>>> As for handling sharding requirements by adding more sugar to dynamic destinations + file naming, one has to keep in mind that the results of dynamic writes can be observed in the form of KV<DestinationT, String>, i.e. written files per dynamic destination. Often we do a GBK to post-process files per destination / logical group. If sharding were encoded there, it might complicate things either downstream or inside the sugar part, which would have to put the shard in and then take it out later.
>>>>>>>>>> From the user perspective I do not see much difference. We would still need to allow the API to define both behaviors; it would only be executed differently by the implementation. I do not see value in changing the FileIO (WriteFiles) logic to stop using sharding and use dynamic destinations for both, given that the sharding function is already there and in use.
>>>>>>>>>>
>>>>>>>>>> To the point of external shuffle and non-deterministic user input: yes, users can create non-deterministic behaviors, but they are in control. Here, Beam internally adds non-deterministic behavior and users cannot opt out.
>>>>>>>>>> All works fine as long as the external shuffle service (depending on the runner) holds on to the data and hands it out on retries. However, if data in the shuffle service is lost for some reason - e.g. disk failure, a node breaks down - then the pipeline has to recover the data by recomputing the necessary paths.
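(A rough sketch of the by()-based workaround described in the bullets above, with the shard encoded into the destination and runner sharding pinned to one file per destination. Event, getHour(), getUserId(), toCsvLine() and the shard count of 16 are invented for illustration; this is not code from the MR:)

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.transforms.Contextful;

    // events is a PCollection<Event>; the destination string carries both the
    // hourly "partition" and an explicit shard, and withNumShards(1) pins each
    // destination to exactly one file.
    events.apply(
        FileIO.<String, Event>writeDynamic()
            .by(e -> e.getHour() + "/shard-"
                + Math.floorMod(e.getUserId().hashCode(), 16))
            .withDestinationCoder(StringUtf8Coder.of())
            .via(Contextful.fn(Event::toCsvLine), TextIO.sink())
            .withNaming(dest -> FileIO.Write.defaultNaming(dest, ".csv"))
            .withNumShards(1)
            .to("/output/path"));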
>>>>>>>>>> On Thu, Jun 17, 2021 at 7:36 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Sharding is typically a physical rather than logical property of the pipeline, and I'm not convinced it makes sense to add it to Beam in general. One can already use keys and GBK/Stateful DoFns if some kind of logical grouping is needed, and adding constraints like this can prevent opportunities for optimizations (like dynamic sharding and fusion).
>>>>>>>>>>>
>>>>>>>>>>> That being said, file outputs are one area where it could make sense. I would expect that dynamic destinations could cover this usecase, and a general FileNaming subclass could be provided to make this pattern easier (and possibly some syntactic sugar for auto-setting num shards to 0). (One downside of this approach is that one couldn't do dynamic destinations and have each sharded with a distinct sharding function as well.)
>>>>>>>>>>>
>>>>>>>>>>> If this doesn't work, we could look into adding ShardingFunction as a publicly exposed parameter to FileIO. (I'm actually surprised it already exists.)
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 17, 2021 at 9:39 AM <je...@seznam.cz> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Alright, but what is worth emphasizing is that we are talking about batch workloads. The typical scenario is that the output is committed once the job finishes (e.g., by an atomic rename of a directory).
>>>>>>>>>>> > Jan
>>>>>>>>>>> >
>>>>>>>>>>> > On 17. 6. 2021 at 17:59, Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Yes - the problem is that Beam makes no guarantees of determinism anywhere in the system. User DoFns might be non-deterministic, and we have no way to know (we've discussed providing users with an @IsDeterministic annotation; however, empirically users often think their functions are deterministic when they are in fact not). _Any_ sort of triggered aggregation (including watermark based) can always be non-deterministic.
>>>>>>>>>>> >
>>>>>>>>>>> > Even if everything was deterministic, replaying everything is tricky. The output files already exist - should the system delete them and recreate them, or leave them as is and delete the temp files? Either decision could be problematic.
>>>>>>>>>>> >
>>>>>>>>>>> > On Wed, Jun 16, 2021 at 11:40 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Correct, by the external shuffle service I pretty much meant "offloading the contents of a shuffle phase out of the system". Looks like that is what Spark's checkpoint does as well. On the other hand (if I understand the concept correctly), that implies some performance penalty - the data has to be moved to an external distributed filesystem. Which then feels weird if we optimize code to avoid computing random numbers, but are okay with moving complete datasets back and forth.
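(A sketch of the checkpoint mechanism being referred to, using the plain Spark Java API rather than SparkRunner internals; conf and input are assumed to be defined:)

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    JavaSparkContext sc = new JavaSparkContext(conf);
    sc.setCheckpointDir("hdfs:///tmp/checkpoints");  // external, fault-tolerant storage

    JavaRDD<String> shuffled = input.repartition(16);
    shuffled.checkpoint();  // mark for checkpointing; written on first materialization
    shuffled.count();       // force computation so the checkpoint is persisted
    // Downstream failures now replay from the checkpoint, not from the source,
    // i.e. stable input at the cost of writing the dataset out to storage.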
>>>>>>>>>>> > I think in this particular case, making the pipeline deterministic - idempotent, to be precise - (within limits, yes, the hashCode of an enum is not stable between JVMs) would seem more practical to me.
>>>>>>>>>>> >
>>>>>>>>>>> > Jan
>>>>>>>>>>> >
>>>>>>>>>>> > On 6/17/21 7:09 AM, Reuven Lax wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > I have some thoughts here, as Eugene Kirpichov and I spent a lot of time working through these semantics in the past.
>>>>>>>>>>> >
>>>>>>>>>>> > First - about the problem of duplicates:
>>>>>>>>>>> >
>>>>>>>>>>> > A "deterministic" sharding - e.g. hashCode based (though Java makes no guarantee that hashCode is stable across JVM instances, so this technique ends up not being stable) - doesn't really help matters in Beam. Unlike other systems, Beam makes no assumption that transforms are idempotent or deterministic. What's more, _any_ transform that has any sort of triggered grouping (whether the trigger used is watermark based or otherwise) is non-deterministic.
>>>>>>>>>>> >
>>>>>>>>>>> > Forcing a hash of every element imposed quite a CPU cost; even generating a random number per-element slowed things down too much, which is why the current code generates a random number only in startBundle.
>>>>>>>>>>> >
>>>>>>>>>>> > Any runner that does not implement RequiresStableInput will not properly execute FileIO. Dataflow and Flink both support this. I believe that the Spark runner implicitly supports it by manually calling checkpoint as Kenn mentioned (unless someone removed that from the Spark runner, but if so that would be a correctness regression). Implementing this has nothing to do with external shuffle services - neither Flink, Spark, nor the Dataflow appliance (classic shuffle) have any problem correctly implementing RequiresStableInput.
>>>>>>>>>>> >
>>>>>>>>>>> > On Wed, Jun 16, 2021 at 11:18 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > I think that the support for @RequiresStableInput is rather limited. AFAIK it is supported by streaming Flink (where it is not needed in this situation) and by Dataflow. Batch runners without an external shuffle service that works as in the case of Dataflow have IMO no way to implement it correctly. In the case of FileIO (which does not use @RequiresStableInput, as it would not be supported on Spark) the randomness is easily avoidable (hashCode of the key?) and it seems to me that should be preferred.
>>>>>>>>>>> >
>>>>>>>>>>> > Jan
>>>>>>>>>>> >
>>>>>>>>>>> > On 6/16/21 6:23 PM, Kenneth Knowles wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Hi,
>>>>>>>>>>> >
>>>>>>>>>>> > maybe a little unrelated, but I think we definitely should not use random assignment of shard keys (RandomShardingFunction), at least for bounded workloads (it seems to be fine for streaming workloads).
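(A paraphrase - not the actual WriteFiles source - of the "random number only in startBundle" pattern Reuven describes above, showing why a replayed bundle can assign the same elements to different shards:)

    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Each bundle picks a random starting shard and then round-robins. On a
    // bundle retry, nextInt() yields a different start, so the same element
    // can land on a different shard - the non-determinism objected to here.
    class AssignRandomShardFn<T> extends DoFn<T, KV<Integer, T>> {
      private final int numShards;
      private transient int shard;

      AssignRandomShardFn(int numShards) { this.numShards = numShards; }

      @StartBundle
      public void startBundle() {
        shard = ThreadLocalRandom.current().nextInt(numShards);
      }

      @ProcessElement
      public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> out) {
        shard = (shard + 1) % numShards;
        out.output(KV.of(shard, element));
      }
    }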
>>>>>>>>>>> > Many batch runners simply recompute paths in the computation DAG from the failed node (transform) to the root (source). If there is any non-determinism involved in the logic, it can result in duplicates (as the 'previous' attempt might have ended on a DAG path that was not affected by the failure). That addresses option 2) of what Jozef has mentioned.
>>>>>>>>>>> >
>>>>>>>>>>> > This is the reason we introduced "@RequiresStableInput".
>>>>>>>>>>> >
>>>>>>>>>>> > When things were only Dataflow, we knew that each shuffle was a checkpoint, so we inserted a Reshuffle after the random numbers were generated, freezing them so it was safe for replay. Since other engines do not checkpoint at every shuffle, we needed a way to communicate that this checkpointing was required for correctness. I think we still have many IOs that are written using Reshuffle instead of @RequiresStableInput, and I don't know which runners process @RequiresStableInput properly.
>>>>>>>>>>> >
>>>>>>>>>>> > By the way, I believe the SparkRunner explicitly calls materialize() after a GBK specifically so that it gets correct results for IOs that rely on Reshuffle. Has that changed?
>>>>>>>>>>> >
>>>>>>>>>>> > I agree that we should minimize the use of RequiresStableInput. It has a significant cost, and the cost varies across runners. If we can use a deterministic function, we should.
>>>>>>>>>>> >
>>>>>>>>>>> > Kenn
>>>>>>>>>>> >
>>>>>>>>>>> > Jan
>>>>>>>>>>> >
>>>>>>>>>>> > On 6/16/21 1:43 PM, Jozef Vilcek wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > In general, Beam only deals with keys and grouping by key. I think expanding this idea to some more abstract notion of a sharding function could make sense.
>>>>>>>>>>> >
>>>>>>>>>>> > For FileIO specifically, I wonder if you can use writeDynamic() to get the behavior you are seeking.
>>>>>>>>>>> >
>>>>>>>>>>> > The change I have in mind looks like this:
>>>>>>>>>>> > https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18
>>>>>>>>>>> >
>>>>>>>>>>> > Dynamic destinations, in my mind, are more towards the need for "partitioning" data (destination at the directory level), or for when one needs to handle groups of events differently, e.g. write some events in FormatA and others in FormatB.
>>>>>>>>>>> > Shards are used for distributing writes or bucketing of events within a particular destination group. More specifically, currently each element is assigned a `ShardedKey<Integer>` [1] before the GBK operation. The sharded key is a compound of the destination and the assigned shard.
>>>>>>>>>>> >
>>>>>>>>>>> > Having said that, I might be able to use dynamic destinations for this, possibly with a custom FileNaming, and set shards to always be 1.
>>>>>>>>>>> > But it feels less natural than allowing the user to swap the already present `RandomShardingFunction` [2] with something of his own choosing.
>>>>>>>>>>> >
>>>>>>>>>>> > [1] https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
>>>>>>>>>>> > [2] https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
>>>>>>>>>>> >
>>>>>>>>>>> > Kenn
>>>>>>>>>>> >
>>>>>>>>>>> > On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <tyso...@google.com> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Adding sharding to the model may require a wider discussion than FileIO alone. I'm not entirely sure how wide, or if this has been proposed before, but IMO it warrants a design doc or proposal.
>>>>>>>>>>> >
>>>>>>>>>>> > A couple of high-level questions I can think of are:
>>>>>>>>>>> > - What runners support sharding?
>>>>>>>>>>> >   * There will be some work in Dataflow required to support this, but I'm not sure how much.
>>>>>>>>>>> > - What does sharding mean for streaming pipelines?
>>>>>>>>>>> >
>>>>>>>>>>> > A more nitty-gritty question:
>>>>>>>>>>> > - How can this be achieved performantly? For example, if a shuffle is required to achieve a particular sharding constraint, should we allow transforms to declare that they don't modify the sharding property (e.g. key preserving), which may allow a runner to avoid an additional shuffle if a preceding shuffle can guarantee the sharding requirements?
>>>>>>>>>>> >
>>>>>>>>>>> > Where X is the shuffle that could be avoided: input -> shuffle (key sharding fn A) -> transform1 (key preserving) -> transform2 (key preserving) -> X -> fileio (key sharding fn A)
>>>>>>>>>>> >
>>>>>>>>>>> > On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > I would like to extend FileIO with the possibility to specify a custom sharding function:
>>>>>>>>>>> > https://issues.apache.org/jira/browse/BEAM-12493
>>>>>>>>>>> >
>>>>>>>>>>> > I have 2 use-cases for this:
>>>>>>>>>>> >
>>>>>>>>>>> > 1. I need to generate shards which are compatible with Hive bucketing and therefore need to decide shard assignment based on data fields of the input element
>>>>>>>>>>> > 2. When running e.g. on Spark, if the job encounters a kind of failure which causes a loss of some data from previous stages, Spark issues a recompute of the necessary tasks in the necessary stages to recover the data. Because the default shard assignment function is random, some data will end up in different shards and cause duplicates in the final output.
>>>>>>>>>>> >
>>>>>>>>>>> > Please let me know your thoughts in case you see a reason not to add such an improvement.
>>>>>>>>>>> >
>>>>>>>>>>> > Thanks,
>>>>>>>>>>> > Jozef
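(To make the proposal concrete, a hypothetical sketch of how an exposed hook might be used from FileIO. `withSharding(...)` is an invented name for this sketch and `DeterministicShardingFunction` refers to the earlier sketch in this thread; see the MR above for the actual proposed surface:)

    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;

    // Hypothetical: lines is a PCollection<String>. The custom function would
    // replace the default random assignment with a deterministic, data-driven
    // one, while withNumShards keeps the shard count fixed.
    lines.apply(
        FileIO.<String>write()
            .via(TextIO.sink())
            .to("/warehouse/events")
            .withNumShards(32)
            .withSharding(new DeterministicShardingFunction<>()));  // invented method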