[ https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415234#comment-16415234 ]
Chesnay Schepler edited comment on FLINK-5372 at 3/27/18 8:38 AM:
------------------------------------------------------------------

The instability seems to be caused by a plain race condition in the test:

{code}
// L 327
// at this point the task is stuck waiting on the blocker latch
task.cancel();

// let the task continue
blockerCheckpointStreamFactory.getBlockerLatch().trigger();

testHarness.endInput();

// we expect the task to be shut down, but there's no guarantee it managed to do so in time
Assert.assertTrue(blockerCheckpointStreamFactory.getLastCreatedStream().isClosed());
{code}

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --------------------------------------------------------------
>
>                 Key: FLINK-5372
>                 URL: https://issues.apache.org/jira/browse/FLINK-5372
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.5.0
>            Reporter: Aljoscha Krettek
>            Assignee: Stefan Richter
>            Priority: Blocker
>              Labels: test-stability
>             Fix For: 1.5.0, 1.4.3
>
>
> The test is currently {{@Ignored}}. We have to change
> {{AsyncCheckpointOperator}} to make sure that it can run fully
> asynchronously. Even then, the test will still fail because the canceling
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
>     extends AbstractStreamOperator<String>
>     implements OneInputStreamOperator<String, String> {
>
>   @Override
>   public void open() throws Exception {
>     super.open();
>
>     // also get the state in open(); this way we are sure that it was created
>     // before we trigger the test checkpoint
>     ValueState<String> state = getPartitionedState(
>         VoidNamespace.INSTANCE,
>         VoidNamespaceSerializer.INSTANCE,
>         new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>   }
>
>   @Override
>   public void processElement(StreamRecord<String> element) throws Exception {
>     // the stored value itself doesn't matter for this test
>     ValueState<String> state = getPartitionedState(
>         VoidNamespace.INSTANCE,
>         VoidNamespaceSerializer.INSTANCE,
>         new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>     state.update(element.getValue());
>   }
>
>   @Override
>   public void snapshotState(StateSnapshotContext context) throws Exception {
>     // do nothing so that we don't block
>   }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
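The race described in the comment (asserting {{isClosed()}} right after {{trigger()}} and {{endInput()}}, without waiting for the task to finish its cleanup) can be illustrated with a minimal, Flink-free sketch. It assumes the test is able to block until the task thread has terminated before asserting; {{FakeStream}}, {{cancelAndAwait}} and the thread setup are hypothetical stand-ins for the task, the blocker latch and {{getLastCreatedStream()}}, not Flink APIs, and this is not necessarily how the actual fix was done.

{code}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the test: a task thread blocks on a latch while "writing" a
// checkpoint stream; the test cancels it, releases the latch and checks the stream.
public class CancelRaceSketch {

    // Hypothetical stand-in for the stream returned by getLastCreatedStream().
    static final class FakeStream {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        void close() { closed.set(true); }
        boolean isClosed() { return closed.get(); }
    }

    // Runs the cancel sequence, then waits for the task thread to terminate before
    // checking the stream. Waiting for termination is what removes the race.
    static boolean cancelAndAwait() {
        FakeStream stream = new FakeStream();
        CountDownLatch blockerLatch = new CountDownLatch(1);
        CountDownLatch taskBlocked = new CountDownLatch(1);
        AtomicBoolean cancelled = new AtomicBoolean(false);

        Thread taskThread = new Thread(() -> {
            try {
                taskBlocked.countDown();   // signal: the task is now stuck on the latch
                blockerLatch.await();      // simulates the blocked checkpoint write
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                if (cancelled.get()) {
                    stream.close();        // cleanup runs only after the latch is released
                }
            }
        });
        taskThread.start();

        try {
            taskBlocked.await();           // make sure the task is actually blocked
            cancelled.set(true);           // corresponds to task.cancel()
            blockerLatch.countDown();      // corresponds to getBlockerLatch().trigger()

            // Without this join, the check below races with the task's cleanup code.
            taskThread.join(10_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !taskThread.isAlive() && stream.isClosed();
    }

    public static void main(String[] args) {
        System.out.println("stream closed after cancel: " + cancelAndAwait());
    }
}
{code}

Joining the thread (or waiting on any equivalent termination signal) makes the "stream is closed" expectation deterministic; adding a sleep before the assertion would only make the race less likely.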