[ 
https://issues.apache.org/jira/browse/FLINK-7683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16182784#comment-16182784
 ] 

ASF GitHub Bot commented on FLINK-7683:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4722#discussion_r141390556
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ---
    @@ -190,6 +193,39 @@ protected CheckpointStreamFactory 
createStreamFactory() throws Exception {
        }
     
        @Test
    +   public void testGetKeys() throws Exception {
    +           final int elementsToTest = 1000;
    +           String fieldName = "get-keys-while-modifying-test";
    +           AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
    +           try {
    +                   ValueState<Integer> keyedState = 
backend.getOrCreateKeyedState(
    +                           VoidNamespaceSerializer.INSTANCE,
    +                           new ValueStateDescriptor<>(fieldName, 
IntSerializer.INSTANCE));
    +                   ((InternalValueState<VoidNamespace, Integer>) 
keyedState).setCurrentNamespace(VoidNamespace.INSTANCE);
    +
    +                   for (int key = 0; key < elementsToTest; key++) {
    +                           backend.setCurrentKey(key);
    +                           keyedState.update(key * 2);
    +                   }
    +
    +                   try (Stream<Integer> keysStream = 
backend.getKeys(fieldName, VoidNamespace.INSTANCE).sorted()) {
    +                           PrimitiveIterator.OfInt actualIterator = 
keysStream.mapToInt(value -> value.intValue()).iterator();
    +
    +                           for (int expectedKey = 0; expectedKey < 
elementsToTest; expectedKey++) {
    +                                   assertTrue(actualIterator.hasNext());
    +                                   assertEquals(expectedKey, 
actualIterator.nextInt());
    +                           }
    +
    +                           assertFalse(actualIterator.hasNext());
    +                   }
    +           }
    +           finally {
    +                   org.apache.commons.io.IOUtils.closeQuietly(backend);
    --- End diff --
    
    nit: why do we need the fully qualified name her?


> Add method to iterate over all of the existing keys in a statebackend
> ---------------------------------------------------------------------
>
>                 Key: FLINK-7683
>                 URL: https://issues.apache.org/jira/browse/FLINK-7683
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>
> This is required to make possible preserving backward compatibility while 
> changing state definition of a keyed state operator (to do so operator must 
> iterate over all of the existing keys and rewrites them into a new state 
> variable).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to