On Sat, Jul 3, 2021 at 7:30 PM Reuven Lax <re...@google.com> wrote:

>
>
> On Sat, Jul 3, 2021 at 1:02 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>
>> I don't think this has anything to do with external shuffle services.
>>
>> Arbitrarily recomputing data is fundamentally incompatible with Beam,
>> since Beam does not restrict transforms to being deterministic. The Spark
>> runner works (at least it did last I checked) by checkpointing the RDD.
>> Spark will not recompute the DAG past a checkpoint, so this creates stable
>> input to a transform. This adds some cost to the Spark runner, but makes
>> things correct. You should not have sharding problems due to replay unless
>> there is a bug in the current Spark runner.
>>
>> Beam does not restrict non-determinism, true. If users do add it, then
>> they can work around it should they need to address any side effects. But
>> should non-determinism be deliberately added by Beam core? Probably can if
>> runners can 100% deal with that effectively. To the point of RDD
>> `checkpoint`, afaik SparkRunner does use `cache`, not checkpoint. Also, I
>> thought cache is invoked on fork points in DAG. I have just a join and map
>> and in such cases I thought data is always served out by shuffle service.
>> Am I mistaken?
>>
>
> Non determinism is already there  in core Beam features. If any sort of
> trigger is used anywhere in the pipeline, the result is non deterministic.
> We've also found that users tend not to know when their ParDos are non
> deterministic, so telling users to works around it tends not to work.
>
> The Spark runner definitely used to use checkpoint in this case.
>

It is beyond my knowledge level of Beam how triggers introduce
non-determinism into the code processing in batch mode. So I leave that
out. Reuven, do you mind pointing me to the Spark runner code which is
supposed to handle this by using checkpoint? I can not find it myself.



