I don't think this has anything to do with external shuffle services.

Arbitrarily recomputing data is fundamentally incompatible with Beam, since
Beam does not restrict transforms to being deterministic. The Spark runner
works (at least it did last I checked) by checkpointing the RDD. Spark will
not recompute the DAG past a checkpoint, so this creates stable input to a
transform. This adds some cost to the Spark runner, but makes things
correct. You should not have sharding problems due to replay unless there
is a bug in the current Spark runner.

Beam does not restrict non-determinism, true. If users do add it, then they
can work around it should they need to address any side effects. But should
non-determinism be deliberately added by Beam core? Probably can if runners
can 100% deal with that effectively. To the point of RDD `checkpoint`,
afaik SparkRunner does use `cache`, not checkpoint. Also, I thought cache
is invoked on fork points in DAG. I have just a join and map and in such
cases I thought data is always served out by shuffle service. Am I mistaken?


On Fri, Jul 2, 2021 at 8:23 PM Reuven Lax <re...@google.com> wrote:

>
>
> On Tue, Jun 29, 2021 at 1:03 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>>
>> How will @RequiresStableInput prevent this situation when running batch
>>> use case?
>>>
>>
>> So this is handled in combination of @RequiresStableInput and output file
>> finalization. @RequiresStableInput (or Reshuffle for most runners) makes
>> sure that the input provided for the write stage does not get recomputed in
>> the presence of failures. Output finalization makes sure that we only
>> finalize one run of each bundle and discard the rest.
>>
>> So this I believe relies on having robust external service to hold
>> shuffle data and serve it out when needed so pipeline does not need to
>> recompute it via non-deterministic function. In Spark however, shuffle
>> service can not be (if I am not mistaking) deployed in this fashion (HA +
>> replicated shuffle data). Therefore, if instance of shuffle service holding
>> a portion of shuffle data fails, spark recovers it by recomputing parts of
>> a DAG from source to recover lost shuffle results. I am not sure what Beam
>> can do here to prevent it or make it stable? Will @RequiresStableInput work
>> as expected? ... Note that this, I believe, concerns only the batch.
>>
>
> I don't think this has anything to do with external shuffle services.
>
> Arbitrarily recomputing data is fundamentally incompatible with Beam,
> since Beam does not restrict transforms to being deterministic. The Spark
> runner works (at least it did last I checked) by checkpointing the RDD.
> Spark will not recompute the DAG past a checkpoint, so this creates stable
> input to a transform. This adds some cost to the Spark runner, but makes
> things correct. You should not have sharding problems due to replay unless
> there is a bug in the current Spark runner.
>
>
>>
>> Also, when withNumShards() truly have to be used, round robin assignment
>> of elements to shards sounds like the optimal solution (at least for
>> the vast majority of pipelines)
>>
>> I agree. But random seed is my problem here with respect to the situation
>> mentioned above.
>>
>> Right, I would like to know if there are more true use-cases before
>> adding this. This essentially allows users to map elements to exact output
>> shards which could change the characteristics of pipelines in very
>> significant ways without users being aware of it. For example, this could
>> result in extremely imbalanced workloads for any downstream processors. If
>> it's just Hive I would rather work around it (even with a perf penalty for
>> that case).
>>
>> User would have to explicitly ask FileIO to use specific sharding.
>> Documentation can educate about the tradeoffs. But I am open to workaround
>> alternatives.
>>
>>
>> On Mon, Jun 28, 2021 at 5:50 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Mon, Jun 28, 2021 at 2:47 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>> wrote:
>>>
>>>> Hi Cham, thanks for the feedback
>>>>
>>>> > Beam has a policy of no knobs (or keeping knobs minimum) to allow
>>>> runners to optimize better. I think one concern might be that the addition
>>>> of this option might be going against this.
>>>>
>>>> I agree that less knobs is more. But if assignment of a key is specific
>>>> per user request via API (optional), than runner should not optimize that
>>>> but need to respect how data is requested to be laid down. It could however
>>>> optimize number of shards as it is doing right now if numShards is not set
>>>> explicitly.
>>>> Anyway FileIO feels fluid here. You can leave shards empty and let
>>>> runner decide or optimize,  but only in batch and not in streaming.  Then
>>>> it allows you to set number of shards but never let you change the logic
>>>> how it is assigned and defaults to round robin with random seed. It begs
>>>> question for me why I can manipulate number of shards but not the logic of
>>>> assignment. Are there plans to (or already case implemented) optimize
>>>> around and have runner changing that under different circumstances?
>>>>
>>>
>>> I don't know the exact history behind withNumShards() feature for batch.
>>> I would guess it was originally introduced due to limitations for streaming
>>> but was made available to batch as well with a strong performance warning.
>>>
>>> https://github.com/apache/beam/blob/90c854e97787c19cd5b94034d37c5319317567a8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L159
>>>
>>> Also, when withNumShards() truly have to be used, round robin assignment
>>> of elements to shards sounds like the optimal solution (at least for
>>> the vast majority of pipelines)
>>>
>>>
>>>>
>>>> > Also looking at your original reasoning for adding this.
>>>>
>>>> > > I need to generate shards which are compatible with Hive bucketing
>>>> and therefore need to decide shard assignment based on data fields of input
>>>> element
>>>>
>>>> > This sounds very specific to Hive. Again I think the downside here is
>>>> that we will be giving up the flexibility for the runner to optimize the
>>>> pipeline and decide sharding. Do you think it's possible to somehow relax
>>>> this requirement for > > > Hive or use a secondary process to format the
>>>> output to a form that is suitable for Hive after being written to an
>>>> intermediate storage.
>>>>
>>>> It is possible. But it is quite suboptimal because of extra IO penalty
>>>> and I would have to use completely custom file writing as FileIO would not
>>>> support this. Then naturally the question would be, should I use FileIO at
>>>> all? I am quite not sure if I am doing something so rare that it is not and
>>>> can not be supported. Maybe I am. I remember an old discussion from Scio
>>>> guys when they wanted to introduce Sorted Merge Buckets to FileIO where
>>>> sharding was also needed to be manipulated (among other things)
>>>>
>>>
>>> Right, I would like to know if there are more true use-cases before
>>> adding this. This essentially allows users to map elements to exact output
>>> shards which could change the characteristics of pipelines in very
>>> significant ways without users being aware of it. For example, this could
>>> result in extremely imbalanced workloads for any downstream processors. If
>>> it's just Hive I would rather work around it (even with a perf penalty for
>>> that case).
>>>
>>>
>>>>
>>>> > > When running e.g. on Spark and job encounters kind of failure which
>>>> cause a loss of some data from previous stages, Spark does issue recompute
>>>> of necessary task in necessary stages to recover data. Because the shard
>>>> assignment function is random as default, some data will end up in
>>>> different shards and cause duplicates in the final output.
>>>>
>>>> > I think this was already addressed. The correct solution here is to
>>>> implement RequiresStableInput for runners that do not already support that
>>>> and update FileIO to use that.
>>>>
>>>> How will @RequiresStableInput prevent this situation when running batch
>>>> use case?
>>>>
>>>
>>> So this is handled in combination of @RequiresStableInput and output
>>> file finalization. @RequiresStableInput (or Reshuffle for most runners)
>>> makes sure that the input provided for the write stage does not get
>>> recomputed in the presence of failures. Output finalization makes sure that
>>> we only finalize one run of each bundle and discard the rest.
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>>
>>>>
>>>> On Mon, Jun 28, 2021 at 10:29 AM Chamikara Jayalath <
>>>> chamik...@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Sun, Jun 27, 2021 at 10:48 PM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> how do we proceed with reviewing MR proposed for this change?
>>>>>>
>>>>>> I sense there is a concern exposing existing sharding function to the
>>>>>> API. But from the discussion here I do not have a clear picture
>>>>>> about arguments not doing so.
>>>>>> Only one argument was that dynamic destinations should be able to do
>>>>>> the same. While this is true, as it is illustrated in previous commnet, 
>>>>>> it
>>>>>> is not simple nor convenient to use and requires more customization than
>>>>>> exposing sharding which is already there.
>>>>>> Are there more negatives to exposing sharding function?
>>>>>>
>>>>>
>>>>> Seems like currently FlinkStreamingPipelineTranslator is the only real
>>>>> usage of WriteFiles.withShardingFunction() :
>>>>> https://github.com/apache/beam/blob/90c854e97787c19cd5b94034d37c5319317567a8/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java#L281
>>>>> WriteFiles is not expected to be used by pipeline authors but exposing
>>>>> this through the FileIO will make this available to pipeline authors (and
>>>>> some may choose to do so for various reasons).
>>>>> This will result in sharding being strict and runners being inflexible
>>>>> when it comes to optimizing execution of pipelines that use FileIO.
>>>>>
>>>>> Beam has a policy of no knobs (or keeping knobs minimum) to allow
>>>>> runners to optimize better. I think one concern might be that the addition
>>>>> of this option might be going against this.
>>>>>
>>>>> Also looking at your original reasoning for adding this.
>>>>>
>>>>> > I need to generate shards which are compatible with Hive bucketing
>>>>> and therefore need to decide shard assignment based on data fields of 
>>>>> input
>>>>> element
>>>>>
>>>>> This sounds very specific to Hive. Again I think the downside here is
>>>>> that we will be giving up the flexibility for the runner to optimize the
>>>>> pipeline and decide sharding. Do you think it's possible to somehow relax
>>>>> this requirement for Hive or use a secondary process to format the output
>>>>> to a form that is suitable for Hive after being written to an intermediate
>>>>> storage.
>>>>>
>>>>> > When running e.g. on Spark and job encounters kind of failure which
>>>>> cause a loss of some data from previous stages, Spark does issue recompute
>>>>> of necessary task in necessary stages to recover data. Because the shard
>>>>> assignment function is random as default, some data will end up in
>>>>> different shards and cause duplicates in the final output.
>>>>>
>>>>> I think this was already addressed. The correct solution here is to
>>>>> implement RequiresStableInput for runners that do not already support that
>>>>> and update FileIO to use that.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>>
>>>>>>
>>>>>> On Wed, Jun 23, 2021 at 9:36 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> The difference in my opinion is in distinguishing between - as
>>>>>>> written in this thread - physical vs logical properties of the 
>>>>>>> pipeline. I
>>>>>>> proposed to keep dynamic destination (logical) and sharding (physical)
>>>>>>> separate on API level as it is at implementation level.
>>>>>>>
>>>>>>> When I reason about using `by()` for my case ... I am using dynamic
>>>>>>> destination to partition data into hourly folders. So my destination is
>>>>>>> e.g. `2021-06-23-07`. To add a shard there I assume I will need
>>>>>>> * encode shard to the destination .. e.g. in form of file prefix
>>>>>>> `2021-06-23-07/shard-1`
>>>>>>> * dynamic destination now does not span a group of files but must be
>>>>>>> exactly one file
>>>>>>> * to respect above, I have to. "disable" sharding in WriteFiles and
>>>>>>> make sure to use `.withNumShards(1)` ... I am not sure what `runnner
>>>>>>> determined` sharding would do, if it would not split destination further
>>>>>>> into more files
>>>>>>> * make sure that FileNameing will respect my intent and name files
>>>>>>> as I expect based on that destination
>>>>>>> * post process `WriteFilesResult` to turn my destination which
>>>>>>> targets physical single file (date-with-hour + shard-num) back into the
>>>>>>> destination with targets logical group of files (date-with-hour) so I 
>>>>>>> hook
>>>>>>> it up do downstream post-process as usual
>>>>>>>
>>>>>>> Am I roughly correct? Or do I miss something more straight forward?
>>>>>>> If the above is correct then it feels more fragile and less
>>>>>>> intuitive to me than the option in my MR.
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 22, 2021 at 4:28 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> I'm not sure I understand your PR. How is this PR different than
>>>>>>>> the by() method in FileIO?
>>>>>>>>
>>>>>>>> On Tue, Jun 22, 2021 at 1:22 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> MR for review for this change is here:
>>>>>>>>> https://github.com/apache/beam/pull/15051
>>>>>>>>>
>>>>>>>>> On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek <
>>>>>>>>> jozo.vil...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I would like this thread to stay focused on sharding FileIO only.
>>>>>>>>>> Possible change to the model is an interesting topic but of a much
>>>>>>>>>> different scope.
>>>>>>>>>>
>>>>>>>>>> Yes, I agree that sharding is mostly a physical rather than
>>>>>>>>>> logical property of the pipeline. That is why it feels more natural 
>>>>>>>>>> to
>>>>>>>>>> distinguish between those two on the API level.
>>>>>>>>>> As for handling sharding requirements by adding more sugar to
>>>>>>>>>> dynamic destinations + file naming one has to keep in mind that 
>>>>>>>>>> results of
>>>>>>>>>> dynamic writes can be observed in the form of KV<DestinationT, 
>>>>>>>>>> String>,
>>>>>>>>>> so written files per dynamic destination. Often we do GBP to 
>>>>>>>>>> post-process
>>>>>>>>>> files per destination / logical group. If sharding would be encoded 
>>>>>>>>>> there,
>>>>>>>>>> then it might complicate things either downstream or inside the 
>>>>>>>>>> sugar part
>>>>>>>>>> to put shard in and then take it out later.
>>>>>>>>>> From the user perspective I do not see much difference. We would
>>>>>>>>>> still need to allow API to define both behaviors and it would only be
>>>>>>>>>> executed differently by implementation.
>>>>>>>>>> I do not see a value in changing FileIO (WriteFiles) logic to
>>>>>>>>>> stop using sharding and use dynamic destination for both given that
>>>>>>>>>> sharding function is already there and in use.
>>>>>>>>>>
>>>>>>>>>> To the point of external shuffle and non-deterministic user
>>>>>>>>>> input.
>>>>>>>>>> Yes users can create non-deterministic behaviors but they are in
>>>>>>>>>> control. Here, Beam internally adds non-deterministic behavior and 
>>>>>>>>>> users
>>>>>>>>>> can not opt-out.
>>>>>>>>>> All works fine as long as external shuffle service (depends on
>>>>>>>>>> Runner) holds to the data and hands it out on retries. However if 
>>>>>>>>>> data in
>>>>>>>>>> shuffle service is lost for some reason - e.g. disk failure, node 
>>>>>>>>>> breaks
>>>>>>>>>> down - then pipeline have to recover the data by recomputing 
>>>>>>>>>> necessary
>>>>>>>>>> paths.
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 17, 2021 at 7:36 PM Robert Bradshaw <
>>>>>>>>>> rober...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Sharding is typically a physical rather than logical property of
>>>>>>>>>>> the
>>>>>>>>>>> pipeline, and I'm not convinced it makes sense to add it to Beam
>>>>>>>>>>> in
>>>>>>>>>>> general. One can already use keys and GBK/Stateful DoFns if some
>>>>>>>>>>> kind
>>>>>>>>>>> of logical grouping is needed, and adding constraints like this
>>>>>>>>>>> can
>>>>>>>>>>> prevent opportunities for optimizations (like dynamic sharding
>>>>>>>>>>> and
>>>>>>>>>>> fusion).
>>>>>>>>>>>
>>>>>>>>>>> That being said, file output are one area where it could make
>>>>>>>>>>> sense. I
>>>>>>>>>>> would expect that dynamic destinations could cover this usecase,
>>>>>>>>>>> and a
>>>>>>>>>>> general FileNaming subclass could be provided to make this
>>>>>>>>>>> pattern
>>>>>>>>>>> easier (and possibly some syntactic sugar for auto-setting num
>>>>>>>>>>> shards
>>>>>>>>>>> to 0). (One downside of this approach is that one couldn't do
>>>>>>>>>>> dynamic
>>>>>>>>>>> destinations, and have each sharded with a distinct sharing
>>>>>>>>>>> function
>>>>>>>>>>> as well.)
>>>>>>>>>>>
>>>>>>>>>>> If this doesn't work, we could look into adding ShardingFunction
>>>>>>>>>>> as a
>>>>>>>>>>> publicly exposed parameter to FileIO. (I'm actually surprised it
>>>>>>>>>>> already exists.)
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 17, 2021 at 9:39 AM <je...@seznam.cz> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Alright, but what is worth emphasizing is that we talk about
>>>>>>>>>>> batch workloads. The typical scenario is that the output is 
>>>>>>>>>>> committed once
>>>>>>>>>>> the job finishes (e.g., by atomic rename of directory).
>>>>>>>>>>> >  Jan
>>>>>>>>>>> >
>>>>>>>>>>> > Dne 17. 6. 2021 17:59 napsal uživatel Reuven Lax <
>>>>>>>>>>> re...@google.com>:
>>>>>>>>>>> >
>>>>>>>>>>> > Yes - the problem is that Beam makes no guarantees of
>>>>>>>>>>> determinism anywhere in the system. User DoFns might be non 
>>>>>>>>>>> deterministic,
>>>>>>>>>>> and have no way to know (we've discussed proving users with an
>>>>>>>>>>> @IsDeterministic annotation, however empirically users often think 
>>>>>>>>>>> their
>>>>>>>>>>> functions are deterministic when they are in fact not). _Any_ sort 
>>>>>>>>>>> of
>>>>>>>>>>> triggered aggregation (including watermark based) can always be non
>>>>>>>>>>> deterministic.
>>>>>>>>>>> >
>>>>>>>>>>> > Even if everything was deterministic, replaying everything is
>>>>>>>>>>> tricky. The output files already exist - should the system delete 
>>>>>>>>>>> them and
>>>>>>>>>>> recreate them, or leave them as is and delete the temp files? Either
>>>>>>>>>>> decision could be problematic.
>>>>>>>>>>> >
>>>>>>>>>>> > On Wed, Jun 16, 2021 at 11:40 PM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Correct, by the external shuffle service I pretty much meant
>>>>>>>>>>> "offloading the contents of a shuffle phase out of the system". 
>>>>>>>>>>> Looks like
>>>>>>>>>>> that is what the Spark's checkpoint does as well. On the other hand 
>>>>>>>>>>> (if I
>>>>>>>>>>> understand the concept correctly), that implies some performance 
>>>>>>>>>>> penalty -
>>>>>>>>>>> the data has to be moved to external distributed filesystem. Which 
>>>>>>>>>>> then
>>>>>>>>>>> feels weird if we optimize code to avoid computing random numbers, 
>>>>>>>>>>> but are
>>>>>>>>>>> okay with moving complete datasets back and forth. I think in this
>>>>>>>>>>> particular case making the Pipeline deterministic - idempotent to be
>>>>>>>>>>> precise - (within the limits, yes, hashCode of enum is not stable 
>>>>>>>>>>> between
>>>>>>>>>>> JVMs) would seem more practical to me.
>>>>>>>>>>> >
>>>>>>>>>>> >  Jan
>>>>>>>>>>> >
>>>>>>>>>>> > On 6/17/21 7:09 AM, Reuven Lax wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > I have some thoughts here, as Eugene Kirpichov and I spent a
>>>>>>>>>>> lot of time working through these semantics in the past.
>>>>>>>>>>> >
>>>>>>>>>>> > First - about the problem of duplicates:
>>>>>>>>>>> >
>>>>>>>>>>> > A "deterministic" sharding - e.g. hashCode based (though Java
>>>>>>>>>>> makes no guarantee that hashCode is stable across JVM instances, so 
>>>>>>>>>>> this
>>>>>>>>>>> technique ends up not being stable) doesn't really help matters in 
>>>>>>>>>>> Beam.
>>>>>>>>>>> Unlike other systems, Beam makes no assumptions that transforms are
>>>>>>>>>>> idempotent or deterministic. What's more, _any_ transform that has 
>>>>>>>>>>> any sort
>>>>>>>>>>> of triggered grouping (whether the trigger used is watermark based 
>>>>>>>>>>> or
>>>>>>>>>>> otherwise) is non deterministic.
>>>>>>>>>>> >
>>>>>>>>>>> > Forcing a hash of every element imposed quite a CPU cost; even
>>>>>>>>>>> generating a random number per-element slowed things down too much, 
>>>>>>>>>>> which
>>>>>>>>>>> is why the current code generates a random number only in 
>>>>>>>>>>> startBundle.
>>>>>>>>>>> >
>>>>>>>>>>> > Any runner that does not implement RequiresStableInput will
>>>>>>>>>>> not properly execute FileIO. Dataflow and Flink both support this. I
>>>>>>>>>>> believe that the Spark runner implicitly supports it by manually 
>>>>>>>>>>> calling
>>>>>>>>>>> checkpoint as Ken mentioned (unless someone removed that from the 
>>>>>>>>>>> Spark
>>>>>>>>>>> runner, but if so that would be a correctness regression). 
>>>>>>>>>>> Implementing
>>>>>>>>>>> this has nothing to do with external shuffle services - neither 
>>>>>>>>>>> Flink,
>>>>>>>>>>> Spark, nor Dataflow appliance (classic shuffle) have any problem 
>>>>>>>>>>> correctly
>>>>>>>>>>> implementing RequiresStableInput.
>>>>>>>>>>> >
>>>>>>>>>>> > On Wed, Jun 16, 2021 at 11:18 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > I think that the support for @RequiresStableInput is rather
>>>>>>>>>>> limited. AFAIK it is supported by streaming Flink (where it is not 
>>>>>>>>>>> needed
>>>>>>>>>>> in this situation) and by Dataflow. Batch runners without external 
>>>>>>>>>>> shuffle
>>>>>>>>>>> service that works as in the case of Dataflow have IMO no way to 
>>>>>>>>>>> implement
>>>>>>>>>>> it correctly. In the case of FileIO (which do not use 
>>>>>>>>>>> @RequiresStableInput
>>>>>>>>>>> as it would not be supported on Spark) the randomness is easily 
>>>>>>>>>>> avoidable
>>>>>>>>>>> (hashCode of key?) and I it seems to me it should be preferred.
>>>>>>>>>>> >
>>>>>>>>>>> >  Jan
>>>>>>>>>>> >
>>>>>>>>>>> > On 6/16/21 6:23 PM, Kenneth Knowles wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>>> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Hi,
>>>>>>>>>>> >
>>>>>>>>>>> > maybe a little unrelated, but I think we definitely should not
>>>>>>>>>>> use random assignment of shard keys (RandomShardingFunction), at 
>>>>>>>>>>> least for
>>>>>>>>>>> bounded workloads (seems to be fine for streaming workloads). Many 
>>>>>>>>>>> batch
>>>>>>>>>>> runners simply recompute path in the computation DAG from the 
>>>>>>>>>>> failed node
>>>>>>>>>>> (transform) to the root (source). In the case there is any 
>>>>>>>>>>> non-determinism
>>>>>>>>>>> involved in the logic, then it can result in duplicates (as the 
>>>>>>>>>>> 'previous'
>>>>>>>>>>> attempt might have ended in DAG path that was not affected by the 
>>>>>>>>>>> fail).
>>>>>>>>>>> That addresses the option 2) of what Jozef have mentioned.
>>>>>>>>>>> >
>>>>>>>>>>> > This is the reason we introduced "@RequiresStableInput".
>>>>>>>>>>> >
>>>>>>>>>>> > When things were only Dataflow, we knew that each shuffle was
>>>>>>>>>>> a checkpoint, so we inserted a Reshuffle after the random numbers 
>>>>>>>>>>> were
>>>>>>>>>>> generated, freezing them so it was safe for replay. Since other 
>>>>>>>>>>> engines do
>>>>>>>>>>> not checkpoint at every shuffle, we needed a way to communicate 
>>>>>>>>>>> that this
>>>>>>>>>>> checkpointing was required for correctness. I think we still have 
>>>>>>>>>>> many IOs
>>>>>>>>>>> that are written using Reshuffle instead of @RequiresStableInput, 
>>>>>>>>>>> and I
>>>>>>>>>>> don't know which runners process @RequiresStableInput properly.
>>>>>>>>>>> >
>>>>>>>>>>> > By the way, I believe the SparkRunner explicitly calls
>>>>>>>>>>> materialize() after a GBK specifically so that it gets correct 
>>>>>>>>>>> results for
>>>>>>>>>>> IOs that rely on Reshuffle. Has that changed?
>>>>>>>>>>> >
>>>>>>>>>>> > I agree that we should minimize use of RequiresStableInput. It
>>>>>>>>>>> has a significant cost, and the cost varies across runners. If we 
>>>>>>>>>>> can use a
>>>>>>>>>>> deterministic function, we should.
>>>>>>>>>>> >
>>>>>>>>>>> > Kenn
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> >  Jan
>>>>>>>>>>> >
>>>>>>>>>>> > On 6/16/21 1:43 PM, Jozef Vilcek wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <
>>>>>>>>>>> k...@apache.org> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > In general, Beam only deals with keys and grouping by key. I
>>>>>>>>>>> think expanding this idea to some more abstract notion of a sharding
>>>>>>>>>>> function could make sense.
>>>>>>>>>>> >
>>>>>>>>>>> > For FileIO specifically, I wonder if you can use
>>>>>>>>>>> writeDynamic() to get the behavior you are seeking.
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > The change in mind looks like this:
>>>>>>>>>>> >
>>>>>>>>>>> https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18
>>>>>>>>>>> >
>>>>>>>>>>> > Dynamic Destinations in my mind is more towards the need for
>>>>>>>>>>> "partitioning" data (destination as directory level) or if one 
>>>>>>>>>>> needs to
>>>>>>>>>>> handle groups of events differently, e.g. write some events in 
>>>>>>>>>>> FormatA and
>>>>>>>>>>> others in FormatB.
>>>>>>>>>>> > Shards are now used for distributing writes or bucketing of
>>>>>>>>>>> events within a particular destination group. More specifically, 
>>>>>>>>>>> currently,
>>>>>>>>>>> each element is assigned `ShardedKey<Integer>` [1] before GBK 
>>>>>>>>>>> operation.
>>>>>>>>>>> Sharded key is a compound of destination and assigned shard.
>>>>>>>>>>> >
>>>>>>>>>>> > Having said that, I might be able to use dynamic destination
>>>>>>>>>>> for this, possibly with the need of custom FileNaming, and set 
>>>>>>>>>>> shards to be
>>>>>>>>>>> always 1. But it feels less natural than allowing the user to swap 
>>>>>>>>>>> already
>>>>>>>>>>> present `RandomShardingFunction` [2] with something of his own 
>>>>>>>>>>> choosing.
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>> > [1]
>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
>>>>>>>>>>> > [2]
>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
>>>>>>>>>>> >
>>>>>>>>>>> > Kenn
>>>>>>>>>>> >
>>>>>>>>>>> > On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <
>>>>>>>>>>> tyso...@google.com> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > Adding sharding to the model may require a wider discussion
>>>>>>>>>>> than FileIO alone. I'm not entirely sure how wide, or if this has 
>>>>>>>>>>> been
>>>>>>>>>>> proposed before, but IMO it warrants a design doc or proposal.
>>>>>>>>>>> >
>>>>>>>>>>> > A couple high level questions I can think of are,
>>>>>>>>>>> >   - What runners support sharding?
>>>>>>>>>>> >       * There will be some work in Dataflow required to
>>>>>>>>>>> support this but I'm not sure how much.
>>>>>>>>>>> >   - What does sharding mean for streaming pipelines?
>>>>>>>>>>> >
>>>>>>>>>>> > A more nitty-detail question:
>>>>>>>>>>> >   - How can this be achieved performantly? For example, if a
>>>>>>>>>>> shuffle is required to achieve a particular sharding constraint, 
>>>>>>>>>>> should we
>>>>>>>>>>> allow transforms to declare they don't modify the sharding property 
>>>>>>>>>>> (e.g.
>>>>>>>>>>> key preserving) which may allow a runner to avoid an additional 
>>>>>>>>>>> shuffle if
>>>>>>>>>>> a preceding shuffle can guarantee the sharding requirements?
>>>>>>>>>>> >
>>>>>>>>>>> > Where X is the shuffle that could be avoided: input -> shuffle
>>>>>>>>>>> (key sharding fn A) -> transform1 (key preserving) -> transform 2 
>>>>>>>>>>> (key
>>>>>>>>>>> preserving) -> X -> fileio (key sharding fn A)
>>>>>>>>>>> >
>>>>>>>>>>> > On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <
>>>>>>>>>>> jozo.vil...@gmail.com> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> > I would like to extend FileIO with possibility to specify a
>>>>>>>>>>> custom sharding function:
>>>>>>>>>>> > https://issues.apache.org/jira/browse/BEAM-12493
>>>>>>>>>>> >
>>>>>>>>>>> > I have 2 use-cases for this:
>>>>>>>>>>> >
>>>>>>>>>>> > I need to generate shards which are compatible with Hive
>>>>>>>>>>> bucketing and therefore need to decide shard assignment based on 
>>>>>>>>>>> data
>>>>>>>>>>> fields of input element
>>>>>>>>>>> > When running e.g. on Spark and job encounters kind of failure
>>>>>>>>>>> which cause a loss of some data from previous stages, Spark does 
>>>>>>>>>>> issue
>>>>>>>>>>> recompute of necessary task in necessary stages to recover data. 
>>>>>>>>>>> Because
>>>>>>>>>>> the shard assignment function is random as default, some data will 
>>>>>>>>>>> end up
>>>>>>>>>>> in different shards and cause duplicates in the final output.
>>>>>>>>>>> >
>>>>>>>>>>> > Please let me know your thoughts in case you see a reason to
>>>>>>>>>>> not to add such improvement.
>>>>>>>>>>> >
>>>>>>>>>>> > Thanks,
>>>>>>>>>>> > Jozef
>>>>>>>>>>> >
>>>>>>>>>>> >
>>>>>>>>>>>
>>>>>>>>>>

Reply via email to