This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ba5ca3  [FLINK-21650][state/heap] Skip KGs not belonging to this 
backend on restore (instead of failing)
8ba5ca3 is described below

commit 8ba5ca3d7d797da569839879cab164aa7dc5d947
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Sep 3 15:48:32 2020 +0200

    [FLINK-21650][state/heap] Skip KGs not belonging to this backend on restore 
(instead of failing)
    
    Motivation:
    In the new incremental mode, heap backend wraps KeyGroupsStateHandle
    into IncrementalRemoteKeyedStateHandle. The latter don't compute
    intersection because it is not directly aware of the offsets. So it just
    returns a full keyrange if there is SOME intersection.
    On recovery, unused keyGroups are filtered out by RocksDB state backend.
    With this change, Heap state backend does the same.
---
 .../flink/runtime/state/heap/HeapRestoreOperation.java      | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
index e26d569..89fe3e8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
@@ -200,12 +200,13 @@ public class HeapRestoreOperation<K> implements 
RestoreOperation<Void> {
             int keyGroupIndex = groupOffset.f0;
             long offset = groupOffset.f1;
 
-            // Check that restored key groups all belong to the backend.
-            Preconditions.checkState(
-                    keyGroupRange.contains(keyGroupIndex),
-                    "Key group %s doesn't belong to this backend with key 
group range: %s",
-                    keyGroupIndex,
-                    keyGroupRange);
+            if (!keyGroupRange.contains(keyGroupIndex)) {
+                LOG.debug(
+                        "Key group {} doesn't belong to this backend with key 
group range: {}",
+                        keyGroupIndex,
+                        keyGroupRange);
+                continue;
+            }
 
             fsDataInputStream.seek(offset);
 

Reply via email to