>
>>
>> On Fri, Jul 2, 2021 at 8:23 PM Reuven Lax <re...@google.com> wrote:
>>
>>>
>>>
>>> On Tue, Jun 29, 2021 at 1:03 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>> wrote:
>>>
>>>>
>>>> How will @RequiresStableInput prevent this situation when running batch
>>>>> use case?
>>>>>
>>>>
>>>> So this is handled in combination of @RequiresStableInput and output
>>>> file finalization. @RequiresStableInput (or Reshuffle for most runners)
>>>> makes sure that the input provided for the write stage does not get
>>>> recomputed in the presence of failures. Output finalization makes sure that
>>>> we only finalize one run of each bundle and discard the rest.
>>>>
>>>> So this I believe relies on having robust external service to hold
>>>> shuffle data and serve it out when needed so pipeline does not need to
>>>> recompute it via non-deterministic function. In Spark however, shuffle
>>>> service can not be (if I am not mistaking) deployed in this fashion (HA +
>>>> replicated shuffle data). Therefore, if instance of shuffle service holding
>>>> a portion of shuffle data fails, spark recovers it by recomputing parts of
>>>> a DAG from source to recover lost shuffle results. I am not sure what Beam
>>>> can do here to prevent it or make it stable? Will @RequiresStableInput work
>>>> as expected? ... Note that this, I believe, concerns only the batch.
>>>>
>>>
>>> I don't think this has anything to do with external shuffle services.
>>>
>>> Arbitrarily recomputing data is fundamentally incompatible with Beam,
>>> since Beam does not restrict transforms to being deterministic. The Spark
>>> runner works (at least it did last I checked) by checkpointing the RDD.
>>> Spark will not recompute the DAG past a checkpoint, so this creates stable
>>> input to a transform. This adds some cost to the Spark runner, but makes
>>> things correct. You should not have sharding problems due to replay unless
>>> there is a bug in the current Spark runner.
>>>
>>>
>>>>
>>>> Also, when withNumShards() truly have to be used, round robin
>>>> assignment of elements to shards sounds like the optimal solution (at least
>>>> for the vast majority of pipelines)
>>>>
>>>> I agree. But random seed is my problem here with respect to the
>>>> situation mentioned above.
>>>>
>>>> Right, I would like to know if there are more true use-cases before
>>>> adding this. This essentially allows users to map elements to exact output
>>>> shards which could change the characteristics of pipelines in very
>>>> significant ways without users being aware of it. For example, this could
>>>> result in extremely imbalanced workloads for any downstream processors. If
>>>> it's just Hive I would rather work around it (even with a perf penalty for
>>>> that case).
>>>>
>>>> User would have to explicitly ask FileIO to use specific sharding.
>>>> Documentation can educate about the tradeoffs. But I am open to workaround
>>>> alternatives.
>>>>
>>>>
>>>> On Mon, Jun 28, 2021 at 5:50 PM Chamikara Jayalath <
>>>> chamik...@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Mon, Jun 28, 2021 at 2:47 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Cham, thanks for the feedback
>>>>>>
>>>>>> > Beam has a policy of no knobs (or keeping knobs minimum) to allow
>>>>>> runners to optimize better. I think one concern might be that the 
>>>>>> addition
>>>>>> of this option might be going against this.
>>>>>>
>>>>>> I agree that less knobs is more. But if assignment of a key is
>>>>>> specific per user request via API (optional), than runner should not
>>>>>> optimize that but need to respect how data is requested to be laid down. 
>>>>>> It
>>>>>> could however optimize number of shards as it is doing right now if
>>>>>> numShards is not set explicitly.
>>>>>> Anyway FileIO feels fluid here. You can leave shards empty and let
>>>>>> runner decide or optimize,  but only in batch and not in streaming.  Then
>>>>>> it allows you to set number of shards but never let you change the logic
>>>>>> how it is assigned and defaults to round robin with random seed. It begs
>>>>>> question for me why I can manipulate number of shards but not the logic 
>>>>>> of
>>>>>> assignment. Are there plans to (or already case implemented) optimize
>>>>>> around and have runner changing that under different circumstances?
>>>>>>
>>>>>
>>>>> I don't know the exact history behind withNumShards() feature for
>>>>> batch. I would guess it was originally introduced due to limitations for
>>>>> streaming but was made available to batch as well with a strong 
>>>>> performance
>>>>> warning.
>>>>>
>>>>> https://github.com/apache/beam/blob/90c854e97787c19cd5b94034d37c5319317567a8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L159
>>>>>
>>>>> Also, when withNumShards() truly have to be used, round robin
>>>>> assignment of elements to shards sounds like the optimal solution (at 
>>>>> least
>>>>> for the vast majority of pipelines)
>>>>>
>>>>>
>>>>>>
>>>>>> > Also looking at your original reasoning for adding this.
>>>>>>
>>>>>> > > I need to generate shards which are compatible with Hive
>>>>>> bucketing and therefore need to decide shard assignment based on data
>>>>>> fields of input element
>>>>>>
>>>>>> > This sounds very specific to Hive. Again I think the downside here
>>>>>> is that we will be giving up the flexibility for the runner to optimize 
>>>>>> the
>>>>>> pipeline and decide sharding. Do you think it's possible to somehow relax
>>>>>> this requirement for > > > Hive or use a secondary process to format the
>>>>>> output to a form that is suitable for Hive after being written to an
>>>>>> intermediate storage.
>>>>>>
>>>>>> It is possible. But it is quite suboptimal because of extra IO
>>>>>> penalty and I would have to use completely custom file writing as FileIO
>>>>>> would not support this. Then naturally the question would be, should I 
>>>>>> use
>>>>>> FileIO at all? I am quite not sure if I am doing something so rare that 
>>>>>> it
>>>>>> is not and can not be supported. Maybe I am. I remember an old discussion
>>>>>> from Scio guys when they wanted to introduce Sorted Merge Buckets to 
>>>>>> FileIO
>>>>>> where sharding was also needed to be manipulated (among other things)
>>>>>>
>>>>>
>>>>> Right, I would like to know if there are more true use-cases before
>>>>> adding this. This essentially allows users to map elements to exact output
>>>>> shards which could change the characteristics of pipelines in very
>>>>> significant ways without users being aware of it. For example, this could
>>>>> result in extremely imbalanced workloads for any downstream processors. If
>>>>> it's just Hive I would rather work around it (even with a perf penalty for
>>>>> that case).
>>>>>
>>>>>
>>>>>>
>>>>>> > > When running e.g. on Spark and job encounters kind of failure
>>>>>> which cause a loss of some data from previous stages, Spark does issue
>>>>>> recompute of necessary task in necessary stages to recover data. Because
>>>>>> the shard assignment function is random as default, some data will end up
>>>>>> in different shards and cause duplicates in the final output.
>>>>>>
>>>>>> > I think this was already addressed. The correct solution here is to
>>>>>> implement RequiresStableInput for runners that do not already support 
>>>>>> that
>>>>>> and update FileIO to use that.
>>>>>>
>>>>>> How will @RequiresStableInput prevent this situation when running
>>>>>> batch use case?
>>>>>>
>>>>>
>>>>> So this is handled in combination of @RequiresStableInput and output
>>>>> file finalization. @RequiresStableInput (or Reshuffle for most runners)
>>>>> makes sure that the input provided for the write stage does not get
>>>>> recomputed in the presence of failures. Output finalization makes sure 
>>>>> that
>>>>> we only finalize one run of each bundle and discard the rest.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Jun 28, 2021 at 10:29 AM Chamikara Jayalath <
>>>>>> chamik...@google.com> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Jun 27, 2021 at 10:48 PM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> how do we proceed with reviewing MR proposed for this change?
>>>>>>>>
>>>>>>>> I sense there is a concern exposing existing sharding function to
>>>>>>>> the API. But from the discussion here I do not have a clear picture
>>>>>>>> about arguments not doing so.
>>>>>>>> Only one argument was that dynamic destinations should be able to
>>>>>>>> do the same. While this is true, as it is illustrated in previous
>>>>>>>> commnet, it is not simple nor convenient to use and requires more
>>>>>>>> customization than exposing sharding which is already there.
>>>>>>>> Are there more negatives to exposing sharding function?
>>>>>>>>
>>>>>>>
>>>>>>> Seems like currently FlinkStreamingPipelineTranslator is the only
>>>>>>> real usage of WriteFiles.withShardingFunction() :
>>>>>>> https://github.com/apache/beam/blob/90c854e97787c19cd5b94034d37c5319317567a8/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java#L281
>>>>>>> WriteFiles is not expected to be used by pipeline authors but
>>>>>>> exposing this through the FileIO will make this available to pipeline
>>>>>>> authors (and some may choose to do so for various reasons).
>>>>>>> This will result in sharding being strict and runners being
>>>>>>> inflexible when it comes to optimizing execution of pipelines that use
>>>>>>> FileIO.
>>>>>>>
>>>>>>> Beam has a policy of no knobs (or keeping knobs minimum) to allow
>>>>>>> runners to optimize better. I think one concern might be that the 
>>>>>>> addition
>>>>>>> of this option might be going against this.
>>>>>>>
>>>>>>> Also looking at your original reasoning for adding this.
>>>>>>>
>>>>>>> > I need to generate shards which are compatible with Hive bucketing
>>>>>>> and therefore need to decide shard assignment based on data fields of 
>>>>>>> input
>>>>>>> element
>>>>>>>
>>>>>>> This sounds very specific to Hive. Again I think the downside here
>>>>>>> is that we will be giving up the flexibility for the runner to optimize 
>>>>>>> the
>>>>>>> pipeline and decide sharding. Do you think it's possible to somehow 
>>>>>>> relax
>>>>>>> this requirement for Hive or use a secondary process to format the 
>>>>>>> output
>>>>>>> to a form that is suitable for Hive after being written to an 
>>>>>>> intermediate
>>>>>>> storage.
>>>>>>>
>>>>>>> > When running e.g. on Spark and job encounters kind of failure
>>>>>>> which cause a loss of some data from previous stages, Spark does issue
>>>>>>> recompute of necessary task in necessary stages to recover data. Because
>>>>>>> the shard assignment function is random as default, some data will end 
>>>>>>> up
>>>>>>> in different shards and cause duplicates in the final output.
>>>>>>>
>>>>>>> I think this was already addressed. The correct solution here is to
>>>>>>> implement RequiresStableInput for runners that do not already support 
>>>>>>> that
>>>>>>> and update FileIO to use that.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jun 23, 2021 at 9:36 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> The difference in my opinion is in distinguishing between - as
>>>>>>>>> written in this thread - physical vs logical properties of the 
>>>>>>>>> pipeline. I
>>>>>>>>> proposed to keep dynamic destination (logical) and sharding (physical)
>>>>>>>>> separate on API level as it is at implementation level.
>>>>>>>>>
>>>>>>>>> When I reason about using `by()` for my case ... I am using
>>>>>>>>> dynamic destination to partition data into hourly folders. So my
>>>>>>>>> destination is e.g. `2021-06-23-07`. To add a shard there I assume I 
>>>>>>>>> will
>>>>>>>>> need
>>>>>>>>> * encode shard to the destination .. e.g. in form of file prefix
>>>>>>>>> `2021-06-23-07/shard-1`
>>>>>>>>> * dynamic destination now does not span a group of files but must
>>>>>>>>> be exactly one file
>>>>>>>>> * to respect above, I have to. "disable" sharding in WriteFiles
>>>>>>>>> and make sure to use `.withNumShards(1)` ... I am not sure what 
>>>>>>>>> `runnner
>>>>>>>>> determined` sharding would do, if it would not split destination 
>>>>>>>>> further
>>>>>>>>> into more files
>>>>>>>>> * make sure that FileNameing will respect my intent and name files
>>>>>>>>> as I expect based on that destination
>>>>>>>>> * post process `WriteFilesResult` to turn my destination which
>>>>>>>>> targets physical single file (date-with-hour + shard-num) back into 
>>>>>>>>> the
>>>>>>>>> destination with targets logical group of files (date-with-hour) so I 
>>>>>>>>> hook
>>>>>>>>> it up do downstream post-process as usual
>>>>>>>>>
>>>>>>>>> Am I roughly correct? Or do I miss something more straight forward?
>>>>>>>>> If the above is correct then it feels more fragile and less
>>>>>>>>> intuitive to me than the option in my MR.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Jun 22, 2021 at 4:28 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I'm not sure I understand your PR. How is this PR different than
>>>>>>>>>> the by() method in FileIO?
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 22, 2021 at 1:22 AM Jozef Vilcek <
>>>>>>>>>> jozo.vil...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> MR for review for this change is here:
>>>>>>>>>>> https://github.com/apache/beam/pull/15051
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek <
>>>>>>>>>>> jozo.vil...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I would like this thread to stay focused on sharding FileIO
>>>>>>>>>>>> only. Possible change to the model is an interesting topic but of 
>>>>>>>>>>>> a much
>>>>>>>>>>>> different scope.
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, I agree that sharding is mostly a physical rather than
>>>>>>>>>>>> logical property of the pipeline. That is why it feels more 
>>>>>>>>>>>> natural to
>>>>>>>>>>>> distinguish between those two on the API level.
>>>>>>>>>>>> As for handling sharding requirements by adding more sugar to
>>>>>>>>>>>> dynamic destinations + file naming one has to keep in mind that 
>>>>>>>>>>>> results of
>>>>>>>>>>>> dynamic writes can be observed in the form of KV<DestinationT, 
>>>>>>>>>>>> String>,
>>>>>>>>>>>> so written files per dynamic destination. Often we do GBP to 
>>>>>>>>>>>> post-process
>>>>>>>>>>>> files per destination / logical group. If sharding would be 
>>>>>>>>>>>> encoded there,
>>>>>>>>>>>> then it might complicate things either downstream or inside the 
>>>>>>>>>>>> sugar part
>>>>>>>>>>>> to put shard in and then take it out later.
>>>>>>>>>>>> From the user perspective I do not see much difference. We
>>>>>>>>>>>> would still need to allow API to define both behaviors and it 
>>>>>>>>>>>> would only be
>>>>>>>>>>>> executed differently by implementation.
>>>>>>>>>>>> I do not see a value in changing FileIO (WriteFiles) logic to
>>>>>>>>>>>> stop using sharding and use dynamic destination for both given that
>>>>>>>>>>>> sharding function is already there and in use.
>>>>>>>>>>>>
>>>>>>>>>>>> To the point of external shuffle and non-deterministic user
>>>>>>>>>>>> input.
>>>>>>>>>>>> Yes users can create non-deterministic behaviors but they are
>>>>>>>>>>>> in control. Here, Beam internally adds non-deterministic behavior 
>>>>>>>>>>>> and users
>>>>>>>>>>>> can not opt-out.
>>>>>>>>>>>> All works fine as long as external shuffle service (depends on
>>>>>>>>>>>> Runner) holds to the data and hands it out on retries. However if 
>>>>>>>>>>>> data in
>>>>>>>>>>>> shuffle service is lost for some reason - e.g. disk failure, node 
>>>>>>>>>>>> breaks
>>>>>>>>>>>> down - then pipeline have to recover the data by recomputing 
>>>>>>>>>>>> necessary
>>>>>>>>>>>> paths.
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jun 17, 2021 at 7:36 PM Robert Bradshaw <
>>>>>>>>>>>> rober...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Sharding is typically a physical rather than logical property
>>>>>>>>>>>>> of the
>>>>>>>>>>>>> pipeline, and I'm not convinced it makes sense to add it to
>>>>>>>>>>>>> Beam in
>>>>>>>>>>>>> general. One can already use keys and GBK/Stateful DoFns if
>>>>>>>>>>>>> some kind
>>>>>>>>>>>>> of logical grouping is needed, and adding constraints like
>>>>>>>>>>>>> this can
>>>>>>>>>>>>> prevent opportunities for optimizations (like dynamic sharding
>>>>>>>>>>>>> and
>>>>>>>>>>>>> fusion).
>>>>>>>>>>>>>
>>>>>>>>>>>>> That being said, file output are one area where it could make
>>>>>>>>>>>>> sense. I
>>>>>>>>>>>>> would expect that dynamic destinations could cover this
>>>>>>>>>>>>> usecase, and a
>>>>>>>>>>>>> general FileNaming subclass could be provided to make this
>>>>>>>>>>>>> pattern
>>>>>>>>>>>>> easier (and possibly some syntactic sugar for auto-setting num
>>>>>>>>>>>>> shards
>>>>>>>>>>>>> to 0). (One downside of this approach is that one couldn't do
>>>>>>>>>>>>> dynamic
>>>>>>>>>>>>> destinations, and have each sharded with a distinct sharing
>>>>>>>>>>>>> function
>>>>>>>>>>>>> as well.)
>>>>>>>>>>>>>
>>>>>>>>>>>>> If this doesn't work, we could look into adding
>>>>>>>>>>>>> ShardingFunction as a
>>>>>>>>>>>>> publicly exposed parameter to FileIO. (I'm actually surprised
>>>>>>>>>>>>> it
>>>>>>>>>>>>> already exists.)
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Jun 17, 2021 at 9:39 AM <je...@seznam.cz> wrote:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Alright, but what is worth emphasizing is that we talk about
>>>>>>>>>>>>> batch workloads. The typical scenario is that the output is 
>>>>>>>>>>>>> committed once
>>>>>>>>>>>>> the job finishes (e.g., by atomic rename of directory).
>>>>>>>>>>>>> >  Jan
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Dne 17. 6. 2021 17:59 napsal uživatel Reuven Lax <
>>>>>>>>>>>>> re...@google.com>:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Yes - the problem is that Beam makes no guarantees of
>>>>>>>>>>>>> determinism anywhere in the system. User DoFns might be non 
>>>>>>>>>>>>> deterministic,
>>>>>>>>>>>>> and have no way to know (we've discussed proving users with an
>>>>>>>>>>>>> @IsDeterministic annotation, however empirically users often 
>>>>>>>>>>>>> think their
>>>>>>>>>>>>> functions are deterministic when they are in fact not). _Any_ 
>>>>>>>>>>>>> sort of
>>>>>>>>>>>>> triggered aggregation (including watermark based) can always be 
>>>>>>>>>>>>> non
>>>>>>>>>>>>> deterministic.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Even if everything was deterministic, replaying everything
>>>>>>>>>>>>> is tricky. The output files already exist - should the system 
>>>>>>>>>>>>> delete them
>>>>>>>>>>>>> and recreate them, or leave them as is and delete the temp files? 
>>>>>>>>>>>>> Either
>>>>>>>>>>>>> decision could be problematic.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > On Wed, Jun 16, 2021 at 11:40 PM Jan Lukavský <
>>>>>>>>>>>>> je...@seznam.cz> wrote:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Correct, by the external shuffle service I pretty much meant
>>>>>>>>>>>>> "offloading the contents of a shuffle phase out of the system". 
>>>>>>>>>>>>> Looks like
>>>>>>>>>>>>> that is what the Spark's checkpoint does as well. On the other 
>>>>>>>>>>>>> hand (if I
>>>>>>>>>>>>> understand the concept correctly), that implies some performance 
>>>>>>>>>>>>> penalty -
>>>>>>>>>>>>> the data has to be moved to external distributed filesystem. 
>>>>>>>>>>>>> Which then
>>>>>>>>>>>>> feels weird if we optimize code to avoid computing random 
>>>>>>>>>>>>> numbers, but are
>>>>>>>>>>>>> okay with moving complete datasets back and forth. I think in this
>>>>>>>>>>>>> particular case making the Pipeline deterministic - idempotent to 
>>>>>>>>>>>>> be
>>>>>>>>>>>>> precise - (within the limits, yes, hashCode of enum is not stable 
>>>>>>>>>>>>> between
>>>>>>>>>>>>> JVMs) would seem more practical to me.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >  Jan
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > On 6/17/21 7:09 AM, Reuven Lax wrote:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > I have some thoughts here, as Eugene Kirpichov and I spent a
>>>>>>>>>>>>> lot of time working through these semantics in the past.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > First - about the problem of duplicates:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > A "deterministic" sharding - e.g. hashCode based (though
>>>>>>>>>>>>> Java makes no guarantee that hashCode is stable across JVM 
>>>>>>>>>>>>> instances, so
>>>>>>>>>>>>> this technique ends up not being stable) doesn't really help 
>>>>>>>>>>>>> matters in
>>>>>>>>>>>>> Beam. Unlike other systems, Beam makes no assumptions that 
>>>>>>>>>>>>> transforms are
>>>>>>>>>>>>> idempotent or deterministic. What's more, _any_ transform that 
>>>>>>>>>>>>> has any sort
>>>>>>>>>>>>> of triggered grouping (whether the trigger used is watermark 
>>>>>>>>>>>>> based or
>>>>>>>>>>>>> otherwise) is non deterministic.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Forcing a hash of every element imposed quite a CPU cost;
>>>>>>>>>>>>> even generating a random number per-element slowed things down 
>>>>>>>>>>>>> too much,
>>>>>>>>>>>>> which is why the current code generates a random number only in 
>>>>>>>>>>>>> startBundle.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Any runner that does not implement RequiresStableInput will
>>>>>>>>>>>>> not properly execute FileIO. Dataflow and Flink both support 
>>>>>>>>>>>>> this. I
>>>>>>>>>>>>> believe that the Spark runner implicitly supports it by manually 
>>>>>>>>>>>>> calling
>>>>>>>>>>>>> checkpoint as Ken mentioned (unless someone removed that from the 
>>>>>>>>>>>>> Spark
>>>>>>>>>>>>> runner, but if so that would be a correctness regression). 
>>>>>>>>>>>>> Implementing
>>>>>>>>>>>>> this has nothing to do with external shuffle services - neither 
>>>>>>>>>>>>> Flink,
>>>>>>>>>>>>> Spark, nor Dataflow appliance (classic shuffle) have any problem 
>>>>>>>>>>>>> correctly
>>>>>>>>>>>>> implementing RequiresStableInput.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > On Wed, Jun 16, 2021 at 11:18 AM Jan Lukavský <
>>>>>>>>>>>>> je...@seznam.cz> wrote:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > I think that the support for @RequiresStableInput is rather
>>>>>>>>>>>>> limited. AFAIK it is supported by streaming Flink (where it is 
>>>>>>>>>>>>> not needed
>>>>>>>>>>>>> in this situation) and by Dataflow. Batch runners without 
>>>>>>>>>>>>> external shuffle
>>>>>>>>>>>>> service that works as in the case of Dataflow have IMO no way to 
>>>>>>>>>>>>> implement
>>>>>>>>>>>>> it correctly. In the case of FileIO (which do not use 
>>>>>>>>>>>>> @RequiresStableInput
>>>>>>>>>>>>> as it would not be supported on Spark) the randomness is easily 
>>>>>>>>>>>>> avoidable
>>>>>>>>>>>>> (hashCode of key?) and I it seems to me it should be preferred.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >  Jan
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > On 6/16/21 6:23 PM, Kenneth Knowles wrote:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský <
>>>>>>>>>>>>> je...@seznam.cz> wrote:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Hi,
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > maybe a little unrelated, but I think we definitely should
>>>>>>>>>>>>> not use random assignment of shard keys (RandomShardingFunction), 
>>>>>>>>>>>>> at least
>>>>>>>>>>>>> for bounded workloads (seems to be fine for streaming workloads). 
>>>>>>>>>>>>> Many
>>>>>>>>>>>>> batch runners simply recompute path in the computation DAG from 
>>>>>>>>>>>>> the failed
>>>>>>>>>>>>> node (transform) to the root (source). In the case there is any
>>>>>>>>>>>>> non-determinism involved in the logic, then it can result in 
>>>>>>>>>>>>> duplicates (as
>>>>>>>>>>>>> the 'previous' attempt might have ended in DAG path that was not 
>>>>>>>>>>>>> affected
>>>>>>>>>>>>> by the fail). That addresses the option 2) of what Jozef have 
>>>>>>>>>>>>> mentioned.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > This is the reason we introduced "@RequiresStableInput".
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > When things were only Dataflow, we knew that each shuffle
>>>>>>>>>>>>> was a checkpoint, so we inserted a Reshuffle after the random 
>>>>>>>>>>>>> numbers were
>>>>>>>>>>>>> generated, freezing them so it was safe for replay. Since other 
>>>>>>>>>>>>> engines do
>>>>>>>>>>>>> not checkpoint at every shuffle, we needed a way to communicate 
>>>>>>>>>>>>> that this
>>>>>>>>>>>>> checkpointing was required for correctness. I think we still have 
>>>>>>>>>>>>> many IOs
>>>>>>>>>>>>> that are written using Reshuffle instead of @RequiresStableInput, 
>>>>>>>>>>>>> and I
>>>>>>>>>>>>> don't know which runners process @RequiresStableInput properly.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > By the way, I believe the SparkRunner explicitly calls
>>>>>>>>>>>>> materialize() after a GBK specifically so that it gets correct 
>>>>>>>>>>>>> results for
>>>>>>>>>>>>> IOs that rely on Reshuffle. Has that changed?
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > I agree that we should minimize use of RequiresStableInput.
>>>>>>>>>>>>> It has a significant cost, and the cost varies across runners. If 
>>>>>>>>>>>>> we can
>>>>>>>>>>>>> use a deterministic function, we should.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Kenn
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >  Jan
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > On 6/16/21 1:43 PM, Jozef Vilcek wrote:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <
>>>>>>>>>>>>> k...@apache.org> wrote:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > In general, Beam only deals with keys and grouping by key. I
>>>>>>>>>>>>> think expanding this idea to some more abstract notion of a 
>>>>>>>>>>>>> sharding
>>>>>>>>>>>>> function could make sense.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > For FileIO specifically, I wonder if you can use
>>>>>>>>>>>>> writeDynamic() to get the behavior you are seeking.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > The change in mind looks like this:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Dynamic Destinations in my mind is more towards the need for
>>>>>>>>>>>>> "partitioning" data (destination as directory level) or if one 
>>>>>>>>>>>>> needs to
>>>>>>>>>>>>> handle groups of events differently, e.g. write some events in 
>>>>>>>>>>>>> FormatA and
>>>>>>>>>>>>> others in FormatB.
>>>>>>>>>>>>> > Shards are now used for distributing writes or bucketing of
>>>>>>>>>>>>> events within a particular destination group. More specifically, 
>>>>>>>>>>>>> currently,
>>>>>>>>>>>>> each element is assigned `ShardedKey<Integer>` [1] before GBK 
>>>>>>>>>>>>> operation.
>>>>>>>>>>>>> Sharded key is a compound of destination and assigned shard.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Having said that, I might be able to use dynamic destination
>>>>>>>>>>>>> for this, possibly with the need of custom FileNaming, and set 
>>>>>>>>>>>>> shards to be
>>>>>>>>>>>>> always 1. But it feels less natural than allowing the user to 
>>>>>>>>>>>>> swap already
>>>>>>>>>>>>> present `RandomShardingFunction` [2] with something of his own 
>>>>>>>>>>>>> choosing.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > [1]
>>>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
>>>>>>>>>>>>> > [2]
>>>>>>>>>>>>> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Kenn
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <
>>>>>>>>>>>>> tyso...@google.com> wrote:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Adding sharding to the model may require a wider discussion
>>>>>>>>>>>>> than FileIO alone. I'm not entirely sure how wide, or if this has 
>>>>>>>>>>>>> been
>>>>>>>>>>>>> proposed before, but IMO it warrants a design doc or proposal.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > A couple high level questions I can think of are,
>>>>>>>>>>>>> >   - What runners support sharding?
>>>>>>>>>>>>> >       * There will be some work in Dataflow required to
>>>>>>>>>>>>> support this but I'm not sure how much.
>>>>>>>>>>>>> >   - What does sharding mean for streaming pipelines?
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > A more nitty-detail question:
>>>>>>>>>>>>> >   - How can this be achieved performantly? For example, if a
>>>>>>>>>>>>> shuffle is required to achieve a particular sharding constraint, 
>>>>>>>>>>>>> should we
>>>>>>>>>>>>> allow transforms to declare they don't modify the sharding 
>>>>>>>>>>>>> property (e.g.
>>>>>>>>>>>>> key preserving) which may allow a runner to avoid an additional 
>>>>>>>>>>>>> shuffle if
>>>>>>>>>>>>> a preceding shuffle can guarantee the sharding requirements?
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Where X is the shuffle that could be avoided: input ->
>>>>>>>>>>>>> shuffle (key sharding fn A) -> transform1 (key preserving) -> 
>>>>>>>>>>>>> transform 2
>>>>>>>>>>>>> (key preserving) -> X -> fileio (key sharding fn A)
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <
>>>>>>>>>>>>> jozo.vil...@gmail.com> wrote:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > I would like to extend FileIO with possibility to specify a
>>>>>>>>>>>>> custom sharding function:
>>>>>>>>>>>>> > https://issues.apache.org/jira/browse/BEAM-12493
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > I have 2 use-cases for this:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > I need to generate shards which are compatible with Hive
>>>>>>>>>>>>> bucketing and therefore need to decide shard assignment based on 
>>>>>>>>>>>>> data
>>>>>>>>>>>>> fields of input element
>>>>>>>>>>>>> > When running e.g. on Spark and job encounters kind of
>>>>>>>>>>>>> failure which cause a loss of some data from previous stages, 
>>>>>>>>>>>>> Spark does
>>>>>>>>>>>>> issue recompute of necessary task in necessary stages to recover 
>>>>>>>>>>>>> data.
>>>>>>>>>>>>> Because the shard assignment function is random as default, some 
>>>>>>>>>>>>> data will
>>>>>>>>>>>>> end up in different shards and cause duplicates in the final 
>>>>>>>>>>>>> output.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Please let me know your thoughts in case you see a reason to
>>>>>>>>>>>>> not to add such improvement.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Thanks,
>>>>>>>>>>>>> > Jozef
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >
>>>>>>>>>>>>>
>>>>>>>>>>>>

Reply via email to