This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 045e68d  [FLINK-22966][runtime] StateAssignmentOperation returns only 
not null state handles
045e68d is described below

commit 045e68dd412b05b123e7d2b278b4eb9f9afe07ec
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Wed Jun 23 17:12:50 2021 +0300

    [FLINK-22966][runtime] StateAssignmentOperation returns only not null state 
handles
---
 .../runtime/checkpoint/StateAssignmentOperation.java  |  4 ++--
 .../checkpoint/StateAssignmentOperationTest.java      | 19 ++++++++++++++++++-
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index a9e667a..e4e9f27d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -539,7 +539,7 @@ public class StateAssignmentOperation {
             }
         }
 
-        return subtaskKeyedStateHandles;
+        return subtaskKeyedStateHandles != null ? subtaskKeyedStateHandles : 
emptyList();
     }
 
     /**
@@ -573,7 +573,7 @@ public class StateAssignmentOperation {
             }
         }
 
-        return extractedKeyedStateHandles;
+        return extractedKeyedStateHandles != null ? extractedKeyedStateHandles 
: emptyList();
     }
 
     /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
index 642e18b..04af603 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.OperatorInstanceID;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -76,9 +77,11 @@ import static 
org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNew
 import static 
org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.ARBITRARY;
 import static 
org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.RANGE;
 import static 
org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.ROUND_ROBIN;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 
 /** Tests to verify state assignment operation. */
 public class StateAssignmentOperationTest extends TestLogger {
@@ -725,6 +728,20 @@ public class StateAssignmentOperationTest extends 
TestLogger {
                 getAssignedState(executionJobVertex, operatorId, 0));
     }
 
+    @Test
+    public void assigningStateHandlesCanNotBeNull() {
+        OperatorState state = new OperatorState(new OperatorID(), 1, MAX_P);
+
+        List<KeyedStateHandle> managedKeyedStateHandles =
+                StateAssignmentOperation.getManagedKeyedStateHandles(state, 
KeyGroupRange.of(0, 1));
+
+        List<KeyedStateHandle> rawKeyedStateHandles =
+                StateAssignmentOperation.getRawKeyedStateHandles(state, 
KeyGroupRange.of(0, 1));
+
+        assertThat(managedKeyedStateHandles, is(empty()));
+        assertThat(rawKeyedStateHandles, is(empty()));
+    }
+
     private List<OperatorID> buildOperatorIds(int numOperators) {
         return IntStream.range(0, numOperators)
                 .mapToObj(j -> new OperatorID())

Reply via email to