This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8ba5ca3 [FLINK-21650][state/heap] Skip KGs not belonging to this
backend on restore (instead of failing)
8ba5ca3 is described below
commit 8ba5ca3d7d797da569839879cab164aa7dc5d947
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Sep 3 15:48:32 2020 +0200
[FLINK-21650][state/heap] Skip KGs not belonging to this backend on restore
(instead of failing)
Motivation:
In the new incremental mode, heap backend wraps KeyGroupsStateHandle
into IncrementalRemoteKeyedStateHandle. The latter don't compute
intersection because it is not directly aware of the offsets. So it just
returns a full keyrange if there is SOME intersection.
On recovery, unused keyGroups are filtered out by RocksDB state backend.
With this change, Heap state backend does the same.
---
.../flink/runtime/state/heap/HeapRestoreOperation.java | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
index e26d569..89fe3e8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
@@ -200,12 +200,13 @@ public class HeapRestoreOperation<K> implements
RestoreOperation<Void> {
int keyGroupIndex = groupOffset.f0;
long offset = groupOffset.f1;
- // Check that restored key groups all belong to the backend.
- Preconditions.checkState(
- keyGroupRange.contains(keyGroupIndex),
- "Key group %s doesn't belong to this backend with key
group range: %s",
- keyGroupIndex,
- keyGroupRange);
+ if (!keyGroupRange.contains(keyGroupIndex)) {
+ LOG.debug(
+ "Key group {} doesn't belong to this backend with key
group range: {}",
+ keyGroupIndex,
+ keyGroupRange);
+ continue;
+ }
fsDataInputStream.seek(offset);