Repository: hbase Updated Branches: refs/heads/branch-2 142e6bb9d -> 03cb58158
http://git-wip-us.apache.org/repos/asf/hbase/blob/03cb5815/hbase-zookeeper/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/pom.xml b/hbase-zookeeper/pom.xml index 7b2f8d6..ad2e481 100644 --- a/hbase-zookeeper/pom.xml +++ b/hbase-zookeeper/pom.xml @@ -62,20 +62,6 @@ <skipAssembly>true</skipAssembly> </configuration> </plugin> - <!-- Make a jar and put the sources in the jar --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-source-plugin</artifactId> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>jar</goal> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> <!-- General plugins --> <plugin> <groupId>org.apache.maven.plugins</groupId> http://git-wip-us.apache.org/repos/asf/hbase/blob/03cb5815/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java new file mode 100644 index 0000000..fc31c37 --- /dev/null +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/HBaseZKTestingUtility.java @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Helpers for testing HBase that do not depend on specific server/etc. things. The main difference + * from {@link HBaseCommonTestingUtility} is that we can start a zookeeper cluster. + */ [email protected] +public class HBaseZKTestingUtility extends HBaseCommonTestingUtility { + + private MiniZooKeeperCluster zkCluster; + + /** + * Set if we were passed a zkCluster. If so, we won't shutdown zk as part of general shutdown. + */ + private boolean passedZkCluster; + + protected ZKWatcher zooKeeperWatcher; + + /** Directory (a subdirectory of dataTestDir) used by the dfs cluster if any */ + protected File clusterTestDir; + + public HBaseZKTestingUtility() { + this(HBaseConfiguration.create()); + } + + public HBaseZKTestingUtility(Configuration conf) { + super(conf); + } + + /** + * @return Where the cluster will write data on the local subsystem. Creates it if it does not + * exist already. A subdir of {@link #getBaseTestDir()} + * @see #getTestFileSystem() + */ + Path getClusterTestDir() { + if (clusterTestDir == null) { + setupClusterTestDir(); + } + return new Path(clusterTestDir.getAbsolutePath()); + } + + /** + * Creates a directory for the cluster, under the test data + */ + protected void setupClusterTestDir() { + if (clusterTestDir != null) { + return; + } + + // Using randomUUID ensures that multiple clusters can be launched by + // a same test, if it stops & starts them + Path testDir = getDataTestDir("cluster_" + UUID.randomUUID().toString()); + clusterTestDir = new File(testDir.toString()).getAbsoluteFile(); + // Have it cleaned up on exit + boolean b = deleteOnExit(); + if (b) { + clusterTestDir.deleteOnExit(); + } + LOG.info("Created new mini-cluster data directory: " + clusterTestDir + ", deleteOnExit=" + b); + } + + /** + * Call this if you only want a zk cluster. + * @see #shutdownMiniZKCluster() + * @return zk cluster started. + */ + public MiniZooKeeperCluster startMiniZKCluster() throws Exception { + return startMiniZKCluster(1); + } + + /** + * Call this if you only want a zk cluster. + * @see #shutdownMiniZKCluster() + * @return zk cluster started. + */ + public MiniZooKeeperCluster startMiniZKCluster(int zooKeeperServerNum, int... clientPortList) + throws Exception { + setupClusterTestDir(); + return startMiniZKCluster(clusterTestDir, zooKeeperServerNum, clientPortList); + } + + /** + * Start a mini ZK cluster. If the property "test.hbase.zookeeper.property.clientPort" is set the + * port mentioned is used as the default port for ZooKeeper. + */ + private MiniZooKeeperCluster startMiniZKCluster(File dir, int zooKeeperServerNum, + int[] clientPortList) throws Exception { + if (this.zkCluster != null) { + throw new IOException("Cluster already running at " + dir); + } + this.passedZkCluster = false; + this.zkCluster = new MiniZooKeeperCluster(this.getConfiguration()); + int defPort = this.conf.getInt("test.hbase.zookeeper.property.clientPort", 0); + if (defPort > 0) { + // If there is a port in the config file, we use it. + this.zkCluster.setDefaultClientPort(defPort); + } + + if (clientPortList != null) { + // Ignore extra client ports + int clientPortListSize = (clientPortList.length <= zooKeeperServerNum) ? clientPortList.length + : zooKeeperServerNum; + for (int i = 0; i < clientPortListSize; i++) { + this.zkCluster.addClientPort(clientPortList[i]); + } + } + int clientPort = this.zkCluster.startup(dir, zooKeeperServerNum); + this.conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort)); + return this.zkCluster; + } + + public MiniZooKeeperCluster getZkCluster() { + return zkCluster; + } + + public void setZkCluster(MiniZooKeeperCluster zkCluster) { + this.passedZkCluster = true; + this.zkCluster = zkCluster; + conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkCluster.getClientPort()); + } + + /** + * Shuts down zk cluster created by call to {@link #startMiniZKCluster(File)} or does nothing. + * @see #startMiniZKCluster() + */ + public void shutdownMiniZKCluster() throws IOException { + if (!passedZkCluster && this.zkCluster != null) { + this.zkCluster.shutdown(); + this.zkCluster = null; + } + } + + /** + * Returns a ZKWatcher instance. This instance is shared between HBaseTestingUtility instance + * users. Don't close it, it will be closed automatically when the cluster shutdowns + * @return The ZKWatcher instance. + */ + public synchronized ZKWatcher getZooKeeperWatcher() throws IOException { + if (zooKeeperWatcher == null) { + zooKeeperWatcher = new ZKWatcher(conf, "testing utility", new Abortable() { + @Override + public void abort(String why, Throwable e) { + throw new RuntimeException("Unexpected abort in HBaseZKTestingUtility:" + why, e); + } + + @Override + public boolean isAborted() { + return false; + } + }); + } + return zooKeeperWatcher; + } + + /** + * Gets a ZKWatcher. + */ + public static ZKWatcher getZooKeeperWatcher(HBaseZKTestingUtility testUtil) + throws ZooKeeperConnectionException, IOException { + ZKWatcher zkw = new ZKWatcher(testUtil.getConfiguration(), "unittest", new Abortable() { + boolean aborted = false; + + @Override + public void abort(String why, Throwable e) { + aborted = true; + throw new RuntimeException("Fatal ZK error, why=" + why, e); + } + + @Override + public boolean isAborted() { + return aborted; + } + }); + return zkw; + } + + /** + * @return True if we removed the test dirs + */ + @Override + public boolean cleanupTestDir() throws IOException { + boolean ret = super.cleanupTestDir(); + if (deleteDir(this.clusterTestDir)) { + this.clusterTestDir = null; + return ret & true; + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/03cb5815/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestHQuorumPeer.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestHQuorumPeer.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestHQuorumPeer.java new file mode 100644 index 0000000..89bb034 --- /dev/null +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestHQuorumPeer.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ZKTests; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test for HQuorumPeer. + */ +@Category({ ZKTests.class, MediumTests.class }) +public class TestHQuorumPeer { + private static final HBaseZKTestingUtility TEST_UTIL = new HBaseZKTestingUtility(); + private static int PORT_NO = 21818; + private Path dataDir; + + + @Before public void setup() throws IOException { + // Set it to a non-standard port. + TEST_UTIL.getConfiguration().setInt(HConstants.ZOOKEEPER_CLIENT_PORT, + PORT_NO); + this.dataDir = TEST_UTIL.getDataTestDir(this.getClass().getName()); + FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration()); + if (fs.exists(this.dataDir)) { + if (!fs.delete(this.dataDir, true)) { + throw new IOException("Failed cleanup of " + this.dataDir); + } + } + if (!fs.mkdirs(this.dataDir)) { + throw new IOException("Failed create of " + this.dataDir); + } + } + + @Test public void testMakeZKProps() { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.set(HConstants.ZOOKEEPER_DATA_DIR, this.dataDir.toString()); + Properties properties = ZKConfig.makeZKProps(conf); + assertEquals(dataDir.toString(), (String)properties.get("dataDir")); + assertEquals(Integer.valueOf(PORT_NO), + Integer.valueOf(properties.getProperty("clientPort"))); + assertEquals("localhost:2888:3888", properties.get("server.0")); + assertEquals(null, properties.get("server.1")); + + String oldValue = conf.get(HConstants.ZOOKEEPER_QUORUM); + conf.set(HConstants.ZOOKEEPER_QUORUM, "a.foo.bar,b.foo.bar,c.foo.bar"); + properties = ZKConfig.makeZKProps(conf); + assertEquals(dataDir.toString(), properties.get("dataDir")); + assertEquals(Integer.valueOf(PORT_NO), + Integer.valueOf(properties.getProperty("clientPort"))); + assertEquals("a.foo.bar:2888:3888", properties.get("server.0")); + assertEquals("b.foo.bar:2888:3888", properties.get("server.1")); + assertEquals("c.foo.bar:2888:3888", properties.get("server.2")); + assertEquals(null, properties.get("server.3")); + conf.set(HConstants.ZOOKEEPER_QUORUM, oldValue); + } + + @Test public void testShouldAssignDefaultZookeeperClientPort() { + Configuration config = HBaseConfiguration.create(); + config.clear(); + Properties p = ZKConfig.makeZKProps(config); + assertNotNull(p); + assertEquals(2181, p.get("clientPort")); + } + + @Test + public void testGetZKQuorumServersString() { + Configuration config = new Configuration(TEST_UTIL.getConfiguration()); + config.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, 8888); + config.set(HConstants.ZOOKEEPER_QUORUM, "foo:1234,bar:5678,baz,qux:9012"); + + String s = ZKConfig.getZKQuorumServersString(config); + assertEquals("foo:1234,bar:5678,baz:8888,qux:9012", s); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/03cb5815/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java index 765ddf9..1f83536 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestReadOnlyZKClient.java @@ -28,15 +28,14 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import java.io.File; import java.io.IOException; -import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.HBaseZKTestingUtility; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ZKTests; import org.apache.zookeeper.CreateMode; @@ -52,9 +51,7 @@ import org.junit.experimental.categories.Category; @Category({ ZKTests.class, MediumTests.class }) public class TestReadOnlyZKClient { - private static HBaseCommonTestingUtility UTIL = new HBaseCommonTestingUtility(); - - private static MiniZooKeeperCluster CLUSTER; + private static HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); private static int PORT; @@ -67,11 +64,9 @@ public class TestReadOnlyZKClient { private static ReadOnlyZKClient RO_ZK; @BeforeClass - public static void setUp() throws IOException, InterruptedException, KeeperException { - File file = - new File(UTIL.getDataTestDir("zkcluster_" + UUID.randomUUID().toString()).toString()); - CLUSTER = new MiniZooKeeperCluster(UTIL.getConfiguration()); - PORT = CLUSTER.startup(file); + public static void setUp() throws Exception { + PORT = UTIL.startMiniZKCluster().getClientPort(); + ZooKeeper zk = new ZooKeeper("localhost:" + PORT, 10000, e -> { }); DATA = new byte[10]; @@ -94,18 +89,28 @@ public class TestReadOnlyZKClient { @AfterClass public static void tearDown() throws IOException { RO_ZK.close(); - CLUSTER.shutdown(); + UTIL.shutdownMiniZKCluster(); UTIL.cleanupTestDir(); } @Test - public void testGetAndExists() throws InterruptedException, ExecutionException { + public void testGetAndExists() throws Exception { assertArrayEquals(DATA, RO_ZK.get(PATH).get()); assertEquals(CHILDREN, RO_ZK.exists(PATH).get().getNumChildren()); assertNotNull(RO_ZK.getZooKeeper()); - // a little longer than keep alive millis - Thread.sleep(5000); - assertNull(RO_ZK.getZooKeeper()); + // The zookeeper client should be closed finally after the keep alive time elapsed + UTIL.waitFor(10000, new ExplainingPredicate<Exception>() { + + @Override + public boolean evaluate() throws Exception { + return RO_ZK.getZooKeeper() == null; + } + + @Override + public String explainFailure() throws Exception { + return "Connection to zookeeper is still alive"; + } + }); } @Test @@ -129,7 +134,7 @@ public class TestReadOnlyZKClient { assertArrayEquals(DATA, RO_ZK.get(PATH).get()); ZooKeeper zk = RO_ZK.getZooKeeper(); long sessionId = zk.getSessionId(); - CLUSTER.getZooKeeperServers().get(0).closeSession(sessionId); + UTIL.getZkCluster().getZooKeeperServers().get(0).closeSession(sessionId); // should not reach keep alive so still the same instance assertSame(zk, RO_ZK.getZooKeeper()); http://git-wip-us.apache.org/repos/asf/hbase/blob/03cb5815/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java new file mode 100644 index 0000000..bcba906 --- /dev/null +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRecoverableZooKeeper.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.lang.reflect.Field; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ZKTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ZKTests.class, MediumTests.class }) +public class TestRecoverableZooKeeper { + + private final static HBaseZKTestingUtility TEST_UTIL = new HBaseZKTestingUtility(); + + Abortable abortable = new Abortable() { + @Override + public void abort(String why, Throwable e) { + + } + + @Override + public boolean isAborted() { + return false; + } + }; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniZKCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniZKCluster(); + } + + @Test + public void testSetDataVersionMismatchInLoop() throws Exception { + String znode = "/hbase/splitWAL/9af7cfc9b15910a0b3d714bf40a3248f"; + Configuration conf = TEST_UTIL.getConfiguration(); + ZKWatcher zkw = new ZKWatcher(conf, "testSetDataVersionMismatchInLoop", + abortable, true); + String ensemble = ZKConfig.getZKQuorumServersString(conf); + RecoverableZooKeeper rzk = ZKUtil.connect(conf, ensemble, zkw); + rzk.create(znode, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + rzk.setData(znode, "OPENING".getBytes(), 0); + Field zkField = RecoverableZooKeeper.class.getDeclaredField("zk"); + zkField.setAccessible(true); + int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); + ZookeeperStub zkStub = new ZookeeperStub(ensemble, timeout, zkw); + zkStub.setThrowExceptionInNumOperations(1); + zkField.set(rzk, zkStub); + byte[] opened = "OPENED".getBytes(); + rzk.setData(znode, opened, 1); + byte[] data = rzk.getData(znode, false, new Stat()); + assertTrue(Bytes.equals(opened, data)); + } + + class ZookeeperStub extends ZooKeeper { + + private int throwExceptionInNumOperations; + + public ZookeeperStub(String connectString, int sessionTimeout, Watcher watcher) + throws IOException { + super(connectString, sessionTimeout, watcher); + } + + public void setThrowExceptionInNumOperations(int throwExceptionInNumOperations) { + this.throwExceptionInNumOperations = throwExceptionInNumOperations; + } + + private void checkThrowKeeperException() throws KeeperException { + if (throwExceptionInNumOperations == 1) { + throwExceptionInNumOperations = 0; + throw new KeeperException.ConnectionLossException(); + } + if (throwExceptionInNumOperations > 0) { + throwExceptionInNumOperations--; + } + } + + @Override + public Stat setData(String path, byte[] data, int version) throws KeeperException, + InterruptedException { + Stat stat = super.setData(path, data, version); + checkThrowKeeperException(); + return stat; + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/03cb5815/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java new file mode 100644 index 0000000..fe282f5 --- /dev/null +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKLeaderManager.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ZKTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ZKTests.class, MediumTests.class }) +public class TestZKLeaderManager { + private static final Log LOG = LogFactory.getLog(TestZKLeaderManager.class); + + private static final String LEADER_ZNODE = + "/test/" + TestZKLeaderManager.class.getSimpleName(); + + private static class MockAbortable implements Abortable { + private boolean aborted; + + @Override + public void abort(String why, Throwable e) { + aborted = true; + LOG.fatal("Aborting during test: "+why, e); + fail("Aborted during test: " + why); + } + + @Override + public boolean isAborted() { + return aborted; + } + } + + private static class MockLeader extends Thread implements Stoppable { + private boolean stopped; + private ZKWatcher watcher; + private ZKLeaderManager zkLeader; + private AtomicBoolean master = new AtomicBoolean(false); + private int index; + + public MockLeader(ZKWatcher watcher, int index) { + setDaemon(true); + setName("TestZKLeaderManager-leader-" + index); + this.index = index; + this.watcher = watcher; + this.zkLeader = new ZKLeaderManager(watcher, LEADER_ZNODE, + Bytes.toBytes(index), this); + } + + public boolean isMaster() { + return master.get(); + } + + public int getIndex() { + return index; + } + + public ZKWatcher getWatcher() { + return watcher; + } + + public void run() { + while (!stopped) { + zkLeader.start(); + zkLeader.waitToBecomeLeader(); + master.set(true); + + while (master.get() && !stopped) { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) {} + } + } + } + + public void abdicate() { + zkLeader.stepDownAsLeader(); + master.set(false); + } + + @Override + public void stop(String why) { + stopped = true; + abdicate(); + watcher.close(); + } + + @Override + public boolean isStopped() { + return stopped; + } + } + + private static HBaseZKTestingUtility TEST_UTIL; + private static MockLeader[] CANDIDATES; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + TEST_UTIL = new HBaseZKTestingUtility(); + TEST_UTIL.startMiniZKCluster(); + Configuration conf = TEST_UTIL.getConfiguration(); + + // use an abortable to fail the test in the case of any KeeperExceptions + MockAbortable abortable = new MockAbortable(); + CANDIDATES = new MockLeader[3]; + for (int i = 0; i < 3; i++) { + ZKWatcher watcher = newZK(conf, "server"+i, abortable); + CANDIDATES[i] = new MockLeader(watcher, i); + CANDIDATES[i].start(); + } + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniZKCluster(); + } + + @Test + public void testLeaderSelection() throws Exception { + MockLeader currentLeader = getCurrentLeader(); + // one leader should have been found + assertNotNull("Leader should exist", currentLeader); + LOG.debug("Current leader index is "+currentLeader.getIndex()); + + byte[] znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE); + assertNotNull("Leader znode should contain leader index", znodeData); + assertTrue("Leader znode should not be empty", znodeData.length > 0); + int storedIndex = Bytes.toInt(znodeData); + LOG.debug("Stored leader index in ZK is "+storedIndex); + assertEquals("Leader znode should match leader index", + currentLeader.getIndex(), storedIndex); + + // force a leader transition + currentLeader.abdicate(); + assertFalse(currentLeader.isMaster()); + + // check for new leader + currentLeader = getCurrentLeader(); + // one leader should have been found + assertNotNull("New leader should exist after abdication", currentLeader); + LOG.debug("New leader index is "+currentLeader.getIndex()); + + znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE); + assertNotNull("Leader znode should contain leader index", znodeData); + assertTrue("Leader znode should not be empty", znodeData.length > 0); + storedIndex = Bytes.toInt(znodeData); + LOG.debug("Stored leader index in ZK is "+storedIndex); + assertEquals("Leader znode should match leader index", + currentLeader.getIndex(), storedIndex); + + // force another transition by stopping the current + currentLeader.stop("Stopping for test"); + assertFalse(currentLeader.isMaster()); + + // check for new leader + currentLeader = getCurrentLeader(); + // one leader should have been found + assertNotNull("New leader should exist after stop", currentLeader); + LOG.debug("New leader index is "+currentLeader.getIndex()); + + znodeData = ZKUtil.getData(currentLeader.getWatcher(), LEADER_ZNODE); + assertNotNull("Leader znode should contain leader index", znodeData); + assertTrue("Leader znode should not be empty", znodeData.length > 0); + storedIndex = Bytes.toInt(znodeData); + LOG.debug("Stored leader index in ZK is "+storedIndex); + assertEquals("Leader znode should match leader index", + currentLeader.getIndex(), storedIndex); + + // with a second stop we can guarantee that a previous leader has resumed leading + currentLeader.stop("Stopping for test"); + assertFalse(currentLeader.isMaster()); + + // check for new + currentLeader = getCurrentLeader(); + assertNotNull("New leader should exist", currentLeader); + } + + private MockLeader getCurrentLeader() throws Exception { + MockLeader currentLeader = null; + outer: + // Wait up to 10 secs for initial leader + for (int i = 0; i < 1000; i++) { + for (int j = 0; j < CANDIDATES.length; j++) { + if (CANDIDATES[j].isMaster()) { + // should only be one leader + if (currentLeader != null) { + fail("Both candidate "+currentLeader.getIndex()+" and "+j+" claim to be leader!"); + } + currentLeader = CANDIDATES[j]; + } + } + if (currentLeader != null) { + break outer; + } + Thread.sleep(10); + } + return currentLeader; + } + + private static ZKWatcher newZK(Configuration conf, String name, + Abortable abort) throws Exception { + Configuration copy = HBaseConfiguration.create(conf); + ZKWatcher zk = new ZKWatcher(copy, name, abort); + return zk; + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/03cb5815/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMainServer.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMainServer.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMainServer.java new file mode 100644 index 0000000..bc1c240 --- /dev/null +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMainServer.java @@ -0,0 +1,119 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.security.Permission; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.ZKTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ZKTests.class, SmallTests.class }) +public class TestZKMainServer { + // ZKMS calls System.exit. Catch the call and prevent exit using trick described up in + // http://stackoverflow.com/questions/309396/java-how-to-test-methods-that-call-system-exit + protected static class ExitException extends SecurityException { + private static final long serialVersionUID = 1L; + public final int status; + public ExitException(int status) { + super("There is no escape!"); + this.status = status; + } + } + + private static class NoExitSecurityManager extends SecurityManager { + @Override + public void checkPermission(Permission perm) { + // allow anything. + } + + @Override + public void checkPermission(Permission perm, Object context) { + // allow anything. + } + + @Override + public void checkExit(int status) { + super.checkExit(status); + throw new ExitException(status); + } + } + + /** + * We need delete of a znode to work at least. + */ + @Test + public void testCommandLineWorks() throws Exception { + System.setSecurityManager(new NoExitSecurityManager()); + HBaseZKTestingUtility htu = new HBaseZKTestingUtility(); + htu.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, 1000); + htu.startMiniZKCluster(); + try { + ZKWatcher zkw = htu.getZooKeeperWatcher(); + String znode = "/testCommandLineWorks"; + ZKUtil.createWithParents(zkw, znode, HConstants.EMPTY_BYTE_ARRAY); + ZKUtil.checkExists(zkw, znode); + boolean exception = false; + try { + ZKMainServer.main(new String [] {"-server", + "localhost:" + htu.getZkCluster().getClientPort(), "delete", znode}); + } catch (ExitException ee) { + // ZKMS calls System.exit which should trigger this exception. + exception = true; + } + assertTrue(exception); + assertEquals(-1, ZKUtil.checkExists(zkw, znode)); + } finally { + htu.shutdownMiniZKCluster(); + System.setSecurityManager(null); // or save and restore original + } + } + + @Test + public void testHostPortParse() { + ZKMainServer parser = new ZKMainServer(); + Configuration c = HBaseConfiguration.create(); + assertEquals("localhost:" + c.get(HConstants.ZOOKEEPER_CLIENT_PORT), parser.parse(c)); + final String port = "1234"; + c.set(HConstants.ZOOKEEPER_CLIENT_PORT, port); + c.set("hbase.zookeeper.quorum", "example.com"); + assertEquals("example.com:" + port, parser.parse(c)); + c.set("hbase.zookeeper.quorum", "example1.com,example2.com,example3.com"); + String ensemble = parser.parse(c); + assertTrue(port, ensemble.matches("(example[1-3]\\.com:1234,){2}example[1-3]\\.com:" + port)); + + // multiple servers with its own port + c.set("hbase.zookeeper.quorum", "example1.com:5678,example2.com:9012,example3.com:3456"); + ensemble = parser.parse(c); + assertEquals(ensemble, "example1.com:5678,example2.com:9012,example3.com:3456"); + + // some servers without its own port, which will be assigned the default client port + c.set("hbase.zookeeper.quorum", "example1.com:5678,example2.com:9012,example3.com"); + ensemble = parser.parse(c); + assertEquals(ensemble, "example1.com:5678,example2.com:9012,example3.com:" + port); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/03cb5815/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMulti.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMulti.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMulti.java new file mode 100644 index 0000000..3cc3815 --- /dev/null +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKMulti.java @@ -0,0 +1,390 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ZKTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Op; +import org.apache.zookeeper.ZooDefs.Ids; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test ZooKeeper multi-update functionality + */ +@Category({ ZKTests.class, MediumTests.class }) +public class TestZKMulti { + private static final Log LOG = LogFactory.getLog(TestZKMulti.class); + private final static HBaseZKTestingUtility TEST_UTIL = new HBaseZKTestingUtility(); + private static ZKWatcher zkw = null; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniZKCluster(); + Configuration conf = TEST_UTIL.getConfiguration(); + Abortable abortable = new Abortable() { + @Override + public void abort(String why, Throwable e) { + LOG.info(why, e); + } + + @Override + public boolean isAborted() { + return false; + } + }; + zkw = new ZKWatcher(conf, + "TestZKMulti", abortable, true); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniZKCluster(); + } + + @Test (timeout=60000) + public void testSimpleMulti() throws Exception { + // null multi + ZKUtil.multiOrSequential(zkw, null, false); + + // empty multi + ZKUtil.multiOrSequential(zkw, new LinkedList<>(), false); + + // single create + String path = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "testSimpleMulti"); + LinkedList<ZKUtilOp> singleCreate = new LinkedList<>(); + singleCreate.add(ZKUtilOp.createAndFailSilent(path, new byte[0])); + ZKUtil.multiOrSequential(zkw, singleCreate, false); + assertTrue(ZKUtil.checkExists(zkw, path) != -1); + + // single setdata + LinkedList<ZKUtilOp> singleSetData = new LinkedList<>(); + byte [] data = Bytes.toBytes("foobar"); + singleSetData.add(ZKUtilOp.setData(path, data)); + ZKUtil.multiOrSequential(zkw, singleSetData, false); + assertTrue(Bytes.equals(ZKUtil.getData(zkw, path), data)); + + // single delete + LinkedList<ZKUtilOp> singleDelete = new LinkedList<>(); + singleDelete.add(ZKUtilOp.deleteNodeFailSilent(path)); + ZKUtil.multiOrSequential(zkw, singleDelete, false); + assertTrue(ZKUtil.checkExists(zkw, path) == -1); + } + + @Test (timeout=60000) + public void testComplexMulti() throws Exception { + String path1 = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "testComplexMulti1"); + String path2 = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "testComplexMulti2"); + String path3 = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "testComplexMulti3"); + String path4 = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "testComplexMulti4"); + String path5 = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "testComplexMulti5"); + String path6 = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "testComplexMulti6"); + // create 4 nodes that we'll setData on or delete later + LinkedList<ZKUtilOp> create4Nodes = new LinkedList<>(); + create4Nodes.add(ZKUtilOp.createAndFailSilent(path1, Bytes.toBytes(path1))); + create4Nodes.add(ZKUtilOp.createAndFailSilent(path2, Bytes.toBytes(path2))); + create4Nodes.add(ZKUtilOp.createAndFailSilent(path3, Bytes.toBytes(path3))); + create4Nodes.add(ZKUtilOp.createAndFailSilent(path4, Bytes.toBytes(path4))); + ZKUtil.multiOrSequential(zkw, create4Nodes, false); + assertTrue(Bytes.equals(ZKUtil.getData(zkw, path1), Bytes.toBytes(path1))); + assertTrue(Bytes.equals(ZKUtil.getData(zkw, path2), Bytes.toBytes(path2))); + assertTrue(Bytes.equals(ZKUtil.getData(zkw, path3), Bytes.toBytes(path3))); + assertTrue(Bytes.equals(ZKUtil.getData(zkw, path4), Bytes.toBytes(path4))); + + // do multiple of each operation (setData, delete, create) + LinkedList<ZKUtilOp> ops = new LinkedList<>(); + // setData + ops.add(ZKUtilOp.setData(path1, Bytes.add(Bytes.toBytes(path1), Bytes.toBytes(path1)))); + ops.add(ZKUtilOp.setData(path2, Bytes.add(Bytes.toBytes(path2), Bytes.toBytes(path2)))); + // delete + ops.add(ZKUtilOp.deleteNodeFailSilent(path3)); + ops.add(ZKUtilOp.deleteNodeFailSilent(path4)); + // create + ops.add(ZKUtilOp.createAndFailSilent(path5, Bytes.toBytes(path5))); + ops.add(ZKUtilOp.createAndFailSilent(path6, Bytes.toBytes(path6))); + ZKUtil.multiOrSequential(zkw, ops, false); + assertTrue(Bytes.equals(ZKUtil.getData(zkw, path1), + Bytes.add(Bytes.toBytes(path1), Bytes.toBytes(path1)))); + assertTrue(Bytes.equals(ZKUtil.getData(zkw, path2), + Bytes.add(Bytes.toBytes(path2), Bytes.toBytes(path2)))); + assertTrue(ZKUtil.checkExists(zkw, path3) == -1); + assertTrue(ZKUtil.checkExists(zkw, path4) == -1); + assertTrue(Bytes.equals(ZKUtil.getData(zkw, path5), Bytes.toBytes(path5))); + assertTrue(Bytes.equals(ZKUtil.getData(zkw, path6), Bytes.toBytes(path6))); + } + + @Test (timeout=60000) + public void testSingleFailure() throws Exception { + // try to delete a node that doesn't exist + boolean caughtNoNode = false; + String path = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "testSingleFailureZ"); + LinkedList<ZKUtilOp> ops = new LinkedList<>(); + ops.add(ZKUtilOp.deleteNodeFailSilent(path)); + try { + ZKUtil.multiOrSequential(zkw, ops, false); + } catch (KeeperException.NoNodeException nne) { + caughtNoNode = true; + } + assertTrue(caughtNoNode); + + // try to setData on a node that doesn't exist + caughtNoNode = false; + ops = new LinkedList<>(); + ops.add(ZKUtilOp.setData(path, Bytes.toBytes(path))); + try { + ZKUtil.multiOrSequential(zkw, ops, false); + } catch (KeeperException.NoNodeException nne) { + caughtNoNode = true; + } + assertTrue(caughtNoNode); + + // try to create on a node that already exists + boolean caughtNodeExists = false; + ops = new LinkedList<>(); + ops.add(ZKUtilOp.createAndFailSilent(path, Bytes.toBytes(path))); + ZKUtil.multiOrSequential(zkw, ops, false); + try { + ZKUtil.multiOrSequential(zkw, ops, false); + } catch (KeeperException.NodeExistsException nee) { + caughtNodeExists = true; + } + assertTrue(caughtNodeExists); + } + + @Test (timeout=60000) + public void testSingleFailureInMulti() throws Exception { + // try a multi where all but one operation succeeds + String pathA = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "testSingleFailureInMultiA"); + String pathB = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "testSingleFailureInMultiB"); + String pathC = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "testSingleFailureInMultiC"); + LinkedList<ZKUtilOp> ops = new LinkedList<>(); + ops.add(ZKUtilOp.createAndFailSilent(pathA, Bytes.toBytes(pathA))); + ops.add(ZKUtilOp.createAndFailSilent(pathB, Bytes.toBytes(pathB))); + ops.add(ZKUtilOp.deleteNodeFailSilent(pathC)); + boolean caughtNoNode = false; + try { + ZKUtil.multiOrSequential(zkw, ops, false); + } catch (KeeperException.NoNodeException nne) { + caughtNoNode = true; + } + assertTrue(caughtNoNode); + // assert that none of the operations succeeded + assertTrue(ZKUtil.checkExists(zkw, pathA) == -1); + assertTrue(ZKUtil.checkExists(zkw, pathB) == -1); + assertTrue(ZKUtil.checkExists(zkw, pathC) == -1); + } + + @Test (timeout=60000) + public void testMultiFailure() throws Exception { + String pathX = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "testMultiFailureX"); + String pathY = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "testMultiFailureY"); + String pathZ = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "testMultiFailureZ"); + // create X that we will use to fail create later + LinkedList<ZKUtilOp> ops = new LinkedList<>(); + ops.add(ZKUtilOp.createAndFailSilent(pathX, Bytes.toBytes(pathX))); + ZKUtil.multiOrSequential(zkw, ops, false); + + // fail one of each create ,setData, delete + String pathV = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "testMultiFailureV"); + String pathW = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "testMultiFailureW"); + ops = new LinkedList<>(); + ops.add(ZKUtilOp.createAndFailSilent(pathX, Bytes.toBytes(pathX))); // fail -- already exists + ops.add(ZKUtilOp.setData(pathY, Bytes.toBytes(pathY))); // fail -- doesn't exist + ops.add(ZKUtilOp.deleteNodeFailSilent(pathZ)); // fail -- doesn't exist + ops.add(ZKUtilOp.createAndFailSilent(pathX, Bytes.toBytes(pathV))); // pass + ops.add(ZKUtilOp.createAndFailSilent(pathX, Bytes.toBytes(pathW))); // pass + boolean caughtNodeExists = false; + try { + ZKUtil.multiOrSequential(zkw, ops, false); + } catch (KeeperException.NodeExistsException nee) { + // check first operation that fails throws exception + caughtNodeExists = true; + } + assertTrue(caughtNodeExists); + // check that no modifications were made + assertFalse(ZKUtil.checkExists(zkw, pathX) == -1); + assertTrue(ZKUtil.checkExists(zkw, pathY) == -1); + assertTrue(ZKUtil.checkExists(zkw, pathZ) == -1); + assertTrue(ZKUtil.checkExists(zkw, pathW) == -1); + assertTrue(ZKUtil.checkExists(zkw, pathV) == -1); + + // test that with multiple failures, throws an exception corresponding to first failure in list + ops = new LinkedList<>(); + ops.add(ZKUtilOp.setData(pathY, Bytes.toBytes(pathY))); // fail -- doesn't exist + ops.add(ZKUtilOp.createAndFailSilent(pathX, Bytes.toBytes(pathX))); // fail -- exists + boolean caughtNoNode = false; + try { + ZKUtil.multiOrSequential(zkw, ops, false); + } catch (KeeperException.NoNodeException nne) { + // check first operation that fails throws exception + caughtNoNode = true; + } + assertTrue(caughtNoNode); + // check that no modifications were made + assertFalse(ZKUtil.checkExists(zkw, pathX) == -1); + assertTrue(ZKUtil.checkExists(zkw, pathY) == -1); + assertTrue(ZKUtil.checkExists(zkw, pathZ) == -1); + assertTrue(ZKUtil.checkExists(zkw, pathW) == -1); + assertTrue(ZKUtil.checkExists(zkw, pathV) == -1); + } + + @Test (timeout=60000) + public void testRunSequentialOnMultiFailure() throws Exception { + String path1 = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "runSequential1"); + String path2 = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "runSequential2"); + String path3 = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "runSequential3"); + String path4 = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, "runSequential4"); + + // create some nodes that we will use later + LinkedList<ZKUtilOp> ops = new LinkedList<>(); + ops.add(ZKUtilOp.createAndFailSilent(path1, Bytes.toBytes(path1))); + ops.add(ZKUtilOp.createAndFailSilent(path2, Bytes.toBytes(path2))); + ZKUtil.multiOrSequential(zkw, ops, false); + + // test that, even with operations that fail, the ones that would pass will pass + // with runSequentialOnMultiFailure + ops = new LinkedList<>(); + ops.add(ZKUtilOp.setData(path1, Bytes.add(Bytes.toBytes(path1), Bytes.toBytes(path1)))); // pass + ops.add(ZKUtilOp.deleteNodeFailSilent(path2)); // pass + ops.add(ZKUtilOp.deleteNodeFailSilent(path3)); // fail -- node doesn't exist + ops.add(ZKUtilOp.createAndFailSilent(path4, + Bytes.add(Bytes.toBytes(path4), Bytes.toBytes(path4)))); // pass + ZKUtil.multiOrSequential(zkw, ops, true); + assertTrue(Bytes.equals(ZKUtil.getData(zkw, path1), + Bytes.add(Bytes.toBytes(path1), Bytes.toBytes(path1)))); + assertTrue(ZKUtil.checkExists(zkw, path2) == -1); + assertTrue(ZKUtil.checkExists(zkw, path3) == -1); + assertFalse(ZKUtil.checkExists(zkw, path4) == -1); + } + + /** + * Verifies that for the given root node, it should delete all the child nodes + * recursively using multi-update api. + */ + @Test (timeout=60000) + public void testdeleteChildrenRecursivelyMulti() throws Exception { + String parentZNode = "/testRootMulti"; + createZNodeTree(parentZNode); + + ZKUtil.deleteChildrenRecursivelyMultiOrSequential(zkw, true, parentZNode); + + assertTrue("Wrongly deleted parent znode!", + ZKUtil.checkExists(zkw, parentZNode) > -1); + List<String> children = zkw.getRecoverableZooKeeper().getChildren( + parentZNode, false); + assertTrue("Failed to delete child znodes!", 0 == children.size()); + } + + /** + * Verifies that for the given root node, it should delete all the nodes recursively using + * multi-update api. + */ + @Test(timeout = 60000) + public void testDeleteNodeRecursivelyMulti() throws Exception { + String parentZNode = "/testdeleteNodeRecursivelyMulti"; + createZNodeTree(parentZNode); + + ZKUtil.deleteNodeRecursively(zkw, parentZNode); + assertTrue("Parent znode should be deleted.", ZKUtil.checkExists(zkw, parentZNode) == -1); + } + + @Test(timeout = 60000) + public void testDeleteNodeRecursivelyMultiOrSequential() throws Exception { + String parentZNode1 = "/testdeleteNode1"; + String parentZNode2 = "/testdeleteNode2"; + String parentZNode3 = "/testdeleteNode3"; + createZNodeTree(parentZNode1); + createZNodeTree(parentZNode2); + createZNodeTree(parentZNode3); + + ZKUtil.deleteNodeRecursivelyMultiOrSequential(zkw, false, parentZNode1, parentZNode2, + parentZNode3); + assertTrue("Parent znode 1 should be deleted.", ZKUtil.checkExists(zkw, parentZNode1) == -1); + assertTrue("Parent znode 2 should be deleted.", ZKUtil.checkExists(zkw, parentZNode2) == -1); + assertTrue("Parent znode 3 should be deleted.", ZKUtil.checkExists(zkw, parentZNode3) == -1); + } + + @Test(timeout = 60000) + public void testDeleteChildrenRecursivelyMultiOrSequential() throws Exception { + String parentZNode1 = "/testdeleteChildren1"; + String parentZNode2 = "/testdeleteChildren2"; + String parentZNode3 = "/testdeleteChildren3"; + createZNodeTree(parentZNode1); + createZNodeTree(parentZNode2); + createZNodeTree(parentZNode3); + + ZKUtil.deleteChildrenRecursivelyMultiOrSequential(zkw, true, parentZNode1, parentZNode2, + parentZNode3); + + assertTrue("Wrongly deleted parent znode 1!", ZKUtil.checkExists(zkw, parentZNode1) > -1); + List<String> children = zkw.getRecoverableZooKeeper().getChildren(parentZNode1, false); + assertTrue("Failed to delete child znodes of parent znode 1!", 0 == children.size()); + + assertTrue("Wrongly deleted parent znode 2!", ZKUtil.checkExists(zkw, parentZNode2) > -1); + children = zkw.getRecoverableZooKeeper().getChildren(parentZNode2, false); + assertTrue("Failed to delete child znodes of parent znode 1!", 0 == children.size()); + + assertTrue("Wrongly deleted parent znode 3!", ZKUtil.checkExists(zkw, parentZNode3) > -1); + children = zkw.getRecoverableZooKeeper().getChildren(parentZNode3, false); + assertTrue("Failed to delete child znodes of parent znode 1!", 0 == children.size()); + } + + private void createZNodeTree(String rootZNode) throws KeeperException, + InterruptedException { + List<Op> opList = new ArrayList<>(); + opList.add(Op.create(rootZNode, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT)); + int level = 0; + String parentZNode = rootZNode; + while (level < 10) { + // define parent node + parentZNode = parentZNode + "/" + level; + opList.add(Op.create(parentZNode, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT)); + int elements = 0; + // add elements to the parent node + while (elements < level) { + opList.add(Op.create(parentZNode + "/" + elements, new byte[0], + Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); + elements++; + } + level++; + } + zkw.getRecoverableZooKeeper().multi(opList); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/03cb5815/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKNodeTracker.java ---------------------------------------------------------------------- diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKNodeTracker.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKNodeTracker.java new file mode 100644 index 0000000..f8aa7c3 --- /dev/null +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKNodeTracker.java @@ -0,0 +1,341 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseZKTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ZKTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ ZKTests.class, MediumTests.class }) +public class TestZKNodeTracker { + private static final Log LOG = LogFactory.getLog(TestZKNodeTracker.class); + private final static HBaseZKTestingUtility TEST_UTIL = new HBaseZKTestingUtility(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniZKCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniZKCluster(); + } + + /** + * Test that we can interrupt a node that is blocked on a wait. + */ + @Test + public void testInterruptible() throws IOException, InterruptedException { + Abortable abortable = new StubAbortable(); + ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(), "testInterruptible", abortable); + final TestTracker tracker = new TestTracker(zk, "/xyz", abortable); + tracker.start(); + Thread t = new Thread() { + @Override + public void run() { + try { + tracker.blockUntilAvailable(); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted", e); + } + } + }; + t.start(); + while (!t.isAlive()) { + Threads.sleep(1); + } + tracker.stop(); + t.join(); + // If it wasn't interruptible, we'd never get to here. + } + + @Test + public void testNodeTracker() throws Exception { + Abortable abortable = new StubAbortable(); + ZKWatcher zk = new ZKWatcher(TEST_UTIL.getConfiguration(), + "testNodeTracker", abortable); + ZKUtil.createAndFailSilent(zk, zk.znodePaths.baseZNode); + + final String node = ZNodePaths.joinZNode(zk.znodePaths.baseZNode, + Long.toString(ThreadLocalRandom.current().nextLong())); + + final byte [] dataOne = Bytes.toBytes("dataOne"); + final byte [] dataTwo = Bytes.toBytes("dataTwo"); + + // Start a ZKNT with no node currently available + TestTracker localTracker = new TestTracker(zk, node, abortable); + localTracker.start(); + zk.registerListener(localTracker); + + // Make sure we don't have a node + assertNull(localTracker.getData(false)); + + // Spin up a thread with another ZKNT and have it block + WaitToGetDataThread thread = new WaitToGetDataThread(zk, node); + thread.start(); + + // Verify the thread doesn't have a node + assertFalse(thread.hasData); + + // Now, start a new ZKNT with the node already available + TestTracker secondTracker = new TestTracker(zk, node, null); + secondTracker.start(); + zk.registerListener(secondTracker); + + // Put up an additional zk listener so we know when zk event is done + TestingZKListener zkListener = new TestingZKListener(zk, node); + zk.registerListener(zkListener); + assertEquals(0, zkListener.createdLock.availablePermits()); + + // Create a completely separate zk connection for test triggers and avoid + // any weird watcher interactions from the test + final ZooKeeper zkconn = + new ZooKeeper(ZKConfig.getZKQuorumServersString(TEST_UTIL.getConfiguration()), 60000, e -> { + }); + + // Add the node with data one + zkconn.create(node, dataOne, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + // Wait for the zk event to be processed + zkListener.waitForCreation(); + thread.join(); + + // Both trackers should have the node available with data one + assertNotNull(localTracker.getData(false)); + assertNotNull(localTracker.blockUntilAvailable()); + assertTrue(Bytes.equals(localTracker.getData(false), dataOne)); + assertTrue(thread.hasData); + assertTrue(Bytes.equals(thread.tracker.getData(false), dataOne)); + LOG.info("Successfully got data one"); + + // Make sure it's available and with the expected data + assertNotNull(secondTracker.getData(false)); + assertNotNull(secondTracker.blockUntilAvailable()); + assertTrue(Bytes.equals(secondTracker.getData(false), dataOne)); + LOG.info("Successfully got data one with the second tracker"); + + // Drop the node + zkconn.delete(node, -1); + zkListener.waitForDeletion(); + + // Create a new thread but with the existing thread's tracker to wait + TestTracker threadTracker = thread.tracker; + thread = new WaitToGetDataThread(zk, node, threadTracker); + thread.start(); + + // Verify other guys don't have data + assertFalse(thread.hasData); + assertNull(secondTracker.getData(false)); + assertNull(localTracker.getData(false)); + LOG.info("Successfully made unavailable"); + + // Create with second data + zkconn.create(node, dataTwo, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + // Wait for the zk event to be processed + zkListener.waitForCreation(); + thread.join(); + + // All trackers should have the node available with data two + assertNotNull(localTracker.getData(false)); + assertNotNull(localTracker.blockUntilAvailable()); + assertTrue(Bytes.equals(localTracker.getData(false), dataTwo)); + assertNotNull(secondTracker.getData(false)); + assertNotNull(secondTracker.blockUntilAvailable()); + assertTrue(Bytes.equals(secondTracker.getData(false), dataTwo)); + assertTrue(thread.hasData); + assertTrue(Bytes.equals(thread.tracker.getData(false), dataTwo)); + LOG.info("Successfully got data two on all trackers and threads"); + + // Change the data back to data one + zkconn.setData(node, dataOne, -1); + + // Wait for zk event to be processed + zkListener.waitForDataChange(); + + // All trackers should have the node available with data one + assertNotNull(localTracker.getData(false)); + assertNotNull(localTracker.blockUntilAvailable()); + assertTrue(Bytes.equals(localTracker.getData(false), dataOne)); + assertNotNull(secondTracker.getData(false)); + assertNotNull(secondTracker.blockUntilAvailable()); + assertTrue(Bytes.equals(secondTracker.getData(false), dataOne)); + assertTrue(thread.hasData); + assertTrue(Bytes.equals(thread.tracker.getData(false), dataOne)); + LOG.info("Successfully got data one following a data change on all trackers and threads"); + } + + public static class WaitToGetDataThread extends Thread { + + TestTracker tracker; + boolean hasData; + + public WaitToGetDataThread(ZKWatcher zk, String node) { + tracker = new TestTracker(zk, node, null); + tracker.start(); + zk.registerListener(tracker); + hasData = false; + } + + public WaitToGetDataThread(ZKWatcher zk, String node, + TestTracker tracker) { + this.tracker = tracker; + hasData = false; + } + + @Override + public void run() { + LOG.info("Waiting for data to be available in WaitToGetDataThread"); + try { + tracker.blockUntilAvailable(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + LOG.info("Data now available in tracker from WaitToGetDataThread"); + hasData = true; + } + } + + public static class TestTracker extends ZKNodeTracker { + public TestTracker(ZKWatcher watcher, String node, + Abortable abortable) { + super(watcher, node, abortable); + } + } + + public static class TestingZKListener extends ZKListener { + private static final Log LOG = LogFactory.getLog(TestingZKListener.class); + + private Semaphore deletedLock; + private Semaphore createdLock; + private Semaphore changedLock; + private String node; + + public TestingZKListener(ZKWatcher watcher, String node) { + super(watcher); + deletedLock = new Semaphore(0); + createdLock = new Semaphore(0); + changedLock = new Semaphore(0); + this.node = node; + } + + @Override + public void nodeDeleted(String path) { + if(path.equals(node)) { + LOG.debug("nodeDeleted(" + path + ")"); + deletedLock.release(); + } + } + + @Override + public void nodeCreated(String path) { + if(path.equals(node)) { + LOG.debug("nodeCreated(" + path + ")"); + createdLock.release(); + } + } + + @Override + public void nodeDataChanged(String path) { + if(path.equals(node)) { + LOG.debug("nodeDataChanged(" + path + ")"); + changedLock.release(); + } + } + + public void waitForDeletion() throws InterruptedException { + deletedLock.acquire(); + } + + public void waitForCreation() throws InterruptedException { + createdLock.acquire(); + } + + public void waitForDataChange() throws InterruptedException { + changedLock.acquire(); + } + } + + public static class StubAbortable implements Abortable { + @Override + public void abort(final String msg, final Throwable t) {} + + @Override + public boolean isAborted() { + return false; + } + } + + @Test + public void testCleanZNode() throws Exception { + ZKWatcher zkw = new ZKWatcher(TEST_UTIL.getConfiguration(), + "testNodeTracker", new TestZKNodeTracker.StubAbortable()); + + final ServerName sn = ServerName.valueOf("127.0.0.1:52", 45L); + + ZKUtil.createAndFailSilent(zkw, + TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT)); + + final String nodeName = zkw.znodePaths.masterAddressZNode; + + // Check that we manage the case when there is no data + ZKUtil.createAndFailSilent(zkw, nodeName); + MasterAddressTracker.deleteIfEquals(zkw, sn.toString()); + assertNotNull(ZKUtil.getData(zkw, nodeName)); + + // Check that we don't delete if we're not supposed to + ZKUtil.setData(zkw, nodeName, MasterAddressTracker.toByteArray(sn, 0)); + MasterAddressTracker.deleteIfEquals(zkw, ServerName.valueOf("127.0.0.2:52", 45L).toString()); + assertNotNull(ZKUtil.getData(zkw, nodeName)); + + // Check that we delete when we're supposed to + ZKUtil.setData(zkw, nodeName,MasterAddressTracker.toByteArray(sn, 0)); + MasterAddressTracker.deleteIfEquals(zkw, sn.toString()); + assertNull(ZKUtil.getData(zkw, nodeName)); + + // Check that we support the case when the znode does not exist + MasterAddressTracker.deleteIfEquals(zkw, sn.toString()); // must not throw an exception + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/03cb5815/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0f19a30..7b026c5 100644 --- a/pom.xml +++ b/pom.xml @@ -702,6 +702,7 @@ <phase>prepare-package</phase> <goals> <goal>jar-no-fork</goal> + <goal>test-jar-no-fork</goal> </goals> </execution> </executions> @@ -1689,6 +1690,13 @@ <groupId>org.apache.hbase</groupId> <version>${project.version}</version> </dependency> + <dependency> + <artifactId>hbase-zookeeper</artifactId> + <groupId>org.apache.hbase</groupId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> <!-- General dependencies --> <dependency> <groupId>com.github.stephenc.findbugs</groupId>
