Repository: flink
Updated Branches:
  refs/heads/release-1.3 0963718ac -> f62004079


http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
new file mode 100644
index 0000000..0c215cd
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+import org.apache.zookeeper.data.Stat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for basic {@link ZooKeeperStateHandleStore} behaviour.
+ *
+ * <p> Tests include:
+ * <ul>
+ * <li>Expected usage of operations</li>
+ * <li>Correct ordering of ZooKeeper and state handle operations</li>
+ * </ul>
+ */
+public class ZooKeeperStateHandleStoreTest extends TestLogger {
+
+       private static final ZooKeeperTestEnvironment ZOOKEEPER = new 
ZooKeeperTestEnvironment(1);
+
+       @AfterClass
+       public static void tearDown() throws Exception {
+               if (ZOOKEEPER != null) {
+                       ZOOKEEPER.shutdown();
+               }
+       }
+
+       @Before
+       public void cleanUp() throws Exception {
+               ZOOKEEPER.deleteAll();
+       }
+
+       /**
+        * Tests add operation with lock.
+        */
+       @Test
+       public void testAddAndLock() throws Exception {
+               LongStateStorage longStateStorage = new LongStateStorage();
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<Long>(
+                               ZOOKEEPER.getClient(), longStateStorage, 
Executors.directExecutor());
+
+               // Config
+               final String pathInZooKeeper = "/testAdd";
+               final Long state = 1239712317L;
+
+               // Test
+               store.addAndLock(pathInZooKeeper, state);
+
+               // Verify
+               // State handle created
+               assertEquals(1, store.getAllAndLock().size());
+               assertEquals(state, 
store.getAndLock(pathInZooKeeper).retrieveState());
+
+               // Path created and is persistent
+               Stat stat = 
ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper);
+               assertNotNull(stat);
+               assertEquals(0, stat.getEphemeralOwner());
+
+               List<String> children = 
ZOOKEEPER.getClient().getChildren().forPath(pathInZooKeeper);
+
+               // there should be one child which is the lock
+               assertEquals(1, children.size());
+
+               stat = 
ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper + '/' + 
children.get(0));
+               assertNotNull(stat);
+
+               // check that the child is an ephemeral node
+               assertNotEquals(0, stat.getEphemeralOwner());
+
+               // Data is equal
+               @SuppressWarnings("unchecked")
+               Long actual = ((RetrievableStateHandle<Long>) 
InstantiationUtil.deserializeObject(
+                               
ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper),
+                               
ClassLoader.getSystemClassLoader())).retrieveState();
+
+               assertEquals(state, actual);
+       }
+
+       /**
+        * Tests that an existing path throws an Exception.
+        */
+       @Test(expected = Exception.class)
+       public void testAddAlreadyExistingPath() throws Exception {
+               LongStateStorage stateHandleProvider = new LongStateStorage();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+
+               
ZOOKEEPER.getClient().create().forPath("/testAddAlreadyExistingPath");
+
+               store.addAndLock("/testAddAlreadyExistingPath", 1L);
+
+               // writing to the state storage should have succeeded
+               assertEquals(1, stateHandleProvider.getStateHandles());
+
+               // the created state handle should have been cleaned up if the 
add operation failed
+               assertEquals(1, 
stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls());
+       }
+
+       /**
+        * Tests that the created state handle is discarded if ZooKeeper create 
fails.
+        */
+       @Test
+       public void testAddDiscardStateHandleAfterFailure() throws Exception {
+               // Setup
+               LongStateStorage stateHandleProvider = new LongStateStorage();
+
+               CuratorFramework client = spy(ZOOKEEPER.getClient());
+               when(client.inTransaction().create()).thenThrow(new 
RuntimeException("Expected test Exception."));
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               client, stateHandleProvider, 
Executors.directExecutor());
+
+               // Config
+               final String pathInZooKeeper = 
"/testAddDiscardStateHandleAfterFailure";
+               final Long state = 81282227L;
+
+               try {
+                       // Test
+                       store.addAndLock(pathInZooKeeper, state);
+                       fail("Did not throw expected exception");
+               }
+               catch (Exception ignored) {
+               }
+
+               // Verify
+               // State handle created and discarded
+               assertEquals(1, stateHandleProvider.getStateHandles().size());
+               assertEquals(state, 
stateHandleProvider.getStateHandles().get(0).retrieveState());
+               assertEquals(1, 
stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls());
+       }
+
+       /**
+        * Tests that a state handle is replaced.
+        */
+       @Test
+       public void testReplace() throws Exception {
+               // Setup
+               LongStateStorage stateHandleProvider = new LongStateStorage();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+
+               // Config
+               final String pathInZooKeeper = "/testReplace";
+               final Long initialState = 30968470898L;
+               final Long replaceState = 88383776661L;
+
+               // Test
+               store.addAndLock(pathInZooKeeper, initialState);
+               store.replace(pathInZooKeeper, 0, replaceState);
+
+               // Verify
+               // State handles created
+               assertEquals(2, stateHandleProvider.getStateHandles().size());
+               assertEquals(initialState, 
stateHandleProvider.getStateHandles().get(0).retrieveState());
+               assertEquals(replaceState, 
stateHandleProvider.getStateHandles().get(1).retrieveState());
+
+               // Path created and is persistent
+               Stat stat = 
ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper);
+               assertNotNull(stat);
+               assertEquals(0, stat.getEphemeralOwner());
+
+               // Data is equal
+               @SuppressWarnings("unchecked")
+               Long actual = ((RetrievableStateHandle<Long>) 
InstantiationUtil.deserializeObject(
+                               
ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper),
+                               
ClassLoader.getSystemClassLoader())).retrieveState();
+
+               assertEquals(replaceState, actual);
+       }
+
+       /**
+        * Tests that a non existing path throws an Exception.
+        */
+       @Test(expected = Exception.class)
+       public void testReplaceNonExistingPath() throws Exception {
+               RetrievableStateStorageHelper<Long> stateStorage = new 
LongStateStorage();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZOOKEEPER.getClient(), stateStorage, 
Executors.directExecutor());
+
+               store.replace("/testReplaceNonExistingPath", 0, 1L);
+       }
+
+       /**
+        * Tests that the replace state handle is discarded if ZooKeeper 
setData fails.
+        */
+       @Test
+       public void testReplaceDiscardStateHandleAfterFailure() throws 
Exception {
+               // Setup
+               LongStateStorage stateHandleProvider = new LongStateStorage();
+
+               CuratorFramework client = spy(ZOOKEEPER.getClient());
+               when(client.setData()).thenThrow(new RuntimeException("Expected 
test Exception."));
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               client, stateHandleProvider, 
Executors.directExecutor());
+
+               // Config
+               final String pathInZooKeeper = 
"/testReplaceDiscardStateHandleAfterFailure";
+               final Long initialState = 30968470898L;
+               final Long replaceState = 88383776661L;
+
+               // Test
+               store.addAndLock(pathInZooKeeper, initialState);
+
+               try {
+                       store.replace(pathInZooKeeper, 0, replaceState);
+                       fail("Did not throw expected exception");
+               }
+               catch (Exception ignored) {
+               }
+
+               // Verify
+               // State handle created and discarded
+               assertEquals(2, stateHandleProvider.getStateHandles().size());
+               assertEquals(initialState, 
stateHandleProvider.getStateHandles().get(0).retrieveState());
+               assertEquals(replaceState, 
stateHandleProvider.getStateHandles().get(1).retrieveState());
+               assertEquals(1, 
stateHandleProvider.getStateHandles().get(1).getNumberOfDiscardCalls());
+
+               // Initial value
+               @SuppressWarnings("unchecked")
+               Long actual = ((RetrievableStateHandle<Long>) 
InstantiationUtil.deserializeObject(
+                               
ZOOKEEPER.getClient().getData().forPath(pathInZooKeeper),
+                               
ClassLoader.getSystemClassLoader())).retrieveState();
+
+               assertEquals(initialState, actual);
+       }
+
+       /**
+        * Tests get operation.
+        */
+       @Test
+       public void testGetAndExists() throws Exception {
+               // Setup
+               LongStateStorage stateHandleProvider = new LongStateStorage();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+
+               // Config
+               final String pathInZooKeeper = "/testGetAndExists";
+               final Long state = 311222268470898L;
+
+               // Test
+               assertEquals(-1, store.exists(pathInZooKeeper));
+
+               store.addAndLock(pathInZooKeeper, state);
+               RetrievableStateHandle<Long> actual = 
store.getAndLock(pathInZooKeeper);
+
+               // Verify
+               assertEquals(state, actual.retrieveState());
+               assertTrue(store.exists(pathInZooKeeper) >= 0);
+       }
+
+       /**
+        * Tests that a non existing path throws an Exception.
+        */
+       @Test(expected = Exception.class)
+       public void testGetNonExistingPath() throws Exception {
+               LongStateStorage stateHandleProvider = new LongStateStorage();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+
+               store.getAndLock("/testGetNonExistingPath");
+       }
+
+       /**
+        * Tests that all added state is returned.
+        */
+       @Test
+       public void testGetAll() throws Exception {
+               // Setup
+               LongStateStorage stateHandleProvider = new LongStateStorage();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+
+               // Config
+               final String pathInZooKeeper = "/testGetAll";
+
+               final Set<Long> expected = new HashSet<>();
+               expected.add(311222268470898L);
+               expected.add(132812888L);
+               expected.add(27255442L);
+               expected.add(11122233124L);
+
+               // Test
+               for (long val : expected) {
+                       store.addAndLock(pathInZooKeeper + val, val);
+               }
+
+               for (Tuple2<RetrievableStateHandle<Long>, String> val : 
store.getAllAndLock()) {
+                       assertTrue(expected.remove(val.f0.retrieveState()));
+               }
+               assertEquals(0, expected.size());
+       }
+
+       /**
+        * Tests that the state is returned sorted.
+        */
+       @Test
+       public void testGetAllSortedByName() throws Exception {
+               // Setup
+               LongStateStorage stateHandleProvider = new LongStateStorage();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+
+               // Config
+               final String basePath = "/testGetAllSortedByName";
+
+               final Long[] expected = new Long[] {
+                               311222268470898L, 132812888L, 27255442L, 
11122233124L };
+
+               // Test
+               for (long val : expected) {
+                       final String pathInZooKeeper = String.format("%s%016d", 
basePath, val);
+                       store.addAndLock(pathInZooKeeper, val);
+               }
+
+               List<Tuple2<RetrievableStateHandle<Long>, String>> actual = 
store.getAllSortedByNameAndLock();
+               assertEquals(expected.length, actual.size());
+
+               // bring the elements in sort order
+               Arrays.sort(expected);
+
+               for (int i = 0; i < expected.length; i++) {
+                       assertEquals(expected[i], 
actual.get(i).f0.retrieveState());
+               }
+       }
+
+       /**
+        * Tests that state handles are correctly removed.
+        */
+       @Test
+       public void testRemove() throws Exception {
+               // Setup
+               LongStateStorage stateHandleProvider = new LongStateStorage();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+
+               // Config
+               final String pathInZooKeeper = "/testRemove";
+               final Long state = 27255442L;
+
+               store.addAndLock(pathInZooKeeper, state);
+
+               // Test
+               store.releaseAndTryRemove(pathInZooKeeper);
+
+               // Verify discarded
+               assertEquals(0, 
ZOOKEEPER.getClient().getChildren().forPath("/").size());
+       }
+
+       /**
+        * Tests that state handles are correctly removed with a callback.
+        */
+       @Test
+       public void testRemoveWithCallback() throws Exception {
+               // Setup
+               LongStateStorage stateHandleProvider = new LongStateStorage();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+
+               // Config
+               final String pathInZooKeeper = "/testRemoveWithCallback";
+               final Long state = 27255442L;
+
+               store.addAndLock(pathInZooKeeper, state);
+
+               final CountDownLatch sync = new CountDownLatch(1);
+               ZooKeeperStateHandleStore.RemoveCallback<Long> callback = 
mock(ZooKeeperStateHandleStore.RemoveCallback.class);
+               doAnswer(new Answer<Void>() {
+                       @Override
+                       public Void answer(InvocationOnMock invocation) throws 
Throwable {
+                               sync.countDown();
+                               return null;
+                       }
+               }).when(callback).apply(any(RetrievableStateHandle.class));
+
+               // Test
+               store.releaseAndTryRemove(pathInZooKeeper, callback);
+
+               // Verify discarded and callback called
+               assertEquals(0, 
ZOOKEEPER.getClient().getChildren().forPath("/").size());
+
+               sync.await();
+
+               verify(callback, times(1))
+                               .apply(any(RetrievableStateHandle.class));
+       }
+
+       /** Tests that all state handles are correctly discarded. */
+       @Test
+       public void testReleaseAndTryRemoveAll() throws Exception {
+               // Setup
+               LongStateStorage stateHandleProvider = new LongStateStorage();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                               ZOOKEEPER.getClient(), stateHandleProvider, 
Executors.directExecutor());
+
+               // Config
+               final String pathInZooKeeper = "/testDiscardAll";
+
+               final Set<Long> expected = new HashSet<>();
+               expected.add(311222268470898L);
+               expected.add(132812888L);
+               expected.add(27255442L);
+               expected.add(11122233124L);
+
+               // Test
+               for (long val : expected) {
+                       store.addAndLock(pathInZooKeeper + val, val);
+               }
+
+               store.releaseAndTryRemoveAll();
+
+               // Verify all discarded
+               assertEquals(0, 
ZOOKEEPER.getClient().getChildren().forPath("/").size());
+       }
+
+       /**
+        * Tests that the ZooKeeperStateHandleStore can handle corrupted data 
by releasing and trying to remove the
+        * respective ZooKeeper ZNodes.
+        */
+       @Test
+       public void testCorruptedData() throws Exception {
+               LongStateStorage stateStorage = new LongStateStorage();
+
+               ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
+                       ZOOKEEPER.getClient(),
+                       stateStorage,
+                       Executors.directExecutor());
+
+               final Collection<Long> input = new HashSet<>();
+               input.add(1L);
+               input.add(2L);
+               input.add(3L);
+
+               for (Long aLong : input) {
+                       store.addAndLock("/" + aLong, aLong);
+               }
+
+               // corrupt one of the entries
+               ZOOKEEPER.getClient().setData().forPath("/" + 2, new byte[2]);
+
+               List<Tuple2<RetrievableStateHandle<Long>, String>> allEntries = 
store.getAllAndLock();
+
+               Collection<Long> expected = new HashSet<>(input);
+               expected.remove(2L);
+
+               Collection<Long> actual = new HashSet<>(expected.size());
+
+               for (Tuple2<RetrievableStateHandle<Long>, String> entry : 
allEntries) {
+                       actual.add(entry.f0.retrieveState());
+               }
+
+               assertEquals(expected, actual);
+
+               // check the same for the all sorted by name call
+               allEntries = store.getAllSortedByNameAndLock();
+
+               actual.clear();
+
+               for (Tuple2<RetrievableStateHandle<Long>, String> entry : 
allEntries) {
+                       actual.add(entry.f0.retrieveState());
+               }
+
+               assertEquals(expected, actual);
+
+               Stat stat = ZOOKEEPER.getClient().checkExists().forPath("/" + 
2);
+
+               // check that the corrupted node no longer exists
+               assertNull("The corrupted node should no longer exist.", stat);
+       }
+
+       /**
+        * FLINK-6612
+        *
+        * Tests that a concurrent delete operation cannot succeed if another 
instance holds a lock on the specified
+        * node.
+        */
+       @Test
+       public void testConcurrentDeleteOperation() throws Exception {
+               LongStateStorage longStateStorage = new LongStateStorage();
+
+               ZooKeeperStateHandleStore<Long> zkStore1 = new 
ZooKeeperStateHandleStore<>(
+                       ZOOKEEPER.getClient(),
+                       longStateStorage,
+                       Executors.directExecutor());
+
+               ZooKeeperStateHandleStore<Long> zkStore2 = new 
ZooKeeperStateHandleStore<>(
+                       ZOOKEEPER.getClient(),
+                       longStateStorage,
+                       Executors.directExecutor());
+
+               final String statePath = "/state";
+
+               zkStore1.addAndLock(statePath, 42L);
+               RetrievableStateHandle<Long> stateHandle = 
zkStore2.getAndLock(statePath);
+
+               // this should not remove the referenced node because we are 
still holding a state handle
+               // reference via zkStore2
+               zkStore1.releaseAndTryRemove(statePath);
+
+               // sanity check
+               assertEquals(42L, (long) stateHandle.retrieveState());
+
+               Stat nodeStat = 
ZOOKEEPER.getClient().checkExists().forPath(statePath);
+
+               assertNotNull("NodeStat should not be null, otherwise the 
referenced node does not exist.", nodeStat);
+
+               zkStore2.releaseAndTryRemove(statePath);
+
+               nodeStat = 
ZOOKEEPER.getClient().checkExists().forPath(statePath);
+
+               assertNull("NodeState should be null, because the referenced 
node should no longer exist.", nodeStat);
+       }
+
+       /**
+        * FLINK-6612
+        *
+        * Tests that getAndLock removes a created lock if the 
RetrievableStateHandle cannot be retrieved
+        * (e.g. deserialization problem).
+        */
+       @Test
+       public void testLockCleanupWhenGetAndLockFails() throws Exception {
+               LongStateStorage longStateStorage = new LongStateStorage();
+
+               ZooKeeperStateHandleStore<Long> zkStore1 = new 
ZooKeeperStateHandleStore<>(
+                       ZOOKEEPER.getClient(),
+                       longStateStorage,
+                       Executors.directExecutor());
+
+               ZooKeeperStateHandleStore<Long> zkStore2 = new 
ZooKeeperStateHandleStore<>(
+                       ZOOKEEPER.getClient(),
+                       longStateStorage,
+                       Executors.directExecutor());
+
+               final String path = "/state";
+
+               zkStore1.addAndLock(path, 42L);
+
+               final byte[] corruptedData = {1, 2};
+
+               // corrupt the data
+               ZOOKEEPER.getClient().setData().forPath(path, corruptedData);
+
+               try {
+                       zkStore2.getAndLock(path);
+                       fail("Should fail because we cannot deserialize the 
node's data");
+               } catch (IOException ignored) {
+                       // expected to fail
+               }
+
+               // check that there is no lock node left
+               String lockNodePath = zkStore2.getLockPath(path);
+
+               Stat stat = 
ZOOKEEPER.getClient().checkExists().forPath(lockNodePath);
+
+               // zkStore2 should not have created a lock node
+               assertNull("zkStore2 should not have created a lock node.", 
stat);
+
+               Collection<String> children = 
ZOOKEEPER.getClient().getChildren().forPath(path);
+
+               // there should be exactly one lock node from zkStore1
+               assertEquals(1, children.size());
+
+               zkStore1.releaseAndTryRemove(path);
+
+               stat = ZOOKEEPER.getClient().checkExists().forPath(path);
+
+               assertNull("The state node should have been removed.", stat);
+       }
+
+       /**
+        * FLINK-6612
+        *
+        * Tests that lock nodes will be released if the client dies.
+        */
+       @Test
+       public void testLockCleanupWhenClientTimesOut() throws Exception {
+               LongStateStorage longStateStorage = new LongStateStorage();
+
+               Configuration configuration = new Configuration();
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
ZOOKEEPER.getConnectString());
+               
configuration.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 
100);
+               
configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_ROOT, "timeout");
+
+               try (CuratorFramework client = 
ZooKeeperUtils.startCuratorFramework(configuration);
+                       CuratorFramework client2 = 
ZooKeeperUtils.startCuratorFramework(configuration)) {
+
+                       ZooKeeperStateHandleStore<Long> zkStore = new 
ZooKeeperStateHandleStore<>(
+                               client,
+                               longStateStorage,
+                               Executors.directExecutor());
+
+                       final String path = "/state";
+
+                       zkStore.addAndLock(path, 42L);
+
+                       // this should delete all ephemeral nodes
+                       client.close();
+
+                       Stat stat = client2.checkExists().forPath(path);
+
+                       // check that our state node still exists
+                       assertNotNull(stat);
+
+                       Collection<String> children = 
client2.getChildren().forPath(path);
+
+                       // check that the lock node has been released
+                       assertEquals(0, children.size());
+               }
+       }
+
+       /**
+        * FLINK-6612
+        *
+        * Tests that we can release a locked state handles in the 
ZooKeeperStateHandleStore.
+        */
+       @Test
+       public void testRelease() throws Exception {
+               LongStateStorage longStateStorage = new LongStateStorage();
+
+               ZooKeeperStateHandleStore<Long> zkStore = new 
ZooKeeperStateHandleStore<>(
+                       ZOOKEEPER.getClient(),
+                       longStateStorage,
+                       Executors.directExecutor());
+
+               final String path = "/state";
+
+               zkStore.addAndLock(path, 42L);
+
+               final String lockPath = zkStore.getLockPath(path);
+
+               Stat stat = 
ZOOKEEPER.getClient().checkExists().forPath(lockPath);
+
+               assertNotNull("Expected an existing lock", stat);
+
+               zkStore.release(path);
+
+               stat = ZOOKEEPER.getClient().checkExists().forPath(path);
+
+               // release should have removed the lock child
+               assertEquals("Expected no lock nodes as children", 0, 
stat.getNumChildren());
+
+               zkStore.releaseAndTryRemove(path);
+
+               stat = ZOOKEEPER.getClient().checkExists().forPath(path);
+
+               assertNull("State node should have been removed.",stat);
+       }
+
+       /**
+        * FLINK-6612
+        *
+        * Tests that we can release all locked state handles in the 
ZooKeeperStateHandleStore
+        */
+       @Test
+       public void testReleaseAll() throws Exception {
+               LongStateStorage longStateStorage = new LongStateStorage();
+
+               ZooKeeperStateHandleStore<Long> zkStore = new 
ZooKeeperStateHandleStore<>(
+                       ZOOKEEPER.getClient(),
+                       longStateStorage,
+                       Executors.directExecutor());
+
+               final Collection<String> paths = Arrays.asList("/state1", 
"/state2", "/state3");
+
+               for (String path : paths) {
+                       zkStore.addAndLock(path, 42L);
+               }
+
+               for (String path : paths) {
+                       Stat stat = 
ZOOKEEPER.getClient().checkExists().forPath(zkStore.getLockPath(path));
+
+                       assertNotNull("Expecte and existing lock.", stat);
+               }
+
+               zkStore.releaseAll();
+
+               for (String path : paths) {
+                       Stat stat = 
ZOOKEEPER.getClient().checkExists().forPath(path);
+
+                       assertEquals(0, stat.getNumChildren());
+               }
+
+               zkStore.releaseAndTryRemoveAll();
+
+               Stat stat = ZOOKEEPER.getClient().checkExists().forPath("/");
+
+               assertEquals(0, stat.getNumChildren());
+       }
+
+       // 
---------------------------------------------------------------------------------------------
+       // Simple test helpers
+       // 
---------------------------------------------------------------------------------------------
+
+       private static class LongStateStorage implements 
RetrievableStateStorageHelper<Long> {
+
+               private final List<LongRetrievableStateHandle> stateHandles = 
new ArrayList<>();
+
+               @Override
+               public RetrievableStateHandle<Long> store(Long state) throws 
Exception {
+                       LongRetrievableStateHandle stateHandle = new 
LongRetrievableStateHandle(state);
+                       stateHandles.add(stateHandle);
+
+                       return stateHandle;
+               }
+
+               List<LongRetrievableStateHandle> getStateHandles() {
+                       return stateHandles;
+               }
+       }
+
+       private static class LongRetrievableStateHandle implements 
RetrievableStateHandle<Long> {
+
+               private static final long serialVersionUID = 
-3555329254423838912L;
+
+               private final Long state;
+
+               private int numberOfDiscardCalls;
+
+               public LongRetrievableStateHandle(Long state) {
+                       this.state = state;
+               }
+
+               @Override
+               public Long retrieveState() throws Exception {
+                       return state;
+               }
+
+               @Override
+               public void discardState() throws Exception {
+                       numberOfDiscardCalls++;
+               }
+
+               @Override
+               public long getStateSize() {
+                       return 0;
+               }
+
+               public int getNumberOfDiscardCalls() {
+                       return numberOfDiscardCalls;
+               }
+       }
+}

Reply via email to