[
https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16182784#comment-16182784
]
ASF GitHub Bot commented on FLINK-7683:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/4722#discussion_r141390556
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---
@@ -190,6 +193,39 @@ protected CheckpointStreamFactory createStreamFactory() throws Exception {
     }
     @Test
+    public void testGetKeys() throws Exception {
+        final int elementsToTest = 1000;
+        String fieldName = "get-keys-while-modifying-test";
+        AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+        try {
+            ValueState<Integer> keyedState = backend.getOrCreateKeyedState(
+                VoidNamespaceSerializer.INSTANCE,
+                new ValueStateDescriptor<>(fieldName, IntSerializer.INSTANCE));
+            ((InternalValueState<VoidNamespace, Integer>) keyedState).setCurrentNamespace(VoidNamespace.INSTANCE);
+
+            for (int key = 0; key < elementsToTest; key++) {
+                backend.setCurrentKey(key);
+                keyedState.update(key * 2);
+            }
+
+            try (Stream<Integer> keysStream = backend.getKeys(fieldName, VoidNamespace.INSTANCE).sorted()) {
+                PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();
+
+                for (int expectedKey = 0; expectedKey < elementsToTest; expectedKey++) {
+                    assertTrue(actualIterator.hasNext());
+                    assertEquals(expectedKey, actualIterator.nextInt());
+                }
+
+                assertFalse(actualIterator.hasNext());
+            }
+        }
+        finally {
+            org.apache.commons.io.IOUtils.closeQuietly(backend);
--- End diff --
nit: why do we need the fully qualified name here?
> Add method to iterate over all of the existing keys in a statebackend
> ---------------------------------------------------------------------
>
> Key: FLINK-7683
> URL: https://issues.apache.org/jira/browse/FLINK-7683
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
>
> This is required to preserve backward compatibility when changing the state
> definition of a keyed state operator (to do so, the operator must iterate
> over all of the existing keys and rewrite them into a new state variable).
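
For illustration only, the migration pattern described in the issue (iterate over every existing key of the old state and rewrite each value into a new state variable) might look roughly like the sketch below. It uses a toy in-memory map as a stand-in for a keyed state backend. The class KeyedStateMigrationSketch and its put, get, getKeys and migrate methods are hypothetical names made up for this example; they are not Flink's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;

/** Toy in-memory stand-in for a keyed state backend (hypothetical, NOT Flink's API). */
class KeyedStateMigrationSketch {

    // state name -> (key -> value)
    private final Map<String, Map<Integer, Object>> states = new HashMap<>();

    public void put(String stateName, int key, Object value) {
        states.computeIfAbsent(stateName, name -> new HashMap<>()).put(key, value);
    }

    public Object get(String stateName, int key) {
        Map<Integer, Object> state = states.get(stateName);
        return state == null ? null : state.get(key);
    }

    /** Returns a stream over a snapshot of the keys, so state may be modified while iterating. */
    public Stream<Integer> getKeys(String stateName) {
        Map<Integer, Object> state = states.getOrDefault(stateName, Collections.emptyMap());
        return new ArrayList<>(state.keySet()).stream();
    }

    /** Rewrites every value of the old state into the new state, then drops the old state. */
    public void migrate(String oldName, String newName, Function<Object, Object> convert) {
        getKeys(oldName).forEach(key -> put(newName, key, convert.apply(get(oldName, key))));
        states.remove(oldName);
    }
}
```

The key snapshot in getKeys is a design choice of this sketch: it lets the caller write to the store while walking the keys without a ConcurrentModificationException. How a real backend handles modification during iteration is a separate question that this sketch does not answer.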