[
https://issues.apache.org/jira/browse/FLINK-21936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17307637#comment-17307637
]
Arvid Heise commented on FLINK-21936:
-------------------------------------
Note that the initial example was deliberately kept simple. {{source -> keyby
-> task1 -> forward -> task2 -> forward -> task3}} should also retain the
properties; any key group context needs to be preserved until the next
all-to-all connection.
We also need to explore how much we expect from rescale: {{source -> keyby ->
task1 -> forward -> task2 -> rescale (2x) -> task3}} would potentially split
the records of each key across two subtasks, but retain the ordering within
the groups.
{{source -> keyby -> task1 -> forward -> task2 -> rescale (.5x) -> task3}}
merges key group ranges but each key is still processed by the same downstream
subtask of task3 and order is retained.
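A minimal Python sketch of the two rescale cases above. It assumes that rescale connects subtasks block-wise and distributes records round-robin within a block; the helpers {{rescale_targets}} and {{route_round_robin}} are hypothetical names for illustration, not Flink API.

```python
from typing import Dict, List


def rescale_targets(upstream_index: int, upstream_p: int, downstream_p: int) -> List[int]:
    """Downstream subtasks reachable from one upstream subtask (hypothetical helper)."""
    if downstream_p % upstream_p == 0:
        # Scale-up (e.g. 2x): each upstream subtask feeds its own block of subtasks.
        factor = downstream_p // upstream_p
        return list(range(upstream_index * factor, (upstream_index + 1) * factor))
    if upstream_p % downstream_p == 0:
        # Scale-down (e.g. .5x): several upstream subtasks share one downstream subtask.
        return [upstream_index // (upstream_p // downstream_p)]
    raise ValueError("this sketch only covers integer rescale factors")


def route_round_robin(records: List[str], targets: List[int]) -> Dict[int, List[str]]:
    """Distribute records over the targets in turn, keeping arrival order per target."""
    routed: Dict[int, List[str]] = {t: [] for t in targets}
    for i, record in enumerate(records):
        routed[targets[i % len(targets)]].append(record)
    return routed


# Records of a single key as emitted, in order, by upstream subtask 1 of task2.
records = ["k#0", "k#1", "k#2", "k#3"]

# rescale (2x), p = 2 -> 4: the key is spread over two subtasks of task3,
# but each subtask still sees its share in the original relative order.
scaled_up = route_round_robin(records, rescale_targets(1, 2, 4))

# rescale (.5x), p = 4 -> 2: upstream subtasks 2 and 3 both feed subtask 1,
# so every key stays on a single subtask of task3.
scaled_down = [rescale_targets(i, 4, 2) for i in range(4)]
```

Under these assumptions, the 2x case loses the "one key, one subtask" property but keeps per-subtask order, while the .5x case keeps both.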
> Disable checkpointing of inflight data in pointwise connections for unaligned
> checkpoints
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-21936
> URL: https://issues.apache.org/jira/browse/FLINK-21936
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Affects Versions: 1.13.0
> Reporter: Arvid Heise
> Assignee: Arvid Heise
> Priority: Major
>
> We currently do not give any hard guarantees on data consistency for
> pointwise connections. However, since data is implicitly structured in the
> same way as in any preceding source or keyby, some users have relied on this
> behavior to divide compute-intensive tasks into smaller chunks while keeping
> ordering guarantees.
> As long as the parallelism does not change, unaligned checkpoints (UC)
> retain these properties. With the implementation of rescaling of UC
> (FLINK-19801), that has changed. For most exchanges, there is a meaningful
> way to reassign state from one channel to another (even in random order). For
> some exchanges, the mapping is ambiguous and requires post-filtering.
> However, for pointwise connections, such a reassignment is impossible while
> retaining these properties.
> Consider {{source -> keyby -> task1 -> forward -> task2}}. Now if we want to
> rescale from parallelism p = 1 to p = 2, suddenly the records inside the
> keyby channels need to be divided into two channels according to the
> keygroups. That is easily possible by using the keygroup ranges of the
> operators and a way to determine the key(group) of the record (independent of
> the actual approach). For the forward channel, we completely lack the key
> context. No record in the forward channel has any keygroup assigned; it's
> also not possible to calculate it as there is no guarantee that the key is
> still present.
> The root cause for this limitation is the conceptual mismatch between what we
> provide and what some users assume we provide (or we assume that the users
> assume). For example, it's impossible to use (keyed) state in task2 right
> now, because there is no key context, but we still want to guarantee
> ordering with respect to that key context.
> For 1.13, the easiest solution is to disable channel state in pointwise
> connections. For any non-trivial application with at least one shuffle, the
> number of pointwise channels (linear in p) is quickly dwarfed by all-to-all
> connections (quadratic in p). I'd add some alternative ideas to the
> discussion.
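The keygroup-based split of a keyby channel described in the quoted issue can be sketched in Python as follows. This is an illustration under assumptions, not Flink code: CRC32 stands in for the real key hash, the contiguous-range formula in {{operator_index_for}} is an assumed assignment scheme, and all function names and the record layout are hypothetical.

```python
import zlib
from typing import Callable, List, Tuple

Record = Tuple[str, int]  # (key, payload); hypothetical record layout


def key_group_for(key: str, max_parallelism: int) -> int:
    """Map a key to a keygroup; CRC32 is a stand-in for the real hash function."""
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def operator_index_for(key_group: int, max_parallelism: int, parallelism: int) -> int:
    """Subtask owning a keygroup when keygroups are split into contiguous ranges."""
    return key_group * parallelism // max_parallelism


def split_channel(
    records: List[Record],
    key_of: Callable[[Record], str],
    max_parallelism: int,
    new_parallelism: int,
) -> List[List[Record]]:
    """Divide the in-flight records of one channel among the rescaled subtasks."""
    channels: List[List[Record]] = [[] for _ in range(new_parallelism)]
    for record in records:
        key_group = key_group_for(key_of(record), max_parallelism)
        target = operator_index_for(key_group, max_parallelism, new_parallelism)
        channels[target].append(record)  # appending keeps the per-key order
    return channels


# In-flight records of the keyby channel at p = 1, rescaled to p = 2.
keyby_channel = [("a", 1), ("b", 2), ("a", 3), ("c", 4)]
channels = split_channel(keyby_channel, lambda r: r[0], 128, 2)
```

The sketch only works because {{key_of}} can recover the key from each record. For the forward channel no such function is available once task1 may have dropped or transformed the key, which is the limitation the issue describes.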
--
This message was sent by Atlassian Jira
(v8.3.4#803005)