[ 
https://issues.apache.org/jira/browse/FLINK-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16540080#comment-16540080
 ] 

ASF GitHub Bot commented on FLINK-9804:
---------------------------------------

GitHub user sihuazhou opened a pull request:

    https://github.com/apache/flink/pull/6306

    [FLINK-9804][state] KeyedStateBackend.getKeys() does not work on RocksDB 
MapState

    ## What is the purpose of the change
    
    *This PR fixes the bug that the KeyedStateBackend.getKeys() does not work 
on RocksDB MapState.*
    
    
    ## Brief change log
    
      - *Change `RocksDBKeyedStateBackend#RocksIteratorForKeysWrapper()` to let 
it support get keys for RocksDB MapState.*
    
    
    ## Verifying this change
    
      - *Added `StateBackendTestBase#testMapStateGetKeys()` to guard the 
changes*
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
    - No

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sihuazhou/flink FLINK-9804

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6306.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6306
    
----
commit efc4096a4a6f38b3acb0b5189804f7b452218f23
Author: sihuazhou <summerleafs@...>
Date:   2018-07-11T12:35:22Z

    fix KeyedStateBackend.getKeys() for RocksDBMapState.

----


> KeyedStateBackend.getKeys() does not work on RocksDB MapState
> -------------------------------------------------------------
>
>                 Key: FLINK-9804
>                 URL: https://issues.apache.org/jira/browse/FLINK-9804
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.5.1
>            Reporter: Aljoscha Krettek
>            Assignee: Sihua Zhou
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.5.2, 1.6.0
>
>
> This can be reproduced by adding this test to {{StateBackendTestBase}}:
> {code}
> @Test
> public void testMapStateGetKeys() throws Exception {
>       final int namespace1ElementsNum = 1000;
>       final int namespace2ElementsNum = 1000;
>       String fieldName = "get-keys-test";
>       AbstractKeyedStateBackend<Integer> backend = 
> createKeyedBackend(IntSerializer.INSTANCE);
>       try {
>               final String ns1 = "ns1";
>               MapState<String, Integer> keyedState1 = 
> backend.getPartitionedState(
>                       ns1,
>                       StringSerializer.INSTANCE,
>                       new MapStateDescriptor<>(fieldName, 
> StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>               );
>               for (int key = 0; key < namespace1ElementsNum; key++) {
>                       backend.setCurrentKey(key);
>                       keyedState1.put("he", key * 2);
>                       keyedState1.put("ho", key * 2);
>               }
>               final String ns2 = "ns2";
>               MapState<String, Integer> keyedState2 = 
> backend.getPartitionedState(
>                       ns2,
>                       StringSerializer.INSTANCE,
>                       new MapStateDescriptor<>(fieldName, 
> StringSerializer.INSTANCE, IntSerializer.INSTANCE)
>               );
>               for (int key = namespace1ElementsNum; key < 
> namespace1ElementsNum + namespace2ElementsNum; key++) {
>                       backend.setCurrentKey(key);
>                       keyedState2.put("he", key * 2);
>                       keyedState2.put("ho", key * 2);
>               }
>               // valid for namespace1
>               try (Stream<Integer> keysStream = backend.getKeys(fieldName, 
> ns1).sorted()) {
>                       PrimitiveIterator.OfInt actualIterator = 
> keysStream.mapToInt(value -> value.intValue()).iterator();
>                       for (int expectedKey = 0; expectedKey < 
> namespace1ElementsNum; expectedKey++) {
>                               assertTrue(actualIterator.hasNext());
>                               assertEquals(expectedKey, 
> actualIterator.nextInt());
>                       }
>                       assertFalse(actualIterator.hasNext());
>               }
>               // valid for namespace2
>               try (Stream<Integer> keysStream = backend.getKeys(fieldName, 
> ns2).sorted()) {
>                       PrimitiveIterator.OfInt actualIterator = 
> keysStream.mapToInt(value -> value.intValue()).iterator();
>                       for (int expectedKey = namespace1ElementsNum; 
> expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
>                               assertTrue(actualIterator.hasNext());
>                               assertEquals(expectedKey, 
> actualIterator.nextInt());
>                       }
>                       assertFalse(actualIterator.hasNext());
>               }
>       }
>       finally {
>               IOUtils.closeQuietly(backend);
>               backend.dispose();
>       }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to