The PR for review of this change is here:
https://github.com/apache/beam/pull/15051
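
To make the idea discussed below concrete, here is a minimal sketch of deterministic shard assignment. It is not the code from the PR and it does not use Beam's sharding function interface. `StableSharding` and `shardFor` are hypothetical names, and CRC32 is only an assumed stand-in for a stable hash (it is not Hive's bucketing hash).

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical helper, not part of Beam: maps a key to a shard deterministically. */
final class StableSharding {
    private StableSharding() {}

    /**
     * Returns a shard index in [0, numShards) that depends only on the key's bytes,
     * so a recomputed element lands in the same shard as on the first attempt.
     */
    static int shardFor(String key, int numShards) {
        if (numShards <= 0) {
            throw new IllegalArgumentException("numShards must be positive");
        }
        // Assumption: CRC32 over the UTF-8 bytes is used because its result does not
        // depend on the JVM instance, unlike hashCode() of some types (e.g. enums).
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        // getValue() is a non-negative long, so the remainder is a valid shard index.
        return (int) (crc.getValue() % numShards);
    }

    public static void main(String[] args) {
        // The same key always maps to the same shard, attempt after attempt.
        int first = shardFor("user-42", 8);
        int retry = shardFor("user-42", 8);
        System.out.println("same shard on retry: " + (first == retry));
    }
}
```

With a function of this shape, a recomputed stage assigns every element to the shard it had on the first attempt, which is the property the random default cannot give.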

On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> I would like this thread to stay focused on sharding FileIO only. Possible
> change to the model is an interesting topic but of a much different scope.
>
> Yes, I agree that sharding is mostly a physical rather than logical
> property of the pipeline. That is why it feels more natural to distinguish
> between those two on the API level.
> As for handling sharding requirements by adding more sugar to dynamic
> destinations + file naming one has to keep in mind that results of dynamic
> writes can be observed in the form of KV<DestinationT, String>, so written
> files per dynamic destination. Often we do a GBK to post-process files per
> destination / logical group. If sharding would be encoded there, then it
> might complicate things either downstream or inside the sugar part to put
> shard in and then take it out later.
> From the user perspective I do not see much difference. We would still
> need to allow the API to define both behaviors and it would only be executed
> differently by implementation.
> I do not see a value in changing FileIO (WriteFiles) logic to stop using
> sharding and use dynamic destination for both given that sharding function
> is already there and in use.
>
> To the point of external shuffle and non-deterministic user input.
> Yes, users can create non-deterministic behaviors, but they are in control.
> Here, Beam internally adds non-deterministic behavior and users cannot
> opt out.
> All works fine as long as the external shuffle service (depends on the Runner)
> holds on to the data and hands it out on retries. However, if data in the
> shuffle service is lost for some reason - e.g. disk failure, node breaks down -
> then the pipeline has to recover the data by recomputing the necessary paths.
>
> On Thu, Jun 17, 2021 at 7:36 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> Sharding is typically a physical rather than logical property of the
>> pipeline, and I'm not convinced it makes sense to add it to Beam in
>> general. One can already use keys and GBK/Stateful DoFns if some kind
>> of logical grouping is needed, and adding constraints like this can
>> prevent opportunities for optimizations (like dynamic sharding and
>> fusion).
>>
>> That being said, file outputs are one area where it could make sense. I
>> would expect that dynamic destinations could cover this use case, and a
>> general FileNaming subclass could be provided to make this pattern
>> easier (and possibly some syntactic sugar for auto-setting num shards
>> to 0). (One downside of this approach is that one couldn't do dynamic
>> destinations, and have each sharded with a distinct sharding function
>> as well.)
>>
>> If this doesn't work, we could look into adding ShardingFunction as a
>> publicly exposed parameter to FileIO. (I'm actually surprised it
>> already exists.)
>>
>> On Thu, Jun 17, 2021 at 9:39 AM <je...@seznam.cz> wrote:
>> >
>> > Alright, but what is worth emphasizing is that we talk about batch
>> workloads. The typical scenario is that the output is committed once the
>> job finishes (e.g., by atomic rename of directory).
>> >  Jan
>> >
>> > On 17. 6. 2021 at 17:59, Reuven Lax <re...@google.com> wrote:
>> >
>> > Yes - the problem is that Beam makes no guarantees of determinism
>> anywhere in the system. User DoFns might be non-deterministic, and we have no
>> way to know (we've discussed providing users with an @IsDeterministic
>> annotation, however empirically users often think their functions are
>> deterministic when they are in fact not). _Any_ sort of triggered
>> aggregation (including watermark based) can always be non-deterministic.
>> >
>> > Even if everything was deterministic, replaying everything is tricky.
>> The output files already exist - should the system delete them and recreate
>> them, or leave them as is and delete the temp files? Either decision could
>> be problematic.
>> >
>> > On Wed, Jun 16, 2021 at 11:40 PM Jan Lukavský <je...@seznam.cz> wrote:
>> >
>> > Correct, by the external shuffle service I pretty much meant
>> "offloading the contents of a shuffle phase out of the system". Looks like
>> that is what Spark's checkpoint does as well. On the other hand (if I
>> understand the concept correctly), that implies some performance penalty -
>> the data has to be moved to an external distributed filesystem. That then
>> feels weird if we optimize code to avoid computing random numbers, but are
>> okay with moving complete datasets back and forth. I think in this
>> particular case making the Pipeline deterministic - idempotent to be
>> precise - (within the limits, yes, hashCode of enum is not stable between
>> JVMs) would seem more practical to me.
>> >
>> >  Jan
>> >
>> > On 6/17/21 7:09 AM, Reuven Lax wrote:
>> >
>> > I have some thoughts here, as Eugene Kirpichov and I spent a lot of
>> time working through these semantics in the past.
>> >
>> > First - about the problem of duplicates:
>> >
>> > A "deterministic" sharding - e.g. hashCode based (though Java makes no
>> guarantee that hashCode is stable across JVM instances, so this technique
>> ends up not being stable) doesn't really help matters in Beam. Unlike other
>> systems, Beam makes no assumptions that transforms are idempotent or
>> deterministic. What's more, _any_ transform that has any sort of triggered
>> grouping (whether the trigger used is watermark based or otherwise) is non
>> deterministic.
>> >
>> > Forcing a hash of every element imposed quite a CPU cost; even
>> generating a random number per-element slowed things down too much, which
>> is why the current code generates a random number only in startBundle.
>> >
>> > Any runner that does not implement RequiresStableInput will not
>> properly execute FileIO. Dataflow and Flink both support this. I believe
>> that the Spark runner implicitly supports it by manually calling checkpoint
>> as Ken mentioned (unless someone removed that from the Spark runner, but if
>> so that would be a correctness regression). Implementing this has nothing
>> to do with external shuffle services - neither Flink, Spark, nor Dataflow
>> appliance (classic shuffle) have any problem correctly implementing
>> RequiresStableInput.
>> >
>> > On Wed, Jun 16, 2021 at 11:18 AM Jan Lukavský <je...@seznam.cz> wrote:
>> >
>> > I think that the support for @RequiresStableInput is rather limited.
>> AFAIK it is supported by streaming Flink (where it is not needed in this
>> situation) and by Dataflow. Batch runners without external shuffle service
>> that works as in the case of Dataflow have IMO no way to implement it
>> correctly. In the case of FileIO (which does not use @RequiresStableInput as
>> it would not be supported on Spark) the randomness is easily avoidable
>> (hashCode of key?) and it seems to me it should be preferred.
>> >
>> >  Jan
>> >
>> > On 6/16/21 6:23 PM, Kenneth Knowles wrote:
>> >
>> >
>> > On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský <je...@seznam.cz> wrote:
>> >
>> > Hi,
>> >
>> > maybe a little unrelated, but I think we definitely should not use
>> random assignment of shard keys (RandomShardingFunction), at least for
>> bounded workloads (seems to be fine for streaming workloads). Many batch
>> runners simply recompute the path in the computation DAG from the failed node
>> (transform) to the root (source). If there is any non-determinism
>> involved in the logic, it can result in duplicates (as the 'previous'
>> attempt might have ended in a DAG path that was not affected by the failure).
>> That addresses option 2) of what Jozef has mentioned.
>> >
>> > This is the reason we introduced "@RequiresStableInput".
>> >
>> > When things were only Dataflow, we knew that each shuffle was a
>> checkpoint, so we inserted a Reshuffle after the random numbers were
>> generated, freezing them so it was safe for replay. Since other engines do
>> not checkpoint at every shuffle, we needed a way to communicate that this
>> checkpointing was required for correctness. I think we still have many IOs
>> that are written using Reshuffle instead of @RequiresStableInput, and I
>> don't know which runners process @RequiresStableInput properly.
>> >
>> > By the way, I believe the SparkRunner explicitly calls materialize()
>> after a GBK specifically so that it gets correct results for IOs that rely
>> on Reshuffle. Has that changed?
>> >
>> > I agree that we should minimize use of RequiresStableInput. It has a
>> significant cost, and the cost varies across runners. If we can use a
>> deterministic function, we should.
>> >
>> > Kenn
>> >
>> >
>> >  Jan
>> >
>> > On 6/16/21 1:43 PM, Jozef Vilcek wrote:
>> >
>> >
>> >
>> > On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles <k...@apache.org>
>> wrote:
>> >
>> > In general, Beam only deals with keys and grouping by key. I think
>> expanding this idea to some more abstract notion of a sharding function
>> could make sense.
>> >
>> > For FileIO specifically, I wonder if you can use writeDynamic() to get
>> the behavior you are seeking.
>> >
>> >
>> > The change in mind looks like this:
>> >
>> https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18
>> >
>> > Dynamic Destinations in my mind is more towards the need for
>> "partitioning" data (destination as directory level) or if one needs to
>> handle groups of events differently, e.g. write some events in FormatA and
>> others in FormatB.
>> > Shards are now used for distributing writes or bucketing of events
>> within a particular destination group. More specifically, currently, each
>> element is assigned `ShardedKey<Integer>` [1] before GBK operation. Sharded
>> key is a compound of destination and assigned shard.
>> >
>> > Having said that, I might be able to use dynamic destination for this,
>> possibly with the need of custom FileNaming, and set shards to be always 1.
>> But it feels less natural than allowing the user to swap already present
>> `RandomShardingFunction` [2] with something of his own choosing.
>> >
>> >
>> > [1]
>> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
>> > [2]
>> https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
>> >
>> > Kenn
>> >
>> > On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton <tyso...@google.com>
>> wrote:
>> >
>> > Adding sharding to the model may require a wider discussion than FileIO
>> alone. I'm not entirely sure how wide, or if this has been proposed before,
>> but IMO it warrants a design doc or proposal.
>> >
>> > A couple high level questions I can think of are,
>> >   - What runners support sharding?
>> >       * There will be some work in Dataflow required to support this
>> but I'm not sure how much.
>> >   - What does sharding mean for streaming pipelines?
>> >
>> > A more nitty-detail question:
>> >   - How can this be achieved performantly? For example, if a shuffle is
>> required to achieve a particular sharding constraint, should we allow
>> transforms to declare they don't modify the sharding property (e.g. key
>> preserving) which may allow a runner to avoid an additional shuffle if a
>> preceding shuffle can guarantee the sharding requirements?
>> >
>> > Where X is the shuffle that could be avoided: input -> shuffle (key
>> sharding fn A) -> transform1 (key preserving) -> transform 2 (key
>> preserving) -> X -> fileio (key sharding fn A)
>> >
>> > On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek <jozo.vil...@gmail.com>
>> wrote:
>> >
>> > I would like to extend FileIO with the possibility to specify a custom
>> sharding function:
>> > https://issues.apache.org/jira/browse/BEAM-12493
>> >
>> > I have 2 use-cases for this:
>> >
>> > 1. I need to generate shards which are compatible with Hive bucketing and
>> therefore need to decide shard assignment based on data fields of the input
>> element.
>> > 2. When running e.g. on Spark and the job encounters a failure which
>> causes a loss of some data from previous stages, Spark issues a recompute
>> of the necessary tasks in the necessary stages to recover the data. Because
>> the shard assignment function is random by default, some data will end up in
>> different shards and cause duplicates in the final output.
>> >
>> > Please let me know your thoughts in case you see a reason not to add
>> such improvement.
>> >
>> > Thanks,
>> > Jozef
>> >
>> >
>>
>
