This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 045e68d [FLINK-22966][runtime] StateAssignmentOperation returns only not null state handles
045e68d is described below
commit 045e68dd412b05b123e7d2b278b4eb9f9afe07ec
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Wed Jun 23 17:12:50 2021 +0300
[FLINK-22966][runtime] StateAssignmentOperation returns only not null state handles
---
.../runtime/checkpoint/StateAssignmentOperation.java | 4 ++--
.../checkpoint/StateAssignmentOperationTest.java | 19 ++++++++++++++++++-
2 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index a9e667a..e4e9f27d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -539,7 +539,7 @@ public class StateAssignmentOperation {
}
}
- return subtaskKeyedStateHandles;
+ return subtaskKeyedStateHandles != null ? subtaskKeyedStateHandles : emptyList();
}
/**
@@ -573,7 +573,7 @@ public class StateAssignmentOperation {
}
}
- return extractedKeyedStateHandles;
+ return extractedKeyedStateHandles != null ? extractedKeyedStateHandles : emptyList();
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
index 642e18b..04af603 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.OperatorInstanceID;
import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -76,9 +77,11 @@ import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNew
import static org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.ARBITRARY;
import static org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.RANGE;
import static org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper.ROUND_ROBIN;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
/** Tests to verify state assignment operation. */
public class StateAssignmentOperationTest extends TestLogger {
@@ -725,6 +728,20 @@ public class StateAssignmentOperationTest extends TestLogger {
getAssignedState(executionJobVertex, operatorId, 0));
}
+ @Test
+ public void assigningStateHandlesCanNotBeNull() {
+ OperatorState state = new OperatorState(new OperatorID(), 1, MAX_P);
+
+ List<KeyedStateHandle> managedKeyedStateHandles =
+ StateAssignmentOperation.getManagedKeyedStateHandles(state, KeyGroupRange.of(0, 1));
+
+ List<KeyedStateHandle> rawKeyedStateHandles =
+ StateAssignmentOperation.getRawKeyedStateHandles(state, KeyGroupRange.of(0, 1));
+
+ assertThat(managedKeyedStateHandles, is(empty()));
+ assertThat(rawKeyedStateHandles, is(empty()));
+ }
+
private List<OperatorID> buildOperatorIds(int numOperators) {
return IntStream.range(0, numOperators)
.mapToObj(j -> new OperatorID())