[
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15936305#comment-15936305
]
ASF GitHub Bot commented on FLINK-6034:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3531#discussion_r107410978
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
---
@@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
}
/**
+ * Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
+ * key group index for the given subtask {@link KeyGroupRange}.
+ * <p>
+ * <p>This is publicly visible to be used in tests.
+ */
+ public static List<KeyedStateHandle> getKeyedStateHandles(
+ Collection<? extends KeyedStateHandle> keyedStateHandles,
+ KeyGroupRange subtaskKeyGroupRange) {
+
+ List<KeyedStateHandle> subtaskKeyedStateHandles = new ArrayList<>();
+
+ for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
+ KeyGroupRange intersection = keyedStateHandle.getKeyGroupRange().getIntersection(subtaskKeyGroupRange);
--- End diff --
I wonder if we could somehow introduce a
`KeyedStateHandle::intersect(KeyGroupRange)` that again returns a
`KeyedStateHandle` with a `KeyGroupRange` that is the intersection of the
original range and the argument. Basically, this would be a higher-level
version of what `KeyGroupsStateHandle` can already do, where the concrete
implementations (like `KeyGroupsStateHandle`) know how to virtually split
themselves up into a sub-range. This would also transfer less data in the RPC
(fewer offsets) and save the post-filtering in the backend.
Otherwise, we could have a boolean method that just checks for intersection,
because there is no longer any need to create `KeyGroupRange` objects that we
do not actually use.
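To make the first suggestion concrete, here is a rough sketch of what I have
in mind. It uses simplified stand-in types (`Range`, `Handle`,
`OffsetsHandle`, all hypothetical names) rather than Flink's real
`KeyGroupRange` and `KeyGroupsStateHandle`, and it assumes a full-snapshot
handle stores one offset per key group. Treat it as an illustration of the
idea, not as a proposed implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-ins for illustration; these are not Flink's actual classes. */
final class IntersectSketch {

    /** Inclusive range of key groups; empty when start > end. */
    static final class Range {
        static final Range EMPTY = new Range(0, -1);
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        boolean isEmpty() {
            return start > end;
        }

        int size() {
            return isEmpty() ? 0 : end - start + 1;
        }

        Range intersection(Range other) {
            int s = Math.max(start, other.start);
            int e = Math.min(end, other.end);
            return s <= e ? new Range(s, e) : EMPTY;
        }
    }

    /** Higher-level handle: exposes only its range and the ability to narrow itself. */
    interface Handle {
        Range range();

        /** Returns a handle restricted to the overlap, or null if there is no overlap. */
        Handle intersect(Range subtaskRange);
    }

    /** Assumed full-snapshot handle with one stream offset per key group. */
    static final class OffsetsHandle implements Handle {
        private final Range range;
        private final long[] offsets;

        OffsetsHandle(Range range, long[] offsets) {
            if (offsets.length != range.size()) {
                throw new IllegalArgumentException("need one offset per key group");
            }
            this.range = range;
            this.offsets = offsets.clone();
        }

        public Range range() {
            return range;
        }

        long[] offsets() {
            return offsets.clone();
        }

        public Handle intersect(Range subtaskRange) {
            Range overlap = range.intersection(subtaskRange);
            if (overlap.isEmpty()) {
                return null;
            }
            // Keep only the offsets of the overlapping key groups, so less data is shipped.
            int from = overlap.start - range.start;
            return new OffsetsHandle(overlap, Arrays.copyOfRange(offsets, from, from + overlap.size()));
        }
    }

    /** The assignment loop no longer builds intermediate range objects itself. */
    static List<Handle> handlesForSubtask(List<? extends Handle> handles, Range subtaskRange) {
        List<Handle> result = new ArrayList<>();
        for (Handle handle : handles) {
            Handle narrowed = handle.intersect(subtaskRange);
            if (narrowed != null) {
                result.add(narrowed);
            }
        }
        return result;
    }
}
```

With this shape, the caller only asks each handle to narrow itself, and each
implementation decides how to represent its sub-range.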
> Add KeyedStateHandle for the snapshots in keyed streams
> -------------------------------------------------------
>
> Key: FLINK-6034
> URL: https://issues.apache.org/jira/browse/FLINK-6034
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Xiaogang Shi
> Assignee: Xiaogang Shi
>
> Currently, the only type of snapshot in keyed streams is
> {{KeyGroupsStateHandle}}, which is a full snapshot and stores the states one
> key group after another. With the introduction of incremental checkpointing,
> we need a higher-level abstraction of keyed snapshots to allow flexible
> snapshot formats.
> The implementation of {{KeyedStateHandle}}s may vary a lot between
> backends. The only information needed in a {{KeyedStateHandle}} is its key
> group range. When recovering the job with a different degree of parallelism,
> each {{KeyedStateHandle}} will be assigned to those subtasks whose key group
> ranges overlap with its range.
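
As an illustration of the overlap-based assignment described above, here is a
minimal sketch. It assumes key groups are split evenly into contiguous ranges
per subtask; `RescaleSketch`, `rangeForSubtask`, and `overlaps` are
hypothetical names, ranges are plain inclusive `{start, end}` arrays, and the
split formula is an assumption for the example rather than a statement about
Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of Flink. */
final class RescaleSketch {

    /** Inclusive {start, end} key group range of a subtask, assuming an even contiguous split. */
    static int[] rangeForSubtask(int maxParallelism, int parallelism, int index) {
        int start = (index * maxParallelism + parallelism - 1) / parallelism;
        int end = ((index + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    /** Boolean overlap check; no intermediate range object is created. */
    static boolean overlaps(int[] a, int[] b) {
        return a[0] <= b[1] && b[0] <= a[1];
    }

    /** Indices of the handles whose key group range overlaps the subtask's range. */
    static List<Integer> handlesForSubtask(List<int[]> handleRanges, int[] subtaskRange) {
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < handleRanges.size(); i++) {
            if (overlaps(handleRanges.get(i), subtaskRange)) {
                assigned.add(i);
            }
        }
        return assigned;
    }
}
```

Under these assumptions, with 8 key groups and a rescale from parallelism 2 to
3, the old handles cover [0, 3] and [4, 7]. The new subtask 0 covers [0, 2] and
receives handle 0, subtask 1 covers [3, 5] and receives both handles, and
subtask 2 covers [6, 7] and receives handle 1.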