[ https://issues.apache.org/jira/browse/FLINK-9060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411172#comment-16411172 ]
Kostas Kloudas commented on FLINK-9060:
---------------------------------------
I would say that this code reveals a real bug, but it points at the wrong place.
The {{getKeys()}} method is not supposed to be used for modifying the state, and
its javadoc says so explicitly ("Modifications to the state during iterating
over it keys are not supported."). I would recommend not changing this, because
the performance benefit of retrieving the elements lazily, without worrying
about memory quotas, may outweigh the shortcomings elsewhere. I assume that this
is the reason why [~pnowojski] introduced it.
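To make the trade-off concrete, here is a minimal sketch that uses a plain {{java.util.HashMap}} as a hypothetical stand-in for heap-backed keyed state. It is not Flink code, and {{LazyKeysDemo}} and its methods are illustrative names. The lazily streamed keys are a live view over the map, so removing entries while consuming the stream trips the JDK's fail-fast check and throws {{ConcurrentModificationException}}. This is the same kind of concurrent modification that the test quoted below exercises.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

// Hypothetical stand-in for heap-backed keyed state (key -> value).
// NOT Flink code: it only mimics why lazily streamed keys break
// when the underlying map is modified during iteration.
final class LazyKeysDemo {

    // Lazy view: nothing is copied; keys are read from the live map while iterating.
    static Stream<Integer> getKeys(Map<Integer, String> state) {
        return state.keySet().stream();
    }

    // Returns true if clearing state while consuming the lazy stream fails.
    static boolean clearWhileStreamingFails(Map<Integer, String> state) {
        try {
            // Each removal bumps the map's modification count,
            // which the fail-fast spliterator detects.
            getKeys(state).forEach(key -> state.remove(key));
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> state = new HashMap<>();
        state.put(1, "Hello");
        state.put(2, "Ciao");
        System.out.println(clearWhileStreamingFails(state));
    }
}
```

The lazy view costs no extra memory, which is the benefit described above. The price is that callers must not modify the state while they consume the stream.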
That said, this issue reveals that the {{applyToAllKeys()}} implementation is
problematic. That is where the fix should be applied, not in the {{getKeys()}}
implementation. For {{RocksDB}}, the current implementation may be fine as it
is, but the implementation for the {{HeapStateBackend}} must change.
As for the implementation itself, materializing a full list of all the keys
carries the risk that the whole key set may not fit in memory, but we may be
able to assume that it does. If we go with this assumption, then a plain-old
for-loop would be more efficient than streams. Again, this modification should
apply only to the {{HeapStateBackend}}.
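A minimal sketch of that approach follows. It again uses a plain {{HashMap}} as a hypothetical stand-in for the heap backend's state. The class name and the {{BiConsumer}}-based signature are illustrative and are not Flink's actual {{applyToAllKeys()}} API. The sketch copies the keys into a list first and then walks that list with a plain for-loop, so the function can clear state for the key it is visiting. It assumes that the full key set fits in memory.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical heap-backed keyed state (key -> value). NOT Flink's API.
final class EagerApplyToAllKeys {

    // Copies all keys up front, then iterates over the copy with a plain for-loop.
    // The function may therefore modify (e.g. remove) entries of the live map.
    // Assumption: the full key set fits in memory.
    static <K, V> void applyToAllKeys(Map<K, V> state, BiConsumer<K, Map<K, V>> function) {
        List<K> keys = new ArrayList<>(state.keySet());
        for (K key : keys) {
            function.accept(key, state);
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> state = new HashMap<>();
        state.put(1, "Hello");
        state.put(2, "Ciao");
        // Clearing the state of every key no longer fails, because we iterate the copy.
        applyToAllKeys(state, (key, s) -> s.remove(key));
        System.out.println(state.isEmpty());
    }
}
```

The extra cost is one list entry per key. That is the memory trade-off being weighed here against lazy retrieval, and it is why the change would be limited to the heap backend.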
Any comments on the above [~aljoscha], [~sihuazhou] and [~pnowojski]?
> Deleting state using KeyedStateBackend.getKeys() throws Exception
> -----------------------------------------------------------------
>
> Key: FLINK-9060
> URL: https://issues.apache.org/jira/browse/FLINK-9060
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Reporter: Aljoscha Krettek
> Assignee: Sihua Zhou
> Priority: Blocker
> Fix For: 1.5.0
>
>
> Adding this test to {{StateBackendTestBase}} showcases the problem:
> {code}
> @Test
> public void testConcurrentModificationWithGetKeys() throws Exception {
>     AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
>     try {
>         ListStateDescriptor<String> listStateDescriptor =
>             new ListStateDescriptor<>("foo", StringSerializer.INSTANCE);
>         backend.setCurrentKey(1);
>         backend
>             .getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor)
>             .add("Hello");
>         backend.setCurrentKey(2);
>         backend
>             .getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor)
>             .add("Ciao");
>         Stream<Integer> keys = backend.getKeys(listStateDescriptor.getName(), VoidNamespace.INSTANCE);
>         keys.forEach((key) -> {
>             backend.setCurrentKey(key);
>             try {
>                 backend
>                     .getPartitionedState(
>                         VoidNamespace.INSTANCE,
>                         VoidNamespaceSerializer.INSTANCE,
>                         listStateDescriptor)
>                     .clear();
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         });
>     }
>     finally {
>         IOUtils.closeQuietly(backend);
>         backend.dispose();
>     }
> }
> {code}
> This should work, because one of the use cases of {{getKeys()}} and
> {{applyToAllKeys()}} is to perform an action for every key, and that
> includes deleting its state.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)