Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4722#discussion_r141390556 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java --- @@ -190,6 +193,39 @@ protected CheckpointStreamFactory createStreamFactory() throws Exception { } @Test + public void testGetKeys() throws Exception { + final int elementsToTest = 1000; + String fieldName = "get-keys-while-modifying-test"; + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + try { + ValueState<Integer> keyedState = backend.getOrCreateKeyedState( + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>(fieldName, IntSerializer.INSTANCE)); + ((InternalValueState<VoidNamespace, Integer>) keyedState).setCurrentNamespace(VoidNamespace.INSTANCE); + + for (int key = 0; key < elementsToTest; key++) { + backend.setCurrentKey(key); + keyedState.update(key * 2); + } + + try (Stream<Integer> keysStream = backend.getKeys(fieldName, VoidNamespace.INSTANCE).sorted()) { + PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator(); + + for (int expectedKey = 0; expectedKey < elementsToTest; expectedKey++) { + assertTrue(actualIterator.hasNext()); + assertEquals(expectedKey, actualIterator.nextInt()); + } + + assertFalse(actualIterator.hasNext()); + } + } + finally { + org.apache.commons.io.IOUtils.closeQuietly(backend); --- End diff -- nit: why do we need the fully qualified name here?
---