[ https://issues.apache.org/jira/browse/FLINK-21936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski updated FLINK-21936:
-----------------------------------
    Description: 
We currently do not have any hard guarantees on pointwise connections regarding 
data consistency. However, since data implicitly keeps the partitioning of any 
preceding source or keyby, some users relied on this behavior to split 
compute-intensive tasks into smaller chunks while still expecting ordering 
guarantees.

As long as the parallelism does not change, unaligned checkpoints (UC) retain 
these properties. With support for rescaling UC (FLINK-19801), this is no longer 
the case. For most exchanges, there is a meaningful way to reassign in-flight 
state from one channel to another (even in random order). For some exchanges, 
the mapping is ambiguous and requires post-filtering. However, for point-wise 
connections, it is impossible to do so while retaining these properties.

Consider {{source -> keyby -> task1 -> forward -> task2}}. Now, if we want to 
rescale from parallelism p = 1 to p = 2, suddenly the records inside the keyby 
channels need to be divided into two channels according to their keygroups. This 
is straightforward: use the keygroup ranges of the operators and some way to 
determine the key(group) of each record (regardless of the concrete approach). 
For the forward channel, we completely lack the key context. No record in the 
forward channel has a keygroup assigned, and it cannot be calculated either, 
because there is no guarantee that task1's output still contains the key.
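To make the contrast concrete, the following Java sketch redistributes the in-flight records of one keyby channel onto two channels after rescaling from p = 1 to p = 2, using keygroup ranges. It is a simplified model, not Flink code: the record types and method names are hypothetical, and the keygroup is computed from {{hashCode()}} with {{Math.floorMod}} (Flink additionally applies a murmur hash before the modulo). The subtask formula {{keyGroup * parallelism / maxParallelism}} models range-based keygroup assignment. A forward record that only carries task1's output has no key, so neither method applies to it.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified, hypothetical model of redistributing in-flight records on rescale.
 * Assumption: keygroup = floorMod(key.hashCode(), maxParallelism); real Flink
 * hashes the key's hashCode once more before taking the modulo.
 */
public class RescaleSketch {

    /** Record in a keyby channel: the key is still attached. */
    public record KeyedRecord(int key, String value) {}

    /** Record in a forward channel: task1 emitted only a derived value, the key is gone. */
    public record ForwardRecord(int payload) {}

    /** Maps a key to one of maxParallelism keygroups. */
    public static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Range partitioning: each subtask owns a contiguous range of keygroups. */
    public static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Splits the in-flight records of one keyby channel across newParallelism channels. */
    public static List<List<KeyedRecord>> redistribute(
            List<KeyedRecord> inflight, int maxParallelism, int newParallelism) {
        List<List<KeyedRecord>> channels = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            channels.add(new ArrayList<>());
        }
        // Iterating in the original order keeps records of the same key in order.
        for (KeyedRecord r : inflight) {
            int keyGroup = keyGroupFor(r.key(), maxParallelism);
            channels.get(subtaskFor(keyGroup, maxParallelism, newParallelism)).add(r);
        }
        return channels;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        List<KeyedRecord> keyby = List.of(
                new KeyedRecord(7, "x"), new KeyedRecord(100, "y"), new KeyedRecord(7, "z"));
        List<List<KeyedRecord>> split = redistribute(keyby, maxParallelism, 2);
        System.out.println("keyby channel 0: " + split.get(0));
        System.out.println("keyby channel 1: " + split.get(1));

        // There is no key to feed into keyGroupFor(...), so no principled target channel exists.
        ForwardRecord f = new ForwardRecord(42);
        System.out.println("forward record " + f + " has no keygroup");
    }
}
```

Both records for key 7 end up in channel 0 in their original order, which is exactly the per-key ordering property that cannot be reconstructed for the forward channel.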

The root cause for this limitation is the conceptual mismatch between what we 
provide and what some users assume we provide (or what we assume users 
assume). For example, task2 currently cannot use (keyed) state because there 
is no key context, yet we still want to guarantee ordering with respect to 
that key context.

For 1.13, the easiest solution is to disable channel state in pointwise 
connections. For any non-trivial application with at least one shuffle, the 
number of pointwise channels (linear in p) is quickly dwarfed by the number of 
all-to-all channels (quadratic in p), so the excluded channels are only a small 
fraction of all channels. I will add some alternative ideas to the discussion.
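The channel-count argument can be checked with a small Java model of the example job {{source -> keyby -> task1 -> forward -> task2}}. Everything here is hypothetical and not Flink's API: {{ExchangeKind}}, {{Exchange}}, and {{persistsInflightData}} only encode the proposed rule (no channel state for pointwise exchanges). The model assumes that an all-to-all exchange creates upstream * downstream channels and a pointwise exchange one channel per subtask on the larger side, and it counts channels rather than buffered bytes.

```java
import java.util.List;

/** Hypothetical model of how many channels the proposed rule would exclude from a UC. */
public class PointwiseChannelShare {

    public enum ExchangeKind { POINTWISE, ALL_TO_ALL }

    /** One exchange between an upstream and a downstream operator. */
    public record Exchange(String name, ExchangeKind kind, int upstream, int downstream) {
        /** Number of channels this exchange creates. */
        public long channels() {
            if (kind == ExchangeKind.ALL_TO_ALL) {
                // every upstream subtask sends to every downstream subtask
                return (long) upstream * downstream;
            }
            // pointwise: each subtask on the larger side has exactly one channel
            return Math.max(upstream, downstream);
        }
    }

    /** The proposed rule: no channel state for pointwise exchanges. */
    public static boolean persistsInflightData(Exchange e) {
        return e.kind() != ExchangeKind.POINTWISE;
    }

    /** Fraction of all channels whose in-flight data would no longer be persisted. */
    public static double skippedShare(List<Exchange> job) {
        long total = 0;
        long skipped = 0;
        for (Exchange e : job) {
            total += e.channels();
            if (!persistsInflightData(e)) {
                skipped += e.channels();
            }
        }
        return total == 0 ? 0.0 : (double) skipped / total;
    }

    /** source -> keyby -> task1 -> forward -> task2, all operators at parallelism p. */
    public static List<Exchange> exampleJob(int p) {
        return List.of(
                new Exchange("keyby", ExchangeKind.ALL_TO_ALL, p, p),
                new Exchange("forward", ExchangeKind.POINTWISE, p, p));
    }

    public static void main(String[] args) {
        for (int p : new int[] {1, 4, 16, 128}) {
            System.out.printf("p=%d skipped share=%.4f%n", p, skippedShare(exampleJob(p)));
        }
    }
}
```

With equal parallelism p on both exchanges, the skipped share is p / (p^2 + p) = 1 / (p + 1): 0.5 for p = 1, 0.2 for p = 4, and below 0.01 for p = 128.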

  was:
We currently do not have any hard guarantees on pointwise connection regarding 
data consistency. However, since data was structured implicitly in the same way 
as any preceding source or keyby, some users relied on this behavior to divide 
compute-intensive tasks into smaller chunks while relying on ordering 
guarantees.

As long as the parallelism does not change, unaligned checkpoints (UC) retains 
these properties. With the implementation of rescaling of UC (FLINK-19801), 
that has changed. For most exchanges, there is a meaningful way to reassign 
state from one channel to another (even in random order). For some exchanges, 
the mapping is ambiguous and requires post-filtering. However, for point-wise 
connections, it's impossible while retaining these properties.

Consider, {{source -> keyby -> task1 -> forward -> task2}}. No if we want to 
rescale from parallelism p = 1 to p = 2, suddenly the records inside the keyby 
channels need to be divided into two channels according to the keygroups. That 
is easily possible by using the keygroup ranges of the operators and a way to 
determine the key(group) of the record (independent of the actual approach). 
For the forward channel, we completely lack the key context. No record in the 
forward channel has any keygroup assigned; it's also not possible to calculate 
it as there is no guarantee that the key is still present.

The root cause for this limitation is the conceptual mismatch between what we 
provide and what some users assume we provide (or we assume that the users 
assume). For example, it's impossible to use (keyed) state in task2 right now, 
because there is no key context, but we still want to guarantee orderness in 
respect to that key context.

For 1.13, the easiest solution is to disable channel state in pointwise 
connections. For any non-trivial application with at least one shuffle, the 
number of pointwise channels (linear to p) is quickly dwarfed by all-to-all 
connections (quadratic to p). I'd add some alternative ideas to the discussion.


> Disable checkpointing of inflight data in pointwise connections for unaligned 
> checkpoints
> -----------------------------------------------------------------------------------------
>
>                 Key: FLINK-21936
>                 URL: https://issues.apache.org/jira/browse/FLINK-21936
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.13.0
>            Reporter: Arvid Heise
>            Assignee: Arvid Heise
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
