[
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210596#comment-16210596
]
Stefan Richter edited comment on FLINK-5372 at 10/19/17 5:23 AM:
-----------------------------------------------------------------
Those are two separate problems, and I don't think the problem from the comment
is related to FLINK-7757.
The first problem is that the test uses a checkpoint stream factory that
produces streams that block on latches, and then wants to call {{close()}}
while the stream is supposed to be blocked in an async thread that executes and
writes the RocksDB snapshot. The test fails because the first stream is
actually used to take a snapshot of the timer service into raw keyed state. The
timer service is not snapshotted asynchronously, so the test blocks and cannot
reach {{close()}}. I will fix the test by using a different factory that
returns a blocking stream only on the second call, i.e., for the request that
actually comes from the keyed backend.
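A minimal sketch of that factory idea in plain Java. It assumes a simplified
factory that just hands out {{OutputStream}}s (Flink's real
{{CheckpointStreamFactory}} interface differs), and {{NthCallBlockingStreamFactory}},
{{writerEntered}} and {{release}} are hypothetical names. Only the N-th stream
blocks, so the synchronous timer-service snapshot gets a normal stream and the
keyed-backend snapshot gets the blocking one.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical, simplified stand-in for a checkpoint stream factory:
 * only the N-th stream it creates blocks on a latch; all others write normally.
 */
class NthCallBlockingStreamFactory {
    private final int blockingCall; // 1-based index of the stream that should block
    private final AtomicInteger calls = new AtomicInteger();

    // Counted down when the blocking stream receives its first write.
    final CountDownLatch writerEntered = new CountDownLatch(1);
    // The test counts this down to let the blocked writer continue.
    final CountDownLatch release = new CountDownLatch(1);

    NthCallBlockingStreamFactory(int blockingCall) {
        this.blockingCall = blockingCall;
    }

    OutputStream createStream() {
        if (calls.incrementAndGet() != blockingCall) {
            // e.g. the synchronous timer-service snapshot: must never block
            return new ByteArrayOutputStream();
        }
        // e.g. the async keyed-backend snapshot: block until released
        return new OutputStream() {
            @Override
            public void write(int b) throws IOException {
                writerEntered.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("interrupted while blocked", e);
                }
            }
        };
    }
}
```

With {{blockingCall = 2}}, a test can start the snapshot, wait on
{{writerEntered}}, call {{close()}}, and only then count down {{release}}.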
The second problem was a bit nasty to reproduce but simple in cause and
solution. The test {{testCleanupOfSnapshotsInFailureCase}} created an instance
of {{RocksDBKeyedStateBackend}} but never called its {{dispose}} method to
release native resources. Since the latest RocksDB update, RocksDB's code has
assertions in {{finalize()}} that detect leaked native resources. These only
run when the resource is garbage-collected, which explains why the test crash
could only be observed when it was executed together with more tests,
eventually triggering a GC.
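A minimal sketch of the cleanup pattern, using a hypothetical
{{FakeNativeBackend}} in place of {{RocksDBKeyedStateBackend}} that only counts
open "native" handles. {{dispose()}} goes into a {{finally}} block, so the
handle is released even when the snapshot fails, instead of being left for the
GC and a {{finalize()}} check.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a backend that holds native (off-heap) handles. */
class FakeNativeBackend {
    static final AtomicInteger openHandles = new AtomicInteger();
    private boolean disposed;

    FakeNativeBackend() {
        openHandles.incrementAndGet(); // "allocate" a native handle
    }

    void snapshot(boolean fail) {
        if (fail) {
            throw new IllegalStateException("simulated snapshot failure");
        }
    }

    void dispose() {
        if (!disposed) { // idempotent, so a second dispose() is harmless
            disposed = true;
            openHandles.decrementAndGet(); // "release" the native handle
        }
    }
}

class CleanupPattern {
    /** Runs a (possibly failing) snapshot and always disposes the backend. */
    static void runSnapshot(boolean fail) {
        FakeNativeBackend backend = new FakeNativeBackend();
        try {
            backend.snapshot(fail);
        } finally {
            // Without this, the handle would leak until GC, where a
            // finalize()-based leak check could fire in an unrelated test.
            backend.dispose();
        }
    }
}
```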
> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --------------------------------------------------------------
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Reporter: Aljoscha Krettek
> Assignee: Stefan Richter
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.4.0
>
>
> The test is currently {{@Ignored}}. We have to change
> {{AsyncCheckpointOperator}} to make sure that we can run fully
> asynchronously. Then, the test will still fail because the canceling
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
>         extends AbstractStreamOperator<String>
>         implements OneInputStreamOperator<String, String> {
>
>     @Override
>     public void open() throws Exception {
>         super.open();
>         // also get the state in open, this way we are sure that it was created
>         // before we trigger the test checkpoint
>         ValueState<String> state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>     }
>
>     @Override
>     public void processElement(StreamRecord<String> element) throws Exception {
>         // we also don't care
>         ValueState<String> state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>         state.update(element.getValue());
>     }
>
>     @Override
>     public void snapshotState(StateSnapshotContext context) throws Exception {
>         // do nothing so that we don't block
>     }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)