On Sat, Jul 3, 2021 at 12:55 PM Jan Lukavský <je...@seznam.cz> wrote:

>
> I don't think this has anything to do with external shuffle services.
>
>
> Sorry, for stepping into this discussion again, but I don't think this
> statement is 100% correct. What Spark's checkpoint does is that it saves
> intermediate data (prior to shuffle) to external storage so that it can be
> made available in case it is needed? Now, what is the purpose of an
> external shuffle service? Persist data externally to make it available when
> it is needed. It that sense enforcing checkpoint prior to each shuffle
> effectively means, we force Spark to shuffle data twice (ok,
> once-and-a-half-times, the external checkpoint is likely not be read) -
> once using its own internal shuffle and second time the external
> checkpoint. That is not going to be efficient. And if spark could switch
> off its internal shuffle and use the external checkpoint for the same
> purpose, then the checkpoint would play role of external shuffle service,
> which is why this whole discussion has to do something with external
> shuffle services.
>
> To move this discussion forward I think that the solution could be in that
> SparkRunner can override the default FileIO transform and use a
> deterministic sharding function. @Jozef, would that work for you? It would
> mean, that the same should probably have to be done (sooner or later) for
> Flink batch and probably for other runners. Maybe there could be a
> deterministic override ready-made in runners-core. Or, actually, maybe the
> more easy way would be the other way around, that Dataflow would use the
> non-deterministic version, while other runners could use the (more
> conservative, yet not that performant) version.
>
> WDYT?
>
Yes, that could work for the problem with Beam on Spark producing
inconsistent file output when shuffle data is lost. I would still need to
address the "bucketing problem" e.g. wanting data from the same user_id and
hour end up in the same file. It feels ok to tackle those separately.


