Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/447#discussion_r161410077 --- Diff: src/java/test/org/apache/zookeeper/server/quorum/SessionUpgradeTest.java --- @@ -0,0 +1,516 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zookeeper.server.quorum; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.concurrent.ConcurrentHashMap; +import javax.security.sasl.SaslException; + +import org.apache.jute.BinaryOutputArchive; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.proto.CreateRequest; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.ClientBase.CountdownWatcher; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.ByteBufferInputStream; +import org.apache.zookeeper.server.Request; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.zookeeper.test.QuorumBase; +import org.apache.zookeeper.test.DisconnectableZooKeeper; + +/** + * Tests that session upgrade works from local to global sessions. + * Expected behavior is that if global-only sessions are unset, + * and no upgrade interval is specified, then sessions will be + * created locally to the host. They will be upgraded to global + * sessions iff an operation is done on that session which requires + * persistence, i.e. creating an ephemeral node. 
+ */ +public class SessionUpgradeTest extends QuorumPeerTestBase { + protected static final Logger LOG = LoggerFactory.getLogger(SessionUpgradeTest.class); + public static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT; + + private final QuorumBase qb = new QuorumBase(); + + @Before + public void setUp() throws Exception { + LOG.info("STARTING quorum " + getClass().getName()); + qb.localSessionsEnabled = true; + qb.localSessionsUpgradingEnabled = true; + qb.setUp(); + ClientBase.waitForServerUp(qb.hostPort, 10000); + } + + @After + public void tearDown() throws Exception { + LOG.info("STOPPING quorum " + getClass().getName()); + qb.tearDown(); + } + + @Test + public void testLocalSessionsWithoutEphemeralOnFollower() throws Exception { + testLocalSessionsWithoutEphemeral(false); + } + + @Test + public void testLocalSessionsWithoutEphemeralOnLeader() throws Exception { + testLocalSessionsWithoutEphemeral(true); + } + + private void testLocalSessionsWithoutEphemeral(boolean testLeader) + throws Exception { + String nodePrefix = "/testLocalSessions-" + + (testLeader ? "leaderTest-" : "followerTest-"); + int leaderIdx = qb.getLeaderIndex(); + Assert.assertFalse("No leader in quorum?", leaderIdx == -1); + int followerIdx = (leaderIdx + 1) % 5; + int otherFollowerIdx = (leaderIdx + 2) % 5; + int testPeerIdx = testLeader ? leaderIdx : followerIdx; + String hostPorts[] = qb.hostPort.split(","); + CountdownWatcher watcher = new CountdownWatcher(); + DisconnectableZooKeeper zk = new DisconnectableZooKeeper( + hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher); + watcher.waitForConnected(CONNECTION_TIMEOUT); + + // Try creating some data. + for (int i = 0; i < 5; i++) { + zk.create(nodePrefix + i, new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + + long localSessionId = zk.getSessionId(); + byte[] localSessionPwd = zk.getSessionPasswd().clone(); + + // Try connecting with the same session id on a different + // server. 
This should fail since it is a local session. + try { + watcher.reset(); + DisconnectableZooKeeper zknew = new DisconnectableZooKeeper( + hostPorts[otherFollowerIdx], CONNECTION_TIMEOUT, watcher, + localSessionId, localSessionPwd); + + zknew.create(nodePrefix + "5", new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Assert.fail("Connection on the same session ID should fail."); + } catch (KeeperException.SessionExpiredException e) { + } catch (KeeperException.ConnectionLossException e) { + } + + // If we're testing a follower, also check the session id on the + // leader. This should also fail + if (!testLeader) { + try { + watcher.reset(); + DisconnectableZooKeeper zknew = new DisconnectableZooKeeper( + hostPorts[leaderIdx], CONNECTION_TIMEOUT, + watcher, localSessionId, localSessionPwd); + + zknew.create(nodePrefix + "5", new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + Assert.fail("Connection on the same session ID should fail."); + } catch (KeeperException.SessionExpiredException e) { + } catch (KeeperException.ConnectionLossException e) { + } + } + + // However, we should be able to disconnect and reconnect to the same + // server with the same session id (as long as we do it quickly + // before expiration). + zk.disconnect(); + + watcher.reset(); + zk = new DisconnectableZooKeeper( + hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher, + localSessionId, localSessionPwd); + watcher.waitForConnected(CONNECTION_TIMEOUT); + + zk.create(nodePrefix + "6", new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + // If we explicitly close the session, then the session id should no + // longer be valid. 
+ zk.close(); + try { + watcher.reset(); + zk = new DisconnectableZooKeeper( + hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher, + localSessionId, localSessionPwd); + + zk.create(nodePrefix + "7", new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Assert.fail("Reconnecting to a closed session ID should fail."); + } catch (KeeperException.SessionExpiredException e) { + } + } + + @Test + public void testUpgradeWithEphemeralOnFollower() throws Exception { + testUpgradeWithEphemeral(false); + } + + @Test + public void testUpgradeWithEphemeralOnLeader() throws Exception { + testUpgradeWithEphemeral(true); + } + + private void testUpgradeWithEphemeral(boolean testLeader) + throws Exception { + String nodePrefix = "/testUpgrade-" + + (testLeader ? "leaderTest-" : "followerTest-"); + int leaderIdx = qb.getLeaderIndex(); + Assert.assertFalse("No leader in quorum?", leaderIdx == -1); + int followerIdx = (leaderIdx + 1) % 5; + int otherFollowerIdx = (leaderIdx + 2) % 5; + int testPeerIdx = testLeader ? leaderIdx : followerIdx; + String hostPorts[] = qb.hostPort.split(","); + + CountdownWatcher watcher = new CountdownWatcher(); + DisconnectableZooKeeper zk = new DisconnectableZooKeeper( + hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher); + watcher.waitForConnected(CONNECTION_TIMEOUT); + + // Create some ephemeral nodes. This should force the session to + // be propagated to the other servers in the ensemble. + for (int i = 0; i < 5; i++) { + zk.create(nodePrefix + i, new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); + } + + // We should be able to reconnect with the same session id on a + // different server, since it has been propagated. 
+ long localSessionId = zk.getSessionId(); + byte[] localSessionPwd = zk.getSessionPasswd().clone(); + + zk.disconnect(); + watcher.reset(); + zk = new DisconnectableZooKeeper( + hostPorts[otherFollowerIdx], CONNECTION_TIMEOUT, watcher, + localSessionId, localSessionPwd); + watcher.waitForConnected(CONNECTION_TIMEOUT); + + // The created ephemeral nodes are still around. + for (int i = 0; i < 5; i++) { + Assert.assertNotNull(zk.exists(nodePrefix + i, null)); + } + + // When we explicitly close the session, we should not be able to + // reconnect with the same session id + zk.close(); + + try { + watcher.reset(); + zk = new DisconnectableZooKeeper( + hostPorts[otherFollowerIdx], CONNECTION_TIMEOUT, watcher, + localSessionId, localSessionPwd); + zk.exists(nodePrefix + "0", null); + Assert.fail("Reconnecting to a closed session ID should fail."); + } catch (KeeperException.SessionExpiredException e) { + } + + watcher.reset(); + // And the ephemeral nodes will be gone since the session died. 
+ zk = new DisconnectableZooKeeper( + hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher); + watcher.waitForConnected(CONNECTION_TIMEOUT); + for (int i = 0; i < 5; i++) { + Assert.assertNull(zk.exists(nodePrefix + i, null)); + } + } + + @Test + public void testLocalSessionUpgradeSnapshot() throws IOException, InterruptedException { + // setup the env with RetainDB and local session upgrading + ClientBase.setupTestEnv(); + + final int SERVER_COUNT = 3; + final int clientPorts[] = new int[SERVER_COUNT]; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < SERVER_COUNT; i++) { + clientPorts[i] = PortAssignment.unique(); + sb.append("server.").append(i).append("=127.0.0.1:") + .append(PortAssignment.unique()).append(":") + .append(PortAssignment.unique()).append("\n"); + } + sb.append("localSessionsEnabled=true\n"); + sb.append("localSessionsUpgradingEnabled=true\n"); + String cfg = sb.toString(); + + // create a 3 server ensemble + MainThread mt[] = new MainThread[SERVER_COUNT]; + final TestQPMainDropSessionUpgrading qpMain[] = + new TestQPMainDropSessionUpgrading[SERVER_COUNT]; + for (int i = 0; i < SERVER_COUNT; i++) { + final TestQPMainDropSessionUpgrading qp = new TestQPMainDropSessionUpgrading(); + qpMain[i] = qp; + mt[i] = new MainThread(i, clientPorts[i], cfg, false) { + @Override + public TestQPMain getTestQPMain() { + return qp; + } + }; + mt[i].start(); + } + + for (int i = 0; i < SERVER_COUNT; i++) { + Assert.assertTrue("waiting for server " + i + " being up", + ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], + CONNECTION_TIMEOUT)); + } --- End diff -- Test initialization logic is better placed in the setUp() method, especially when it's common to multiple tests. (That would be one benefit of moving the new tests to a new file.)
---