[FLINK-6471] [checkpoint] Fix RocksDBStateBackendTest#testCancelRunningSnapshot failing sporadically


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8ffacb1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8ffacb1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8ffacb1

Branch: refs/heads/master
Commit: b8ffacb1b88690090120cdb2341c68b53dc167ba
Parents: 63c04a5
Author: Stefan Richter <[email protected]>
Authored: Sun May 7 15:09:05 2017 +0200
Committer: Stefan Richter <[email protected]>
Committed: Sun May 7 22:00:43 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |   6 +-
 .../state/RocksDBStateBackendTest.java          |   4 +-
 .../memory/MemCheckpointStreamFactory.java      |  22 ++--
 .../BlockerCheckpointStreamFactory.java         | 112 -------------------
 .../runtime/state/OperatorStateBackendTest.java |  12 +-
 .../util/BlockerCheckpointStreamFactory.java    |  48 +++++---
 6 files changed, 60 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 079ea13..3cb21ac 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -799,7 +799,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                        try {
                                outputStream = checkpointStreamFactory
                                        
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
-                               
stateBackend.cancelStreamRegistry.registerClosable(outputStream);
+                               
closeableRegistry.registerClosable(outputStream);
 
                                KeyedBackendSerializationProxy 
serializationProxy =
                                        new 
KeyedBackendSerializationProxy(stateBackend.keySerializer, 
stateMetaInfoSnapshots);
@@ -807,14 +807,14 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                serializationProxy.write(out);
 
-                               
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+                               
closeableRegistry.unregisterClosable(outputStream);
                                StreamStateHandle result = 
outputStream.closeAndGetHandle();
                                outputStream = null;
 
                                return result;
                        } finally {
                                if (outputStream != null) {
-                                       
stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
+                                       
closeableRegistry.unregisterClosable(outputStream);
                                        outputStream.close();
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 99b71c5..9340455 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.checkpoint.BlockerCheckpointStreamFactory;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -308,8 +308,8 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                waiter.await(); // wait for snapshot to run
                waiter.reset();
                runStateUpdates();
-               blocker.trigger(); // allow checkpointing to start writing
                snapshot.cancel(true);
+               blocker.trigger(); // allow checkpointing to start writing
                assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
                waiter.await(); // wait for snapshot stream writing to run
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
index 9b2b46f..3920ce8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 
 import java.io.IOException;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * {@link CheckpointStreamFactory} that produces streams that write to 
in-memory byte arrays.
@@ -78,12 +79,13 @@ public class MemCheckpointStreamFactory implements 
CheckpointStreamFactory {
 
                private final int maxSize;
 
-               private boolean closed;
+               private AtomicBoolean closed;
 
                boolean isEmpty = true;
 
                public MemoryCheckpointOutputStream(int maxSize) {
                        this.maxSize = maxSize;
+                       this.closed = new AtomicBoolean(false);
                }
 
                @Override
@@ -110,8 +112,9 @@ public class MemCheckpointStreamFactory implements 
CheckpointStreamFactory {
 
                @Override
                public void close() {
-                       closed = true;
-                       os.reset();
+                       if (closed.compareAndSet(false, true)) {
+                               closeInternal();
+                       }
                }
 
                @Override
@@ -128,7 +131,7 @@ public class MemCheckpointStreamFactory implements 
CheckpointStreamFactory {
                }
 
                public boolean isClosed() {
-                       return closed;
+                       return closed.get();
                }
 
                /**
@@ -137,15 +140,18 @@ public class MemCheckpointStreamFactory implements 
CheckpointStreamFactory {
                 * @throws IOException Thrown if the size of the data exceeds 
the maximal
                 */
                public byte[] closeAndGetBytes() throws IOException {
-                       if (!closed) {
+                       if (closed.compareAndSet(false, true)) {
                                checkSize(os.size(), maxSize);
                                byte[] bytes = os.toByteArray();
-                               close();
+                               closeInternal();
                                return bytes;
-                       }
-                       else {
+                       } else {
                                throw new IOException("stream has already been 
closed");
                        }
                }
+
+               private void closeInternal() {
+                       os.reset();
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java
deleted file mode 100644
index 6f892e2..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/BlockerCheckpointStreamFactory.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
-
-import java.io.IOException;
-
-/**
- * A {@link CheckpointStreamFactory} for tests that creates streams that block 
on a latch to test concurrency in
- * checkpointing.
- */
-public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory 
{
-
-       private final int maxSize;
-       private int afterNumberInvocations;
-       private OneShotLatch blocker;
-       private OneShotLatch waiter;
-
-       MemCheckpointStreamFactory.MemoryCheckpointOutputStream 
lastCreatedStream;
-
-       public MemCheckpointStreamFactory.MemoryCheckpointOutputStream 
getLastCreatedStream() {
-               return lastCreatedStream;
-       }
-
-       public BlockerCheckpointStreamFactory(int maxSize) {
-               this.maxSize = maxSize;
-       }
-
-       public void setAfterNumberInvocations(int afterNumberInvocations) {
-               this.afterNumberInvocations = afterNumberInvocations;
-       }
-
-       public void setBlockerLatch(OneShotLatch latch) {
-               this.blocker = latch;
-       }
-
-       public void setWaiterLatch(OneShotLatch latch) {
-               this.waiter = latch;
-       }
-
-       @Override
-       public MemCheckpointStreamFactory.MemoryCheckpointOutputStream 
createCheckpointStateOutputStream(long checkpointID, long timestamp) throws 
Exception {
-               waiter.trigger();
-               this.lastCreatedStream = new 
MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
-
-                       private int afterNInvocations = afterNumberInvocations;
-                       private final OneShotLatch streamBlocker = blocker;
-                       private final OneShotLatch streamWaiter = waiter;
-
-                       @Override
-                       public void write(int b) throws IOException {
-
-                               if (afterNInvocations > 0) {
-                                       --afterNInvocations;
-                               }
-
-                               if (0 == afterNInvocations && null != 
streamBlocker) {
-                                       try {
-                                               streamBlocker.await();
-                                       } catch (InterruptedException ignored) {
-                                       }
-                               }
-                               try {
-                                       super.write(b);
-                               } catch (IOException ex) {
-                                       if (null != streamWaiter) {
-                                               streamWaiter.trigger();
-                                       }
-                                       throw ex;
-                               }
-
-                               if (0 == afterNInvocations && null != 
streamWaiter) {
-                                       streamWaiter.trigger();
-                               }
-                       }
-
-                       @Override
-                       public void close() {
-                               super.close();
-                               if (null != streamWaiter) {
-                                       streamWaiter.trigger();
-                               }
-                       }
-               };
-
-               return lastCreatedStream;
-       }
-
-       @Override
-       public void close() throws Exception {
-
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 50ca159..85b9eaf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -21,21 +21,21 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.checkpoint.BlockerCheckpointStreamFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.util.FutureUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.io.File;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.concurrent.CancellationException;
@@ -477,18 +477,20 @@ public class OperatorStateBackendTest {
 
                executorService.submit(runnableFuture);
 
-               // wait until the async checkpoint is in the write code, then 
continue
+               // wait until the async checkpoint is in the stream's write 
code, then continue
                waiterLatch.await();
 
+               // cancel the future, which should close the underlying stream
                runnableFuture.cancel(true);
+               
Assert.assertTrue(streamFactory.getLastCreatedStream().isClosed());
 
+               // we allow the stream under test to proceed
                blockerLatch.trigger();
 
                try {
                        runnableFuture.get(60, TimeUnit.SECONDS);
                        Assert.fail();
                } catch (CancellationException ignore) {
-
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b8ffacb1/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
index 1e31490..98e654f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java
@@ -71,31 +71,28 @@ public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {
                        @Override
                        public void write(int b) throws IOException {
 
-                               if (null != waiter) {
-                                       waiter.trigger();
-                               }
+                               unblockWaiter();
 
                                if (afterNInvocations > 0) {
                                        --afterNInvocations;
+                               } else {
+                                       awaitBlocker();
                                }
 
-                               if (0 == afterNInvocations && null != 
streamBlocker) {
-                                       try {
-                                               streamBlocker.await();
-                                       } catch (InterruptedException ignored) {
-                                       }
-                               }
                                try {
                                        super.write(b);
                                } catch (IOException ex) {
-                                       if (null != streamWaiter) {
-                                               streamWaiter.trigger();
-                                       }
+                                       unblockWaiter();
                                        throw ex;
                                }
 
-                               if (0 == afterNInvocations && null != 
streamWaiter) {
-                                       streamWaiter.trigger();
+                               if (0 == afterNInvocations) {
+                                       unblockWaiter();
+                               }
+
+                               // We also check for close here, in case the 
underlying stream does not do this
+                               if (isClosed()) {
+                                       throw new IOException("Stream closed.");
                                }
                        }
 
@@ -110,10 +107,33 @@ public class BlockerCheckpointStreamFactory implements 
CheckpointStreamFactory {
                        @Override
                        public void close() {
                                super.close();
+                               // trigger all the latches, essentially all 
blocking ops on the stream should resume after close.
+                               unblockAll();
+                       }
+
+                       private void unblockWaiter() {
                                if (null != streamWaiter) {
                                        streamWaiter.trigger();
                                }
                        }
+
+                       private void awaitBlocker() {
+                               if (null != streamBlocker) {
+                                       try {
+                                               streamBlocker.await();
+                                       } catch (InterruptedException ignored) {
+                                       }
+                               }
+                       }
+
+                       private void unblockAll() {
+                               if (null != streamWaiter) {
+                                       streamWaiter.trigger();
+                               }
+                               if (null != streamBlocker) {
+                                       streamBlocker.trigger();
+                               }
+                       }
                };
 
                return lastCreatedStream;

Reply via email to