[FLINK-6471] [checkpoint] Fix RocksDBStateBackendTest#testCancelRunningSnapshot failing sporadically
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8ffacb1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8ffacb1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8ffacb1 Branch: refs/heads/master Commit: b8ffacb1b88690090120cdb2341c68b53dc167ba Parents: 63c04a5 Author: Stefan Richter <[email protected]> Authored: Sun May 7 15:09:05 2017 +0200 Committer: Stefan Richter <[email protected]> Committed: Sun May 7 22:00:43 2017 +0200 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 6 +- .../state/RocksDBStateBackendTest.java | 4 +- .../memory/MemCheckpointStreamFactory.java | 22 ++-- .../BlockerCheckpointStreamFactory.java | 112 ------------------- .../runtime/state/OperatorStateBackendTest.java | 12 +- .../util/BlockerCheckpointStreamFactory.java | 48 +++++--- 6 files changed, 60 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 079ea13..3cb21ac 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -799,7 +799,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { try { outputStream = checkpointStreamFactory .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); - stateBackend.cancelStreamRegistry.registerClosable(outputStream); + closeableRegistry.registerClosable(outputStream); KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfoSnapshots); @@ -807,14 +807,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { serializationProxy.write(out); - stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + closeableRegistry.unregisterClosable(outputStream); StreamStateHandle result = outputStream.closeAndGetHandle(); outputStream = null; return result; } finally { if (outputStream != null) { - stateBackend.cancelStreamRegistry.unregisterClosable(outputStream); + closeableRegistry.unregisterClosable(outputStream); outputStream.close(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index 99b71c5..9340455 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -26,7 +26,7 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.runtime.checkpoint.BlockerCheckpointStreamFactory; +import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; @@ -308,8 +308,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa waiter.await(); // wait for snapshot to run waiter.reset(); runStateUpdates(); - blocker.trigger(); // allow checkpointing to start writing snapshot.cancel(true); + blocker.trigger(); // allow checkpointing to start writing assertTrue(testStreamFactory.getLastCreatedStream().isClosed()); waiter.await(); // wait for snapshot stream writing to run try { http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java index 9b2b46f..3920ce8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.state.StreamStateHandle; import java.io.IOException; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; /** * {@link CheckpointStreamFactory} that produces streams that write to in-memory byte arrays. @@ -78,12 +79,13 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory { private final int maxSize; - private boolean closed; + private AtomicBoolean closed; boolean isEmpty = true; public MemoryCheckpointOutputStream(int maxSize) { this.maxSize = maxSize; + this.closed = new AtomicBoolean(false); } @Override @@ -110,8 +112,9 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory { @Override public void close() { - closed = true; - os.reset(); + if (closed.compareAndSet(false, true)) { + closeInternal(); + } } @Override @@ -128,7 +131,7 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory { } public boolean isClosed() { - return closed; + return closed.get(); } /** @@ -137,15 +140,18 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory { * @throws IOException Thrown if the size of the data exceeds the maximal */ public byte[] closeAndGetBytes() throws IOException { - if (!closed) { + if (closed.compareAndSet(false, true)) { checkSize(os.size(), maxSize); byte[] bytes = os.toByteArray(); - close(); + closeInternal(); return bytes; - } - else { + } else { throw new IOException("stream has already been closed"); } } + + private void closeInternal() { + os.reset(); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java deleted file mode 100644 index 6f892e2..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint; - -import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; - -import java.io.IOException; - -/** - * A {@link CheckpointStreamFactory} for tests that creates streams that block on a latch to test concurrency in - * checkpointing. - */ -public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory { - - private final int maxSize; - private int afterNumberInvocations; - private OneShotLatch blocker; - private OneShotLatch waiter; - - MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream; - - public MemCheckpointStreamFactory.MemoryCheckpointOutputStream getLastCreatedStream() { - return lastCreatedStream; - } - - public BlockerCheckpointStreamFactory(int maxSize) { - this.maxSize = maxSize; - } - - public void setAfterNumberInvocations(int afterNumberInvocations) { - this.afterNumberInvocations = afterNumberInvocations; - } - - public void setBlockerLatch(OneShotLatch latch) { - this.blocker = latch; - } - - public void setWaiterLatch(OneShotLatch latch) { - this.waiter = latch; - } - - @Override - public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { - waiter.trigger(); - this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) { - - private int afterNInvocations = afterNumberInvocations; - private final OneShotLatch streamBlocker = blocker; - private final OneShotLatch streamWaiter = waiter; - - @Override - public void write(int b) throws IOException { - - if (afterNInvocations > 0) { - --afterNInvocations; - } - - if (0 == afterNInvocations && null != streamBlocker) { - try { - streamBlocker.await(); - } catch (InterruptedException ignored) { - } - } - try { - super.write(b); - } catch (IOException ex) { - if (null != streamWaiter) { - streamWaiter.trigger(); - } - throw ex; - } - - if (0 == afterNInvocations && null != streamWaiter) { - streamWaiter.trigger(); - } - } - - @Override - public void close() { - super.close(); - if (null != streamWaiter) { - streamWaiter.trigger(); - } - } - }; - - return lastCreatedStream; - } - - @Override - public void close() throws Exception { - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index 50ca159..85b9eaf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -21,21 +21,21 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.runtime.checkpoint.BlockerCheckpointStreamFactory; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.flink.util.FutureUtil; import org.junit.Assert; import org.junit.Test; +import java.io.File; import java.io.IOException; import java.io.Serializable; -import java.io.File; import java.util.Collections; import java.util.Iterator; import java.util.concurrent.CancellationException; @@ -477,18 +477,20 @@ public class OperatorStateBackendTest { executorService.submit(runnableFuture); - // wait until the async checkpoint is in the write code, then continue + // wait until the async checkpoint is in the stream's write code, then continue waiterLatch.await(); + // cancel the future, which should close the underlying stream runnableFuture.cancel(true); + Assert.assertTrue(streamFactory.getLastCreatedStream().isClosed()); + // we allow the stream under test to proceed blockerLatch.trigger(); try { runnableFuture.get(60, TimeUnit.SECONDS); Assert.fail(); } catch (CancellationException ignore) { - } } http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java index 1e31490..98e654f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java @@ -71,31 +71,28 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory { @Override public void write(int b) throws IOException { - if (null != waiter) { - waiter.trigger(); - } + unblockWaiter(); if (afterNInvocations > 0) { --afterNInvocations; + } else { + awaitBlocker(); } - if (0 == afterNInvocations && null != streamBlocker) { - try { - streamBlocker.await(); - } catch (InterruptedException ignored) { - } - } try { super.write(b); } catch (IOException ex) { - if (null != streamWaiter) { - streamWaiter.trigger(); - } + unblockWaiter(); throw ex; } - if (0 == afterNInvocations && null != streamWaiter) { - streamWaiter.trigger(); + if (0 == afterNInvocations) { + unblockWaiter(); + } + + // We also check for close here, in case the underlying stream does not do this + if (isClosed()) { + throw new IOException("Stream closed."); } } @@ -110,10 +107,33 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory { @Override public void close() { super.close(); + // trigger all the latches, essentially all blocking ops on the stream should resume after close. + unblockAll(); + } + + private void unblockWaiter() { if (null != streamWaiter) { streamWaiter.trigger(); } } + + private void awaitBlocker() { + if (null != streamBlocker) { + try { + streamBlocker.await(); + } catch (InterruptedException ignored) { + } + } + } + + private void unblockAll() { + if (null != streamWaiter) { + streamWaiter.trigger(); + } + if (null != streamBlocker) { + streamBlocker.trigger(); + } + } }; return lastCreatedStream;