>  Jan
> On 7/2/21 8:23 PM, Reuven Lax wrote:
>
>
>
> On Tue, Jun 29, 2021 at 1:03 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>>
>> How will @RequiresStableInput prevent this situation when running batch
>>> use case?
>>>
>>
>> So this is handled in combination of @RequiresStableInput and output file
>> finalization. @RequiresStableInput (or Reshuffle for most runners) makes
>> sure that the input provided for the write stage does not get recomputed in
>> the presence of failures. Output finalization makes sure that we only
>> finalize one run of each bundle and discard the rest.
>>
>> So this I believe relies on having robust external service to hold
>> shuffle data and serve it out when needed so pipeline does not need to
>> recompute it via non-deterministic function. In Spark however, shuffle
>> service can not be (if I am not mistaking) deployed in this fashion (HA +
>> replicated shuffle data). Therefore, if instance of shuffle service holding
>> a portion of shuffle data fails, spark recovers it by recomputing parts of
>> a DAG from source to recover lost shuffle results. I am not sure what Beam
>> can do here to prevent it or make it stable? Will @RequiresStableInput work
>> as expected? ... Note that this, I believe, concerns only the batch.
>>
>
> I don't think this has anything to do with external shuffle services.
>
> Arbitrarily recomputing data is fundamentally incompatible with Beam,
> since Beam does not restrict transforms to being deterministic. The Spark
> runner works (at least it did last I checked) by checkpointing the RDD.
> Spark will not recompute the DAG past a checkpoint, so this creates stable
> input to a transform. This adds some cost to the Spark runner, but makes
> things correct. You should not have sharding problems due to replay unless
> there is a bug in the current Spark runner.
>
>
>>
>> Also, when withNumShards() truly have to be used, round robin assignment
>> of elements to shards sounds like the optimal solution (at least for
>> the vast majority of pipelines)
>>
>> I agree. But random seed is my problem here with respect to the situation
>> mentioned above.
>>
>> Right, I would like to know if there are more true use-cases before
>> adding this. This essentially allows users to map elements to exact output
>> shards which could change the characteristics of pipelines in very
>> significant ways without users being aware of it. For example, this could
>> result in extremely imbalanced workloads for any downstream processors. If
>> it's just Hive I would rather work around it (even with a perf penalty for
>> that case).
>>
>> User would have to explicitly ask FileIO to use specific sharding.
>> Documentation can educate about the tradeoffs. But I am open to workaround
>> alternatives.
>>
>>
>> On Mon, Jun 28, 2021 at 5:50 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Mon, Jun 28, 2021 at 2:47 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>> wrote:
>>>
>>>> Hi Cham, thanks for the feedback
>>>>
>>>> > Beam has a policy of no knobs (or keeping knobs minimum) to allow
>>>> runners to optimize better. I think one concern might be that the addition
>>>> of this option might be going against this.
>>>>
>>>> I agree that less knobs is more. But if assignment of a key is specific
>>>> per user request via API (optional), than runner should not optimize that
>>>> but need to respect how data is requested to be laid down. It could however
>>>> optimize number of shards as it is doing right now if numShards is not set
>>>> explicitly.
>>>> Anyway FileIO feels fluid here. You can leave shards empty and let
>>>> runner decide or optimize,  but only in batch and not in streaming.  Then
>>>> it allows you to set number of shards but never let you change the logic
>>>> how it is assigned and defaults to round robin with random seed. It begs
>>>> question for me why I can manipulate number of shards but not the logic of
>>>> assignment. Are there plans to (or already case implemented) optimize
>>>> around and have runner changing that under different circumstances?
>>>>
>>>
>>> I don't know the exact history behind withNumShards() feature for batch.
>>> I would guess it was originally introduced due to limitations for streaming
>>> but was made available to batch as well with a strong performance warning.
>>>
>>> https://github.com/apache/beam/blob/90c854e97787c19cd5b94034d37c5319317567a8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L159
>>>
>>> Also, when withNumShards() truly have to be used, round robin assignment
>>> of elements to shards sounds like the optimal solution (at least for
>>> the vast majority of pipelines)
>>>
>>>
>>>>
>>>> > Also looking at your original reasoning for adding this.
>>>>
>>>> > > I need to generate shards which are compatible with Hive bucketing
>>>> and therefore need to decide shard assignment based on data fields of input
>>>> element
>>>>
>>>> > This sounds very specific to Hive. Again I think the downside here is
>>>> that we will be giving up the flexibility for the runner to optimize the
>>>> pipeline and decide sharding. Do you think it's possible to somehow relax
>>>> this requirement for > > > Hive or use a secondary process to format the
>>>> output to a form that is suitable for Hive after being written to an
>>>> intermediate storage.
>>>>
>>>> It is possible. But it is quite suboptimal because of extra IO penalty
>>>> and I would have to use completely custom file writing as FileIO would not
>>>> support this. Then naturally the question would be, should I use FileIO at
>>>> all? I am quite not sure if I am doing something so rare that it is not and
>>>> can not be supported. Maybe I am. I remember an old discussion from Scio
>>>> guys when they wanted to introduce Sorted Merge Buckets to FileIO where
>>>> sharding was also needed to be manipulated (among other things)
>>>>
>>>
>>> Right, I would like to know if there are more true use-cases before
>>> adding this. This essentially allows users to map elements to exact output
>>> shards which could change the characteristics of pipelines in very
>>> significant ways without users being aware of it. For example, this could
>>> result in extremely imbalanced workloads for any downstream processors. If
>>> it's just Hive I would rather work around it (even with a perf penalty for
>>> that case).
>>>
>>>
>>>>
>>>> > > When running e.g. on Spark and job encounters kind of failure which
>>>> cause a loss of some data from previous stages, Spark does issue recompute
>>>> of necessary task in necessary stages to recover data. Because the shard
>>>> assignment function is random as default, some data will end up in
>>>> different shards and cause duplicates in the final output.
>>>>
>>>> > I think this was already addressed. The correct solution here is to
>>>> implement RequiresStableInput for runners that do not already support that
>>>> and update FileIO to use that.
>>>>
>>>> How will @RequiresStableInput prevent this situation when running batch
>>>> use case?
>>>>
>>>
>>> So this is handled in combination of @RequiresStableInput and output
>>> file finalization. @RequiresStableInput (or Reshuffle for most runners)
>>> makes sure that the input provided for the write stage does not get
>>> recomputed in the presence of failures. Output finalization makes sure that
>>> we only finalize one run of each bundle and discard the rest.
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>>
>>>>
>>>> On Mon, Jun 28, 2021 at 10:29 AM Chamikara Jayalath <
>>>> chamik...@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Sun, Jun 27, 2021 at 10:48 PM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> how do we proceed with reviewing MR proposed for this change?
>>>>>>
>>>>>> I sense there is a concern exposing existing sharding function to the
>>>>>> API. But from the discussion here I do not have a clear picture
>>>>>> about arguments not doing so.
>>>>>> Only one argument was that dynamic destinations should be able to do
>>>>>> the same. While this is true, as it is illustrated in previous commnet, 
>>>>>> it
>>>>>> is not simple nor convenient to use and requires more customization than
>>>>>> exposing sharding which is already there.
>>>>>> Are there more negatives to exposing sharding function?
>>>>>>
>>>>>
>>>>> Seems like currently FlinkStreamingPipelineTranslator is the only real
>>>>> usage of WriteFiles.withShardingFunction() :
>>>>> https://github.com/apache/beam/blob/90c854e97787c19cd5b94034d37c5319317567a8/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java#L281
>>>>> WriteFiles is not expected to be used by pipeline authors but exposing
>>>>> this through the FileIO will make this available to pipeline authors (and
>>>>> some may choose to do so for various reasons).
>>>>> This will result in sharding being strict and runners being inflexible
>>>>> when it comes to optimizing execution of pipelines that use FileIO.
>>>>>
>>>>> Beam has a policy of no knobs (or keeping knobs minimum) to allow
>>>>> runners to optimize better. I think one concern might be that the addition
>>>>> of this option might be going against this.
>>>>>
>>>>> Also looking at your original reasoning for adding this.
>>>>>
>>>>> > I need to generate shards which are compatible with Hive bucketing
>>>>> and therefore need to decide shard assignment based on data fields of 
>>>>> input
>>>>> element
>>>>>
>>>>> This sounds very specific to Hive. Again I think the downside here is
>>>>> that we will be giving up the flexibility for the runner to optimize the
>>>>> pipeline and decide sharding. Do you think it's possible to somehow relax
>>>>> this requirement for Hive or use a secondary process to format the output
>>>>> to a form that is suitable for Hive after being written to an intermediate
>>>>> storage.
>>>>>
>>>>> > When running e.g. on Spark and job encounters kind of failure which
>>>>> cause a loss of some data from previous stages, Spark does issue recompute
>>>>> of necessary task in necessary stages to recover data. Because the shard
>>>>> assignment function is random as default, some data will end up in
>>>>> different shards and cause duplicates in the final output.
>>>>>
>>>>> I think this was already addressed. The correct solution here is to
>>>>> implement RequiresStableInput for runners that do not already support that
>>>>> and update FileIO to use that.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>>
>>>>>>
>>>>>> On Wed, Jun 23, 2021 at 9:36 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> The difference in my opinion is in distinguishing between - as
>>>>>>> written in this thread - physical vs logical properties of the 
>>>>>>> pipeline. I
>>>>>>> proposed to keep dynamic destination (logical) and sharding (physical)
>>>>>>> separate on API level as it is at implementation level.
>>>>>>>
>>>>>>> When I reason about using `by()` for my case ... I am using dynamic
>>>>>>> destination to partition data into hourly folders. So my destination is
>>>>>>> e.g. `2021-06-23-07`. To add a shard there I assume I will need
>>>>>>> * encode shard to the destination .. e.g. in form of file prefix
>>>>>>> `2021-06-23-07/shard-1`
>>>>>>> * dynamic destination now does not span a group of files but must be
>>>>>>> exactly one file
>>>>>>> * to respect above, I have to. "disable" sharding in WriteFiles and
>>>>>>> make sure to use `.withNumShards(1)` ... I am not sure what `runnner
>>>>>>> determined` sharding would do, if it would not split destination further
>>>>>>> into more files
>>>>>>> * make sure that FileNameing will respect my intent and name files
>>>>>>> as I expect based on that destination
>>>>>>> * post process `WriteFilesResult` to turn my destination which
>>>>>>> targets physical single file (date-with-hour + shard-num) back into the
>>>>>>> destination with targets logical group of files (date-with-hour) so I 
>>>>>>> hook
>>>>>>> it up do downstream post-process as usual
>>>>>>>
>>>>>>> Am I roughly correct? Or do I miss something more straight forward?
>>>>>>> If the above is correct then it feels more fragile and less
>>>>>>> intuitive to me than the option in my MR.
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 22, 2021 at 4:28 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> I'm not sure I understand your PR. How is this PR different than
>>>>>>>> the by() method in FileIO?
>>>>>>>>
>>>>>>>> On Tue, Jun 22, 2021 at 1:22 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> MR for review for this change is here:
>>>>>>>>> https://github.com/apache/beam/pull/15051
>>>>>>>>>
>>>>>>>>> On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek <
>>>>>>>>> jozo.vil...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I would like this thread to stay focused on sharding FileIO only.
>>>>>>>>>> Possible change to the model is an interesting topic but of a much
>>>>>>>>>> different scope.
>>>>>>>>>>
>>>>>>>>>> Yes, I agree that sharding is mostly a physical rather than
>>>>>>>>>> logical property of the pipeline. That is why it feels more natural 
>>>>>>>>>> to
>>>>>>>>>> distinguish between those two on the API level.
>>>>>>>>>> As for handling sharding requirements by adding more sugar to
>>>>>>>>>> dynamic destinations + file naming one has to keep in mind that 
>>>>>>>>>> results of
>>>>>>>>>> dynamic writes can be observed in the form of KV<DestinationT, 
>>>>>>>>>> String>,
>>>>>>>>>> so written files per dynamic destination. Often we do GBP to 
>>>>>>>>>> post-process
>>>>>>>>>> files per destination / logical group. If sharding would be encoded 
>>>>>>>>>> there,
>>>>>>>>>> then it might complicate things either downstream or inside the 
>>>>>>>>>> sugar part
>>>>>>>>>> to put shard in and then take it out later.
>>>>>>>>>> From the user perspective I do not see much difference. We would
>>>>>>>>>> still need to allow API to define both behaviors and it would only be
>>>>>>>>>> executed differently by implementation.
>>>>>>>>>> I do not see a value in changing FileIO (WriteFiles) logic to
>>>>>>>>>> stop using sharding and use dynamic destination for both given that
>>>>>>>>>> sharding function is already there and in use.
>>>>>>>>>>
>>>>>>>>>> To the point of external shuffle and non-deterministic user
>>>>>>>>>> input.
>>>>>>>>>> Yes users can create non-deterministic behaviors but they are in
>>>>>>>>>> control. Here, Beam internally adds non-deterministic behavior and 
>>>>>>>>>> users
>>>>>>>>>> can not opt-out.
>>>>>>>>>> All works fine as long as external shuffle service (depends on
>>>>>>>>>> Runner) holds to the data and hands it out on retries. However if 
>>>>>>>>>> data in
>>>>>>>>>> shuffle service is lost for some reason - e.g. disk failure, node 
>>>>>>>>>> breaks
>>>>>>>>>> down - then pipeline have to recover the data by recomputing 
>>>>>>>>>> necessary
>>>>>>>>>> paths.
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 17, 2021 at 7:36 PM Robert Bradshaw <
>>>>>>>>>> rober...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Sharding is typically a physical rather than logical property of
>>>>>>>>>>> the
>>>>>>>>>>> pipeline, and I'm not convinced it makes sense to add it to Beam
>>>>>>>>>>> in
>>>>>>>>>>> general. One can already use keys and GBK/Stateful DoFns if some
>>>>>>>>>>> kind
>>>>>>>>>>> of logical grouping is needed, and adding constraints like this
>>>>>>>>>>> can
>>>>>>>>>>> prevent opportunities for optimizations (like dynamic sharding
>>>>>>>>>>> and
>>>>>>>>>>> fusion).
>>>>>>>>>>>
>>>>>>>>>>> That being said, file output are one area where it could make
>>>>>>>>>>> sense. I
>>>>>>>>>>> would expect that dynamic destinations could cover this usecase,
>>>>>>>>>>> and a
>>>>>>>>>>> general FileNaming subclass could be provided to make this
>>>>>>>>>>> pattern
>>>>>>>>>>> easier (and possibly some syntactic sugar for auto-setting num
>>>>>>>>>>> shards
>>>>>>>>>>> to 0). (One downside of this approach is that one couldn't do
>>>>>>>>>>> dynamic
>>>>>>>>>>> destinations, and have each sharded with a distinct sharing
>>>>>>>>>>> function
>>>>>>>>>>> as well.)
>>>>>>>>>>>
>>>>>>>>>>> If this doesn't work, we could look into adding ShardingFunction
>>>>>>>>>>> as a
>>>>>>>>>>> publicly exposed parameter to FileIO. (I'm actually surprised it
>>>>>>>>>>> already exists.)
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 17, 2021 at 9:39 AM <je...@seznam.cz> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Alright, but what is worth emphasizing is that we talk about
>>>>>>>>>>> batch workloads. The typical scenario is that the output is 
>>>>>>>>>>> committed once
>>>>>>>>>>> the job finishes (e.g., by atomic rename of directory).
>>>>>>>>>>> >  Jan
>>>>>>>>>>> >
>>>>>>>>>>> > Dne 17. 6. 2021 17:59 napsal uživatel Reuven Lax <
>>>>>>>>>>> re...@google.com>:
>>>>>>>>>>> >
>>>>>>>>>>> > Yes - the problem is that Beam makes no guarantees of
>>>>>>>>>>> determinism anywhere in the system. User DoFns might be non 
>>>>>>>>>>> deterministic,
>>>>>>>>>>> and have no way to know (we've discussed proving users with an
>>>>>>>>>>> @IsDeterministic annotation, however empirically users often think 
>>>>>>>>>>> their
>>>>>>>>>>> functions are deterministic when they are in fact not). _Any_ sort 
>>>>>>>>>>> of
>>>>>>>>>>> triggered aggregation (including watermark based) can always be non
>>>>>>>>>>> deterministic.
>>>>>>>>>>> >
>>>>>>>>>>> > Even if everything was deterministic, replaying everything is
>>>>>>>>>>> tricky. The output files already exist - should the system delete 
>>>>>>>>>>> them and
>>>>>>>>>>> recreate them, or leave them as is and delete the temp files? Either
>>>>>>>>>>> decision could be problematic.
>>>>>>>>>>> >
>>>>>>>>>>> > On Wed, Jun 16, 2021 at 11:40 PM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Correct, by the external shuffle service I pretty much meant
>>>>>>>>>>> "offloading the contents of a shuffle phase out of the system". 
>>>>>>>>>>> Looks like
>>>>>>>>>>> that is what the Spark's checkpoint does as well. On the other hand 
>>>>>>>>>>> (if I
>>>>>>>>>>> understand the concept correctly), that implies some performance 
>>>>>>>>>>> penalty -
>>>>>>>>>>> the data has to be moved to external distributed filesystem. Which 
>>>>>>>>>>> then
>>>>>>>>>>> feels weird if we optimize code to avoid computing random numbers, 
>>>>>>>>>>> but are
>>>>>>>>>>> okay with moving complete datasets back and forth. I think in this
>>>>>>>>>>> particular case making the Pipeline deterministic - idempotent to be
>>>>>>>>>>> precise - (within the limits, yes, hashCode of enum is not stable 
>>>>>>>>>>> between
>>>>>>>>>>> JVMs) would seem more practical to me.
>>>>>>>>>>> >
>>>>>>>>>>> >  Jan
>>>>>>>>>>> >
>>>>>>>>>>> > On 6/17/21 7:09 AM, Reuven Lax wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > I have some thoughts here, as Eugene Kirpichov and I spent a
>>>>>>>>>>> lot of time working through these semantics in the past.
>>>>>>>>>>> >
>>>>>>>>>>> > First - about the problem of duplicates:
>>>>>>>>>>> >
>>>>>>>>>>> > A "deterministic" sharding - e.g. hashCode based (though Java
>>>>>>>>>>> makes no guarantee that hashCode is stable across JVM instances, so 
>>>>>>>>>>> this
>>>>>>>>>>> technique ends up not being stable) doesn't really help matters in 
>>>>>>>>>>> Beam.
>>>>>>>>>>> Unlike other systems, Beam makes no assumptions that transforms are
>>>>>>>>>>> idempotent or deterministic. What's more, _any_ transform that has 
>>>>>>>>>>> any sort
>>>>>>>>>>> of triggered grouping (whether the trigger used is watermark based 
>>>>>>>>>>> or
>>>>>>>>>>> otherwise) is non deterministic.
>>>>>>>>>>> >
>>>>>>>>>>> > Forcing a hash of every element imposed quite a CPU cost; even
>>>>>>>>>>> generating a random number per-element slowed things down too much, 
>>>>>>>>>>> which
>>>>>>>>>>> is why the current code generates a random number only in 
>>>>>>>>>>> startBundle.
>>>>>>>>>>> >
>>>>>>>>>>> > Any runner that does not implement RequiresStableInput will
>>>>>>>>>>> not properly execute FileIO. Dataflow and Flink both support this. I
>>>>>>>>>>> believe that the Spark runner implicitly supports it by manually 
>>>>>>>>>>> calling
>>>>>>>>>>> checkpoint as Ken mentioned (unless someone removed that from the 
>>>>>>>>>>> Spark
>>>>>>>>>>> runner, but if so that would be a correctness regression). 
>>>>>>>>>>> Implementing
>>>>>>>>>>> this has nothing to do with external shuffle services - neither 
>>>>>>>>>>> Flink,
>>>>>>>>>>> Spark, nor Dataflow appliance (classic shuffle) have any problem 
>>>>>>>>>>> correctly
>>>>>>>>>>> implementing RequiresStableInput.
>>>>>>>>>>> >
>>>>>>>>>>> > On Wed, Jun 16, 2021 at 11:18 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > I think that the support for @RequiresStableInput is rather
>>>>>>>>>>> limited. AFAIK it is supported by streaming Flink (where it is not 
>>>>>>>>>>> needed
>>>>>>>>>>> in this situation) and by Dataflow. Batch runners without external 
>>>>>>>>>>> shuffle
>>>>>>>>>>> service that works as in the case of Dataflow have IMO no way to 
>>>>>>>>>>> implement
>>>>>>>>>>> it correctly. In the case of FileIO (which do not use 
>>>>>>>>>>> @RequiresStableInput
>>>>>>>>>>> as it would not be supported on Spark) the randomness is easily 
>>>>>>>>>>> avoidable
>>>>>>>>>>> (hashCode of key?) and I it seems to me it should be preferred.
>>>>>>>>>>> >
>>>>>>>>>>> >  Jan
>>>>>>>>>>> >
>>>>>>>>>>> > On 6/16/21 6:23 PM, Kenneth Knowles wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Hi,
>>>>>>>>>>> >
>>>>>>>>>>> > maybe a little unrelated, but I think we definitely should not
>>>>>>>>>>> use random assignment of shard keys (RandomShardingFunction), at 
>>>>>>>>>>> least for
>>>>>>>>>>> bounded workloads (seems to be fine for streaming workloads). Many 
>>>>>>>>>>> batch
>>>>>>>>>>> runners simply recompute path in the computation DAG from the 
>>>>>>>>>>> failed node
>>>>>>>>>>> (transform) to the root (source). In the case there is any 
>>>>>>>>>>> non-determinism
>>>>>>>>>>> involved in the logic, then it can result in duplicates (as the 
>>>>>>>>>>> 'previous'
>>>>>>>>>>> attempt might have ended in DAG path that was not affected by the 
>>>>>>>>>>> fail).
>>>>>>>>>>> That addresses the option 2) of what Jozef have mentioned.
>>>>>>>>>>> >
>>>>>>>>>>> > This is the reason we introduced "@RequiresStableInput".
>>>>>>>>>>> >
>>>>>>>>>>> > When things were only Dataflow, we knew that each shuffle was
>>>>>>>>>>> a checkpoint, so we inserted a Reshuffle after the random numbers 
>>>>>>>>>>> were
>>>>>>>>>>> generated, freezing them so it was safe for replay. Since other 
>>>>>>>>>>> engines do
>>>>>>>>>>> not checkpoint at every shuffle, we needed a way to communicate 
>>>>>>>>>>> that this
>>>>>>>>>>> checkpointing was required for correctness. I think we still have 
>>>>>>>>>>> many IOs
>>>>>>>>>>> that are written using Reshuffle instead of @RequiresStableInput, 
>>>>>>>>>>> and I
>>>>>>>>>>> don't know which runners process @RequiresStableInput properly.
>>>>>>>>>>> >
>>>>>>>>>>> > By the way, I believe the SparkRunner explicitly calls
>>>>>>>>>>> materialize() after a GBK specifically so that it gets correct 
>>>>>>>>>>> results for
>>>>>>>>>>> IOs that rely on Reshuffle. Has that changed?
>>>>>>>>>>> >
>>>>>>>>>>> > I agree that we should minimize use of RequiresStableInput. It
>>>>>>>>>>> has a significant cost, and the cost varies across runners. If we 
>>>>>>>>>>> can use a
>>>>>>>>>>> deterministic function, we should.
>>>>>>>>>>> >
>>>>>>>>>>> > Kenn
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> >  Jan
>>>>>>>>>>> >
>>>>>>>>>>> > On 6/16/21 1:43 PM, Jozef Vilcek wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <
>>>>>>>>>>> k...@apache.org> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > In general, Beam only deals with keys and grouping by key. I
>>>>>>>>>>> think expanding this idea to some more abstract notion of a sharding
>>>>>>>>>>> function could make sense.
>>>>>>>>>>> >
>>>>>>>>>>> > For FileIO specifically, I wonder if you can use
>>>>>>>>>>> writeDynamic() to get the behavior you are seeking.
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > The change in mind looks like this:
>>>>>>>>>>> >
>>>>>>>>>>> https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18
>>>>>>>>>>> >
>>>>>>>>>>> > Dynamic Destinations in my mind is more towards the need for
>>>>>>>>>>> "partitioning" data (destination as directory level) or if one 
>>>>>>>>>>> needs to
>>>>>>>>>>> handle groups of events differently, e.g. write some events in 
>>>>>>>>>>> FormatA and
>>>>>>>>>>> others in FormatB.
>>>>>>>>>>> > Shards are now used for distributing writes or bucketing of
>>>>>>>>>>> events within a particular destination group. More specifically, 
>>>>>>>>>>> currently,
>>>>>>>>>>> each element is assigned `ShardedKey<Integer>` [1] before GBK 
>>>>>>>>>>> operation.
>>>>>>>>>>> Sharded key is a compound of destination and assigned shard.
>>>>>>>>>>> >
>>>>>>>>>>> > Having said that, I might be able to use dynamic destination
>>>>>>>>>>> for this, possibly with the need of custom FileNaming, and set 
>>>>>>>>>>> shards to be
>>>>>>>>>>> always 1. But it feels less natural than allowing the user to swap 
>>>>>>>>>>> already
>>>>>>>>>>> present `RandomShardingFunction` [2] with something of his own 
>>>>>>>>>>> choosing.
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > [1]
>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
>>>>>>>>>>> > [2]
>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
>>>>>>>>>>> >
>>>>>>>>>>> > Kenn
>>>>>>>>>>> >
>>>>>>>>>>> > On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <
>>>>>>>>>>> tyso...@google.com> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Adding sharding to the model may require a wider discussion
>>>>>>>>>>> than FileIO alone. I'm not entirely sure how wide, or if this has 
>>>>>>>>>>> been
>>>>>>>>>>> proposed before, but IMO it warrants a design doc or proposal.
>>>>>>>>>>> >
>>>>>>>>>>> > A couple high level questions I can think of are,
>>>>>>>>>>> >   - What runners support sharding?
>>>>>>>>>>> >       * There will be some work in Dataflow required to
>>>>>>>>>>> support this but I'm not sure how much.
>>>>>>>>>>> >   - What does sharding mean for streaming pipelines?
>>>>>>>>>>> >
>>>>>>>>>>> > A more nitty-detail question:
>>>>>>>>>>> >   - How can this be achieved performantly? For example, if a
>>>>>>>>>>> shuffle is required to achieve a particular sharding constraint, 
>>>>>>>>>>> should we
>>>>>>>>>>> allow transforms to declare they don't modify the sharding property 
>>>>>>>>>>> (e.g.
>>>>>>>>>>> key preserving) which may allow a runner to avoid an additional 
>>>>>>>>>>> shuffle if
>>>>>>>>>>> a preceding shuffle can guarantee the sharding requirements?
>>>>>>>>>>> >
>>>>>>>>>>> > Where X is the shuffle that could be avoided: input -> shuffle
>>>>>>>>>>> (key sharding fn A) -> transform1 (key preserving) -> transform 2 
>>>>>>>>>>> (key
>>>>>>>>>>> preserving) -> X -> fileio (key sharding fn A)
>>>>>>>>>>> >
>>>>>>>>>>> > On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <
>>>>>>>>>>> jozo.vil...@gmail.com> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > I would like to extend FileIO with possibility to specify a
>>>>>>>>>>> custom sharding function:
>>>>>>>>>>> > https://issues.apache.org/jira/browse/BEAM-12493
>>>>>>>>>>> >
>>>>>>>>>>> > I have 2 use-cases for this:
>>>>>>>>>>> >
>>>>>>>>>>> > I need to generate shards which are compatible with Hive
>>>>>>>>>>> bucketing and therefore need to decide shard assignment based on 
>>>>>>>>>>> data
>>>>>>>>>>> fields of input element
>>>>>>>>>>> > When running e.g. on Spark and job encounters kind of failure
>>>>>>>>>>> which cause a loss of some data from previous stages, Spark does 
>>>>>>>>>>> issue
>>>>>>>>>>> recompute of necessary task in necessary stages to recover data. 
>>>>>>>>>>> Because
>>>>>>>>>>> the shard assignment function is random as default, some data will 
>>>>>>>>>>> end up
>>>>>>>>>>> in different shards and cause duplicates in the final output.
>>>>>>>>>>> >
>>>>>>>>>>> > Please let me know your thoughts in case you see a reason to
>>>>>>>>>>> not to add such improvement.
>>>>>>>>>>> >
>>>>>>>>>>> > Thanks,
>>>>>>>>>>> > Jozef
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>>
>>>>>>>>>>

Reply via email to