Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5518#discussion_r169065861
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
---
@@ -211,24 +211,55 @@ protected CheckpointStreamFactory
createStreamFactory() throws Exception {
@Test
public void testGetKeys() throws Exception {
- final int elementsToTest = 1000;
+ final int namespace1ElementsNum = 1000;
+ final int namespace2ElementsNum = 1000;
String fieldName = "get-keys-test";
AbstractKeyedStateBackend<Integer> backend =
createKeyedBackend(IntSerializer.INSTANCE);
try {
- ValueState<Integer> keyedState =
backend.getOrCreateKeyedState(
- VoidNamespaceSerializer.INSTANCE,
- new ValueStateDescriptor<>(fieldName,
IntSerializer.INSTANCE));
- ((InternalValueState<VoidNamespace, Integer>)
keyedState).setCurrentNamespace(VoidNamespace.INSTANCE);
+ final String ns1 = "ns1";
+ ValueState<Integer> keyedState1 =
backend.getPartitionedState(
+ ns1,
+ StringSerializer.INSTANCE,
+ new ValueStateDescriptor<>(fieldName,
IntSerializer.INSTANCE)
+ );
+
+ ((InternalValueState<String, Integer>)
keyedState1).setCurrentNamespace(ns1);
+
+ for (int key = 0; key < namespace1ElementsNum; key++) {
+ backend.setCurrentKey(key);
+ keyedState1.update(key * 2);
+ }
+
+ ValueState<Integer> keyedState2 =
backend.getPartitionedState(
+ ns1,
--- End diff --
If you give `ns2` here you don't have to call `setCurrentNamespace()` later.
---