Repository: zookeeper
Updated Branches:
  refs/heads/master b0df8fe1e -> cd209456b


ZOOKEEPER-2926: Fix potential data consistency issue due to the session 
management bug

Author: Fangmin Lyu <allen...@fb.com>

Reviewers: Michael Han <h...@apache.org>, Andor Molnar <an...@cloudera.com>

Closes #447 from lvfangmin/ZOOKEEPER-2926


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/cd209456
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/cd209456
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/cd209456

Branch: refs/heads/master
Commit: cd209456b67cde5aba771b1a240ebe5607398459
Parents: b0df8fe
Author: Fangmin Lyu <allen...@fb.com>
Authored: Tue Aug 7 21:21:37 2018 -0700
Committer: Michael Han <h...@apache.org>
Committed: Tue Aug 7 21:21:37 2018 -0700

----------------------------------------------------------------------
 .../zookeeper/server/PrepRequestProcessor.java  |  71 ++---
 .../org/apache/zookeeper/server/ServerCnxn.java |   2 +-
 .../apache/zookeeper/server/SessionTracker.java |  11 +-
 .../zookeeper/server/SessionTrackerImpl.java    |  17 +-
 .../zookeeper/server/ZooKeeperServer.java       |   9 +-
 .../server/quorum/LeaderSessionTracker.java     |  54 ++--
 .../server/quorum/LearnerSessionTracker.java    |  54 ++--
 .../server/quorum/LocalSessionTracker.java      |   6 +-
 .../quorum/UpgradeableSessionTracker.java       |  31 +-
 .../apache/zookeeper/server/MockServerCnxn.java |   4 +-
 .../server/PrepRequestProcessorTest.java        |   8 +-
 .../zookeeper/server/SessionTrackerTest.java    |   6 +-
 .../server/quorum/SessionUpgradeQuorumTest.java | 298 +++++++++++++++++++
 .../org/apache/zookeeper/test/ClientBase.java   |  10 +-
 .../org/apache/zookeeper/test/QuorumBase.java   |  60 ++--
 .../zookeeper/test/SessionTrackerCheckTest.java |  15 +-
 16 files changed, 497 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
----------------------------------------------------------------------
diff --git 
a/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java 
b/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
index b70ad18..eebc86b 100644
--- a/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
+++ b/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -436,7 +436,7 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
                 }
 
                 zks.sessionTracker.checkSession(request.sessionId, 
request.getOwner());
-                ReconfigRequest reconfigRequest = (ReconfigRequest)record; 
+                ReconfigRequest reconfigRequest = (ReconfigRequest)record;
                 LeaderZooKeeperServer lzks;
                 try {
                     lzks = (LeaderZooKeeperServer)zks;
@@ -444,13 +444,13 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
                     // standalone mode - reconfiguration currently not 
supported
                     throw new KeeperException.UnimplementedException();
                 }
-                QuorumVerifier lastSeenQV = 
lzks.self.getLastSeenQuorumVerifier();                                          
                                       
+                QuorumVerifier lastSeenQV = 
lzks.self.getLastSeenQuorumVerifier();
                 // check that there's no reconfig in progress
                 if 
(lastSeenQV.getVersion()!=lzks.self.getQuorumVerifier().getVersion()) {
-                       throw new KeeperException.ReconfigInProgress(); 
+                       throw new KeeperException.ReconfigInProgress();
                 }
                 long configId = reconfigRequest.getCurConfigId();
-  
+
                 if (configId != -1 && 
configId!=lzks.self.getLastSeenQuorumVerifier().getVersion()){
                    String msg = "Reconfiguration from version " + configId + " 
failed -- last seen version is " +
                            lzks.self.getLastSeenQuorumVerifier().getVersion();
@@ -458,54 +458,54 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
                 }
 
                 String newMembers = reconfigRequest.getNewMembers();
-                
-                if (newMembers != null) { //non-incremental membership change  
                
+
+                if (newMembers != null) { //non-incremental membership change
                    LOG.info("Non-incremental reconfig");
-                
+
                    // Input may be delimited by either commas or newlines so 
convert to common newline separated format
                    newMembers = newMembers.replaceAll(",", "\n");
-                   
+
                    try{
-                       Properties props = new Properties();                    
    
+                       Properties props = new Properties();
                        props.load(new StringReader(newMembers));
                        request.qv = QuorumPeerConfig.parseDynamicConfig(props, 
lzks.self.getElectionType(), true, false);
                        request.qv.setVersion(request.getHdr().getZxid());
                    } catch (IOException | ConfigException e) {
                        throw new 
KeeperException.BadArgumentsException(e.getMessage());
                    }
-                } else { //incremental change - must be a majority quorum 
system   
+                } else { //incremental change - must be a majority quorum 
system
                    LOG.info("Incremental reconfig");
-                   
-                   List<String> joiningServers = null; 
+
+                   List<String> joiningServers = null;
                    String joiningServersString = 
reconfigRequest.getJoiningServers();
                    if (joiningServersString != null)
                    {
                        joiningServers = 
StringUtils.split(joiningServersString,",");
                    }
-                   
+
                    List<String> leavingServers = null;
                    String leavingServersString = 
reconfigRequest.getLeavingServers();
                    if (leavingServersString != null)
                    {
                        leavingServers = 
StringUtils.split(leavingServersString, ",");
                    }
-                   
+
                    if (!(lastSeenQV instanceof QuorumMaj)) {
                            String msg = "Incremental reconfiguration requested 
but last configuration seen has a non-majority quorum system";
                            LOG.warn(msg);
-                           throw new 
KeeperException.BadArgumentsException(msg);               
+                           throw new 
KeeperException.BadArgumentsException(msg);
                    }
                    Map<Long, QuorumServer> nextServers = new HashMap<Long, 
QuorumServer>(lastSeenQV.getAllMembers());
-                   try {                           
+                   try {
                        if (leavingServers != null) {
                            for (String leaving: leavingServers){
                                long sid = Long.parseLong(leaving);
                                nextServers.remove(sid);
-                           } 
+                           }
                        }
                        if (joiningServers != null) {
                            for (String joiner: joiningServers){
-                                  // joiner should have the following format: 
server.x = server_spec;client_spec               
+                                  // joiner should have the following format: 
server.x = server_spec;client_spec
                                   String[] parts = StringUtils.split(joiner, 
"=").toArray(new String[0]);
                                if (parts.length != 2) {
                                    throw new 
KeeperException.BadArgumentsException("Wrong format of server string");
@@ -514,7 +514,7 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
                                Long sid = 
Long.parseLong(parts[0].substring(parts[0].lastIndexOf('.') + 1));
                                QuorumServer qs = new QuorumServer(sid, 
parts[1]);
                                if (qs.clientAddr == null || qs.electionAddr == 
null || qs.addr == null) {
-                                   throw new 
KeeperException.BadArgumentsException("Wrong format of server string - each 
server should have 3 ports specified");          
+                                   throw new 
KeeperException.BadArgumentsException("Wrong format of server string - each 
server should have 3 ports specified");
                                }
 
                                // check duplication of addresses and ports
@@ -527,7 +527,7 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
 
                                nextServers.remove(qs.id);
                                nextServers.put(qs.id, qs);
-                           }  
+                           }
                        }
                    } catch (ConfigException e){
                        throw new 
KeeperException.BadArgumentsException("Reconfiguration failed");
@@ -543,21 +543,21 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
                    String msg = "Reconfig failed - new configuration must 
include at least 1 follower";
                    LOG.warn(msg);
                    throw new KeeperException.BadArgumentsException(msg);
-                }                           
-                   
+                }
+
                 if (!lzks.getLeader().isQuorumSynced(request.qv)) {
                    String msg2 = "Reconfig failed - there must be a connected 
and synced quorum in new configuration";
-                   LOG.warn(msg2);             
+                   LOG.warn(msg2);
                    throw new KeeperException.NewConfigNoQuorum();
                 }
-                
-                nodeRecord = getRecordForPath(ZooDefs.CONFIG_NODE);            
   
+
+                nodeRecord = getRecordForPath(ZooDefs.CONFIG_NODE);
                 checkACL(zks, request.cnxn, nodeRecord.acl, 
ZooDefs.Perms.WRITE, request.authInfo, null, null);
-                request.setTxn(new SetDataTxn(ZooDefs.CONFIG_NODE, 
request.qv.toString().getBytes(), -1));    
+                request.setTxn(new SetDataTxn(ZooDefs.CONFIG_NODE, 
request.qv.toString().getBytes(), -1));
                 nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
-                nodeRecord.stat.setVersion(-1);                
+                nodeRecord.stat.setVersion(-1);
                 addChangeRecord(nodeRecord);
-                break;                         
+                break;
             case OpCode.setACL:
                 zks.sessionTracker.checkSession(request.sessionId, 
request.getOwner());
                 SetACLRequest setAclRequest = (SetACLRequest)record;
@@ -579,13 +579,8 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
                 int to = request.request.getInt();
                 request.setTxn(new CreateSessionTxn(to));
                 request.request.rewind();
-                if (request.isLocalSession()) {
-                    // This will add to local session tracker if it is enabled
-                    zks.sessionTracker.addSession(request.sessionId, to);
-                } else {
-                    // Explicitly add to global session if the flag is not set
-                    zks.sessionTracker.addGlobalSession(request.sessionId, to);
-                }
+                // only add the global session tracker but not to ZKDb
+                zks.sessionTracker.trackSession(request.sessionId, to);
                 zks.setOwner(request.sessionId, request.getOwner());
                 break;
             case OpCode.closeSession:
@@ -759,7 +754,7 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
                 pRequest2Txn(request.type, zks.getNextZxid(), request, 
deleteRequest, true);
                 break;
             case OpCode.setData:
-                SetDataRequest setDataRequest = new SetDataRequest();          
      
+                SetDataRequest setDataRequest = new SetDataRequest();
                 pRequest2Txn(request.type, zks.getNextZxid(), request, 
setDataRequest, true);
                 break;
             case OpCode.reconfig:
@@ -768,11 +763,11 @@ public class PrepRequestProcessor extends 
ZooKeeperCriticalThread implements
                 pRequest2Txn(request.type, zks.getNextZxid(), request, 
reconfigRequest, true);
                 break;
             case OpCode.setACL:
-                SetACLRequest setAclRequest = new SetACLRequest();             
   
+                SetACLRequest setAclRequest = new SetACLRequest();
                 pRequest2Txn(request.type, zks.getNextZxid(), request, 
setAclRequest, true);
                 break;
             case OpCode.check:
-                CheckVersionRequest checkRequest = new CheckVersionRequest();  
            
+                CheckVersionRequest checkRequest = new CheckVersionRequest();
                 pRequest2Txn(request.type, zks.getNextZxid(), request, 
checkRequest, true);
                 break;
             case OpCode.multi:

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ServerCnxn.java 
b/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
index 35d6c55..cd43ee2 100644
--- a/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
+++ b/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
@@ -92,7 +92,7 @@ public abstract class ServerCnxn implements Stats, Watcher {
     }
 
     /* notify the client the session is closing and close/cleanup socket */
-    abstract void sendCloseSession();
+    public abstract void sendCloseSession();
 
     public abstract void process(WatchedEvent event);
 

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/SessionTracker.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/SessionTracker.java 
b/src/java/main/org/apache/zookeeper/server/SessionTracker.java
index bbf7df3..9ff7a7f 100644
--- a/src/java/main/org/apache/zookeeper/server/SessionTracker.java
+++ b/src/java/main/org/apache/zookeeper/server/SessionTracker.java
@@ -47,21 +47,20 @@ public interface SessionTracker {
     long createSession(int sessionTimeout);
 
     /**
-     * Add a global session to those being tracked.
+     * Track the session expire, not add to ZkDb.
      * @param id sessionId
      * @param to sessionTimeout
-     * @return whether the session was newly added (if false, already existed)
+     * @return whether the session was newly tracked (if false, already 
tracked)
      */
-    boolean addGlobalSession(long id, int to);
+    boolean trackSession(long id, int to);
 
     /**
-     * Add a session to those being tracked. The session is added as a local
-     * session if they are enabled, otherwise as global.
+     * Add the session to the local session map or global one in zkDB.
      * @param id sessionId
      * @param to sessionTimeout
      * @return whether the session was newly added (if false, already existed)
      */
-    boolean addSession(long id, int to);
+    boolean commitSession(long id, int to);
 
     /**
      * @param sessionId

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java 
b/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java
index 6fc6eb5..e040493 100644
--- a/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java
+++ b/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java
@@ -102,7 +102,7 @@ public class SessionTrackerImpl extends 
ZooKeeperCriticalThread implements
         this.sessionsWithTimeout = sessionsWithTimeout;
         this.nextSessionId.set(initializeNextSession(serverId));
         for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
-            addSession(e.getKey(), e.getValue());
+            trackSession(e.getKey(), e.getValue());
         }
 
         EphemeralType.validateServerId(serverId);
@@ -245,17 +245,12 @@ public class SessionTrackerImpl extends 
ZooKeeperCriticalThread implements
 
     public long createSession(int sessionTimeout) {
         long sessionId = nextSessionId.getAndIncrement();
-        addSession(sessionId, sessionTimeout);
+        trackSession(sessionId, sessionTimeout);
         return sessionId;
     }
 
-    public boolean addGlobalSession(long id, int sessionTimeout) {
-        return addSession(id, sessionTimeout);
-    }
-
-    public synchronized boolean addSession(long id, int sessionTimeout) {
-        sessionsWithTimeout.put(id, sessionTimeout);
-
+    @Override
+    public synchronized boolean trackSession(long id, int sessionTimeout) {
         boolean added = false;
 
         SessionImpl session = sessionsById.get(id);
@@ -285,6 +280,10 @@ public class SessionTrackerImpl extends 
ZooKeeperCriticalThread implements
         return added;
     }
 
+    public synchronized boolean commitSession(long id, int sessionTimeout) {
+        return sessionsWithTimeout.put(id, sessionTimeout) == null;
+    }
+
     public boolean isTrackingSession(long sessionId) {
         return sessionsById.containsKey(sessionId);
     }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java 
b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
index 3d98f8e..64f242f 100644
--- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -1217,13 +1217,8 @@ public class ZooKeeperServer implements SessionExpirer, 
ServerStats.Provider {
         if (opCode == OpCode.createSession) {
             if (hdr != null && txn instanceof CreateSessionTxn) {
                 CreateSessionTxn cst = (CreateSessionTxn) txn;
-                sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());
-            } else if (request != null && request.isLocalSession()) {
-                request.request.rewind();
-                int timeout = request.request.getInt();
-                request.request.rewind();
-                sessionTracker.addSession(request.sessionId, timeout);
-            } else {
+                sessionTracker.commitSession(sessionId, cst.getTimeOut());
+            } else if (request == null || !request.isLocalSession()) {
                 LOG.warn("*****>>>>> Got "
                         + txn.getClass() + " "
                         + txn.toString());

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
----------------------------------------------------------------------
diff --git 
a/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java 
b/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
index 38bbfe8..e79207b 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
@@ -85,31 +85,47 @@ public class LeaderSessionTracker extends 
UpgradeableSessionTracker {
         return globalSessionTracker.isTrackingSession(sessionId);
     }
 
-    public boolean addGlobalSession(long sessionId, int sessionTimeout) {
-        boolean added =
-            globalSessionTracker.addSession(sessionId, sessionTimeout);
-        if (localSessionsEnabled && added) {
+    public boolean trackSession(long sessionId, int sessionTimeout) {
+        boolean tracked =
+            globalSessionTracker.trackSession(sessionId, sessionTimeout);
+        if (localSessionsEnabled && tracked) {
             // Only do extra logging so we know what kind of session this is
             // if we're supporting both kinds of sessions
-            LOG.info("Adding global session 0x" + Long.toHexString(sessionId));
+            LOG.info("Tracking global session 0x" + 
Long.toHexString(sessionId));
         }
-        return added;
+        return tracked;
     }
 
-    public boolean addSession(long sessionId, int sessionTimeout) {
-        boolean added;
-        if (localSessionsEnabled && !isGlobalSession(sessionId)) {
-            added = localSessionTracker.addSession(sessionId, sessionTimeout);
-            // Check for race condition with session upgrading
-            if (isGlobalSession(sessionId)) {
-                added = false;
-                localSessionTracker.removeSession(sessionId);
-            } else if (added) {
-              LOG.info("Adding local session 0x" + 
Long.toHexString(sessionId));
-            }
-        } else {
-            added = addGlobalSession(sessionId, sessionTimeout);
+    /**
+     * Synchronized on this to avoid race condition of adding a local session
+     * after committed global session, which may cause the same session being
+     * tracked on this server and leader.
+     */
+    public synchronized boolean commitSession(
+            long sessionId, int sessionTimeout) {
+        boolean added =
+            globalSessionTracker.commitSession(sessionId, sessionTimeout);
+
+        if (added) {
+            LOG.info("Committing global session 0x" + 
Long.toHexString(sessionId));
+        }
+
+        // If the session moved before the session upgrade finished, it's
+        // possible that the session will be added to the local session
+        // again. Need to double check and remove it from local session
+        // tracker when the global session is quorum committed, otherwise the
+        // local session might be tracked both locally and on leader.
+        //
+        // This cannot totally avoid the local session being upgraded again
+        // because there is still race condition between create another upgrade
+        // request and process the createSession commit, and there is no way
+        // to know there is a on flying createSession request because it might
+        // be upgraded by other server which owns the session before move.
+        if (localSessionsEnabled) {
+            removeLocalSession(sessionId);
+            finishedUpgrading(sessionId);
         }
+
         return added;
     }
 

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java
----------------------------------------------------------------------
diff --git 
a/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java 
b/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java
index 1cc2ab1..5420494 100644
--- 
a/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java
+++ 
b/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java
@@ -58,7 +58,6 @@ public class LearnerSessionTracker extends 
UpgradeableSessionTracker {
     private final long serverId;
     private final AtomicLong nextSessionId = new AtomicLong();
 
-    private final boolean localSessionsEnabled;
     private final ConcurrentMap<Long, Integer> globalSessionsWithTimeouts;
 
     public LearnerSessionTracker(SessionExpirer expirer,
@@ -101,33 +100,44 @@ public class LearnerSessionTracker extends 
UpgradeableSessionTracker {
         return globalSessionsWithTimeouts.containsKey(sessionId);
     }
 
-    public boolean addGlobalSession(long sessionId, int sessionTimeout) {
+    public boolean trackSession(long sessionId, int sessionTimeout) {
+        // Learner doesn't track global session, do nothing here
+        return false;
+    }
+
+    /**
+     * Synchronized on this to avoid race condition of adding a local session
+     * after committed global session, which may cause the same session being
+     * tracked on this server and leader.
+     */
+    public synchronized boolean commitSession(
+            long sessionId, int sessionTimeout) {
         boolean added =
             globalSessionsWithTimeouts.put(sessionId, sessionTimeout) == null;
-        if (localSessionsEnabled && added) {
+
+        if (added) {
             // Only do extra logging so we know what kind of session this is
             // if we're supporting both kinds of sessions
-            LOG.info("Adding global session 0x" + Long.toHexString(sessionId));
+            LOG.info("Committing global session 0x" + 
Long.toHexString(sessionId));
         }
-        touchTable.get().put(sessionId, sessionTimeout);
-        return added;
-    }
 
-    public boolean addSession(long sessionId, int sessionTimeout) {
-        boolean added;
-        if (localSessionsEnabled && !isGlobalSession(sessionId)) {
-            added = localSessionTracker.addSession(sessionId, sessionTimeout);
-            // Check for race condition with session upgrading
-            if (isGlobalSession(sessionId)) {
-                added = false;
-                localSessionTracker.removeSession(sessionId);
-            } else if (added) {
-                LOG.info("Adding local session 0x"
-                         + Long.toHexString(sessionId));
-            }
-        } else {
-            added = addGlobalSession(sessionId, sessionTimeout);
+        // If the session moved before the session upgrade finished, it's
+        // possible that the session will be added to the local session
+        // again. Need to double check and remove it from local session
+        // tracker when the global session is quorum committed, otherwise the
+        // local session might be tracked both locally and on leader.
+        //
+        // This cannot totally avoid the local session being upgraded again
+        // because there is still race condition between create another upgrade
+        // request and process the createSession commit, and there is no way
+        // to know there is a on flying createSession request because it might
+        // be upgraded by other server which owns the session before move.
+        if (localSessionsEnabled) {
+            removeLocalSession(sessionId);
+            finishedUpgrading(sessionId);
         }
+
+        touchTable.get().put(sessionId, sessionTimeout);
         return added;
     }
 
@@ -136,7 +146,7 @@ public class LearnerSessionTracker extends 
UpgradeableSessionTracker {
             if (localSessionTracker.touchSession(sessionId, sessionTimeout)) {
                 return true;
             }
-            if (!isGlobalSession(sessionId)) {
+            if (!isGlobalSession(sessionId) && !isUpgradingSession(sessionId)) 
{
                 return false;
             }
         }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java
----------------------------------------------------------------------
diff --git 
a/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java 
b/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java
index df6ccb2..523c440 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java
@@ -40,7 +40,9 @@ public class LocalSessionTracker extends SessionTrackerImpl {
         return false;
     }
 
-    public boolean addGlobalSession(long sessionId, int sessionTimeout) {
-        throw new UnsupportedOperationException();
+    public long createSession(int sessionTimeout) {
+        long sessionId = super.createSession(sessionTimeout);
+        commitSession(sessionId, sessionTimeout);
+        return sessionId;
     }
 }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
----------------------------------------------------------------------
diff --git 
a/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
 
b/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
index 2e58ff5..eb50a07 100644
--- 
a/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
+++ 
b/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
@@ -33,7 +33,9 @@ public abstract class UpgradeableSessionTracker implements 
SessionTracker {
     private static final Logger LOG = 
LoggerFactory.getLogger(UpgradeableSessionTracker.class);
 
     private ConcurrentMap<Long, Integer> localSessionsWithTimeouts;
+    private ConcurrentMap<Long, Integer> upgradingSessions;
     protected LocalSessionTracker localSessionTracker;
+    protected boolean localSessionsEnabled;
 
     public void start() {}
 
@@ -43,6 +45,7 @@ public abstract class UpgradeableSessionTracker implements 
SessionTracker {
             new ConcurrentHashMap<Long, Integer>();
         this.localSessionTracker = new LocalSessionTracker(
             expirer, this.localSessionsWithTimeouts, tickTime, id, listener);
+        this.upgradingSessions = new ConcurrentHashMap<Long, Integer>();
     }
 
     public boolean isTrackingSession(long sessionId) {
@@ -54,6 +57,17 @@ public abstract class UpgradeableSessionTracker implements 
SessionTracker {
             localSessionTracker.isTrackingSession(sessionId);
     }
 
+    public boolean isUpgradingSession(long sessionId) {
+        return upgradingSessions != null &&
+            upgradingSessions.containsKey(sessionId);
+    }
+
+    public void finishedUpgrading(long sessionId) {
+        if (upgradingSessions != null) {
+            upgradingSessions.remove(sessionId);
+        }
+    }
+
     abstract public boolean isGlobalSession(long sessionId);
 
     /**
@@ -74,14 +88,27 @@ public abstract class UpgradeableSessionTracker implements 
SessionTracker {
         Integer timeout = localSessionsWithTimeouts.remove(sessionId);
         if (timeout != null) {
             LOG.info("Upgrading session 0x" + Long.toHexString(sessionId));
-            // Add as global before removing as local
-            addGlobalSession(sessionId, timeout);
+            // Track global session, which will add to global session tracker
+            // on leader and do nothing on learner. Need to start track global
+            // session in leader now to update the session expire between
+            // LeaderRequestProcessor and PrepRequestProcessor.
+            trackSession(sessionId, timeout);
+            // Track ongoing upgrading sessions, learner will use it to find
+            // other sessions it has which are not in local and global sessions
+            upgradingSessions.put(sessionId, timeout);
             localSessionTracker.removeSession(sessionId);
             return timeout;
         }
         return -1;
     }
 
+    protected void removeLocalSession(long sessionId) {
+         if (localSessionTracker == null) {
+            return;
+        }
+        localSessionTracker.removeSession(sessionId);
+    }
+
     public void checkGlobalSession(long sessionId, Object owner)
             throws KeeperException.SessionExpiredException,
             KeeperException.SessionMovedException {

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/test/org/apache/zookeeper/server/MockServerCnxn.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/MockServerCnxn.java 
b/src/java/test/org/apache/zookeeper/server/MockServerCnxn.java
index 2e37272..7ae0004 100644
--- a/src/java/test/org/apache/zookeeper/server/MockServerCnxn.java
+++ b/src/java/test/org/apache/zookeeper/server/MockServerCnxn.java
@@ -44,7 +44,7 @@ public class MockServerCnxn extends ServerCnxn {
     }
 
     @Override
-    void sendCloseSession() {
+    public void sendCloseSession() {
     }
 
     @Override
@@ -110,4 +110,4 @@ public class MockServerCnxn extends ServerCnxn {
     public int getInterestOps() {
         return 0;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java 
b/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java
index 8223583..606994c 100644
--- a/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java
+++ b/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java
@@ -208,18 +208,18 @@ public class PrepRequestProcessorTest extends ClientBase {
         @Override
         public void shutdown() {
             // TODO Auto-generated method stub
-            
+
         }
     }
-    
+
     private class MySessionTracker implements SessionTracker {
         @Override
-        public boolean addGlobalSession(long id, int to) {
+        public boolean trackSession(long id, int to) {
             // TODO Auto-generated method stub
             return false;
         }
         @Override
-        public boolean addSession(long id, int to) {
+        public boolean commitSession(long id, int to) {
             // TODO Auto-generated method stub
             return false;
         }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/test/org/apache/zookeeper/server/SessionTrackerTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/SessionTrackerTest.java 
b/src/java/test/org/apache/zookeeper/server/SessionTrackerTest.java
index 00e34fa..abe9aa0 100644
--- a/src/java/test/org/apache/zookeeper/server/SessionTrackerTest.java
+++ b/src/java/test/org/apache/zookeeper/server/SessionTrackerTest.java
@@ -51,7 +51,7 @@ public class SessionTrackerTest extends ZKTestCase {
         ZooKeeperServer zks = setupSessionTracker();
 
         latch = new CountDownLatch(1);
-        zks.sessionTracker.addSession(sessionId, sessionTimeout);
+        zks.sessionTracker.trackSession(sessionId, sessionTimeout);
         SessionTrackerImpl sessionTrackerImpl = (SessionTrackerImpl) 
zks.sessionTracker;
         SessionImpl sessionImpl = sessionTrackerImpl.sessionsById
                 .get(sessionId);
@@ -68,7 +68,7 @@ public class SessionTrackerTest extends ZKTestCase {
         // Simulating FinalRequestProcessor logic: create session request has
         // delayed and now reaches FinalRequestProcessor. Here the leader zk
         // will do sessionTracker.addSession(id, timeout)
-        sessionTrackerImpl.addSession(sessionId, sessionTimeout);
+        sessionTrackerImpl.trackSession(sessionId, sessionTimeout);
         try {
             sessionTrackerImpl.checkSession(sessionId, sessionOwner);
             Assert.fail("Should throw session expiry exception "
@@ -93,7 +93,7 @@ public class SessionTrackerTest extends ZKTestCase {
         ZooKeeperServer zks = setupSessionTracker();
 
         latch = new CountDownLatch(1);
-        zks.sessionTracker.addSession(sessionId, sessionTimeout);
+        zks.sessionTracker.trackSession(sessionId, sessionTimeout);
         SessionTrackerImpl sessionTrackerImpl = (SessionTrackerImpl) 
zks.sessionTracker;
         SessionImpl sessionImpl = sessionTrackerImpl.sessionsById
                 .get(sessionId);

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/test/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
----------------------------------------------------------------------
diff --git 
a/src/java/test/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
 
b/src/java/test/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
new file mode 100644
index 0000000..8e345fe
--- /dev/null
+++ 
b/src/java/test/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.security.sasl.SaslException;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+import org.apache.zookeeper.server.Request;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.zookeeper.test.QuorumBase;
+import org.apache.zookeeper.test.DisconnectableZooKeeper;
+
+public class SessionUpgradeQuorumTest extends QuorumPeerTestBase {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(SessionUpgradeQuorumTest.class);
+    public static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT;
+
+    public static final int SERVER_COUNT = 3;
+    private MainThread mt[];
+    private int clientPorts[];
+    private TestQPMainDropSessionUpgrading qpMain[];
+
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("STARTING quorum " + getClass().getName());
+        // setup the env with RetainDB and local session upgrading
+        ClientBase.setupTestEnv();
+
+        mt = new MainThread[SERVER_COUNT];
+        clientPorts = new int[SERVER_COUNT];
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            sb.append("server.").append(i).append("=127.0.0.1:")
+              .append(PortAssignment.unique()).append(":")
+              .append(PortAssignment.unique()).append("\n");
+        }
+        sb.append("localSessionsEnabled=true\n");
+        sb.append("localSessionsUpgradingEnabled=true\n");
+        String cfg = sb.toString();
+
+        // create a 3 server ensemble
+        qpMain = new TestQPMainDropSessionUpgrading[SERVER_COUNT];
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            final TestQPMainDropSessionUpgrading qp = new 
TestQPMainDropSessionUpgrading();
+            qpMain[i] = qp;
+            mt[i] = new MainThread(i, clientPorts[i], cfg, false) {
+                @Override
+                public TestQPMain getTestQPMain() {
+                    return qp;
+                }
+            };
+            mt[i].start();
+        }
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            Assert.assertTrue("waiting for server " + i + " being up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
+                            CONNECTION_TIMEOUT));
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("STOPPING quorum " + getClass().getName());
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            mt[i].shutdown();
+        }
+    }
+
+    @Test
+    public void testLocalSessionUpgradeSnapshot() throws IOException, 
InterruptedException {
+        // select the candidate of follower
+        int leader = -1;
+        int followerA = -1;
+        for (int i = SERVER_COUNT - 1; i >= 0; i--) {
+            if (mt[i].main.quorumPeer.leader != null) {
+                leader = i;
+            } else if (followerA == -1) {
+                followerA = i;
+            }
+        }
+
+        LOG.info("follower A is {}", followerA);
+        qpMain[followerA].setDropCreateSession(true);
+
+        // create a client, and create an ephemeral node to trigger the
+        // upgrading process
+        final String node = "/node-1";
+        ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[followerA],
+                    ClientBase.CONNECTION_TIMEOUT, this);
+
+        waitForOne(zk, States.CONNECTED);
+
+        // clone the session id and passwd for later usage
+        long sessionId = zk.getSessionId();
+
+        // should fail because of the injection
+        try {
+            zk.create(node, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.EPHEMERAL);
+            Assert.fail("expect to failed to upgrade session due to the " +
+                    "TestQPMainDropSessionUpgrading is being used");
+        } catch (KeeperException e) {
+            LOG.info("KeeperException when create ephemeral node, {}", e);
+        }
+
+        // force to take snapshot
+        qpMain[followerA].quorumPeer.follower.zk.takeSnapshot(true);
+
+        // wait snapshot finish
+        Thread.sleep(500);
+
+        // shutdown all servers
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            mt[i].shutdown();
+        }
+
+        ArrayList<States> waitStates =new ArrayList<States>();
+        waitStates.add(States.CONNECTING);
+        waitStates.add(States.CLOSED);
+        waitForOne(zk, waitStates);
+
+        // start the servers again, start follower A last as we want to
+        // keep it running as follower
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            mt[i].start();
+        }
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            Assert.assertTrue("waiting for server " + i + " being up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
+                            CONNECTION_TIMEOUT));
+        }
+
+        // check global session not exist on follower A
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            ConcurrentHashMap<Long, Integer> sessions =
+                    mt[i].main.quorumPeer.getZkDb().getSessionWithTimeOuts();
+            Assert.assertFalse("server " + i + " should not have global " +
+                    "session " + sessionId, sessions.containsKey(sessionId));
+        }
+
+        zk.close();
+    }
+
+    @Test
+    public void testOnlyUpgradeSessionOnce()
+            throws IOException, InterruptedException, KeeperException {
+        // create a client, and create an ephemeral node to trigger the
+        // upgrading process
+        final String node = "/node-1";
+        ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[0],
+                    ClientBase.CONNECTION_TIMEOUT, this);
+
+        waitForOne(zk, States.CONNECTED);
+        long sessionId = zk.getSessionId();
+
+        QuorumZooKeeperServer server =
+                (QuorumZooKeeperServer) 
mt[0].main.quorumPeer.getActiveServer();
+        Request create1 = createEphemeralRequest("/data-1", sessionId);
+        Request create2 = createEphemeralRequest("/data-2", sessionId);
+
+        Assert.assertNotNull("failed to upgrade on a ephemeral create",
+                server.checkUpgradeSession(create1));
+        Assert.assertNull("tried to upgrade again", 
server.checkUpgradeSession(create2));
+
+        // clean al the setups and close the zk
+        zk.close();
+    }
+
+    private static class TestQPMainDropSessionUpgrading extends TestQPMain {
+
+        private volatile boolean shouldDrop = false;
+
+        public void setDropCreateSession(boolean dropCreateSession) {
+            shouldDrop = dropCreateSession;
+        }
+
+        @Override
+        protected QuorumPeer getQuorumPeer() throws SaslException {
+            return new QuorumPeer() {
+
+                @Override
+                protected Follower makeFollower(FileTxnSnapLog logFactory)
+                        throws IOException {
+
+                    return new Follower(this, new FollowerZooKeeperServer(
+                            logFactory, this, this.getZkDb())) {
+
+                        @Override
+                        protected void request(Request request)
+                                throws IOException {
+                            if (!shouldDrop) {
+                                super.request(request);
+                                return;
+                            }
+                            LOG.info("request is {}, cnxn {}", request.type, 
request.cnxn);
+
+                            if (request.type == ZooDefs.OpCode.createSession) {
+                                LOG.info("drop createSession request {}", 
request);
+                                return;
+                            }
+
+                            if (request.type == ZooDefs.OpCode.create &&
+                                    request.cnxn != null) {
+                                CreateRequest createRequest = new 
CreateRequest();
+                                request.request.rewind();
+                                ByteBufferInputStream.byteBuffer2Record(
+                                        request.request, createRequest);
+                                request.request.rewind();
+                                try {
+                                    CreateMode createMode =
+                                          
CreateMode.fromFlag(createRequest.getFlags());
+                                    if (createMode.isEphemeral()) {
+                                        request.cnxn.sendCloseSession();
+                                    }
+                                } catch (KeeperException e) {}
+                                return;
+                            }
+
+                            super.request(request);
+                        }
+                    };
+                }
+            };
+        }
+    }
+
+    private void waitForOne(ZooKeeper zk, States state) throws 
InterruptedException {
+        ArrayList<States> states = new ArrayList<States>();
+        states.add(state);
+        waitForOne(zk, states);
+    }
+
+    private void waitForOne(ZooKeeper zk, ArrayList<States> states) throws 
InterruptedException {
+        int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
+        while (!states.contains(zk.getState())) {
+            if (iterations-- == 0) {
+                LOG.info("state is {}", zk.getState());
+                throw new RuntimeException("Waiting too long");
+            }
+            Thread.sleep(500);
+        }
+    }
+
+    private Request createEphemeralRequest(String path, long sessionId) throws 
IOException {
+        ByteArrayOutputStream boas = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
+        CreateRequest createRequest = new CreateRequest(path,
+                "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL.toFlag());
+        createRequest.serialize(boa, "request");
+        ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
+        return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, bb,
+                new ArrayList<Id>());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/test/org/apache/zookeeper/test/ClientBase.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/test/ClientBase.java 
b/src/java/test/org/apache/zookeeper/test/ClientBase.java
index 7d2ec56..a550098 100644
--- a/src/java/test/org/apache/zookeeper/test/ClientBase.java
+++ b/src/java/test/org/apache/zookeeper/test/ClientBase.java
@@ -432,9 +432,9 @@ public abstract class ClientBase extends ZKTestCase {
      * Because any exception on starting the server would leave the server
      * running and the caller would not be able to shutdown the instance. This
      * may affect other test cases.
-     * 
+     *
      * @return newly created server instance
-     * 
+     *
      * @see <a
      *      
href="https://issues.apache.org/jira/browse/ZOOKEEPER-1852";>ZOOKEEPER-1852</a>
      *      for more information.
@@ -508,7 +508,7 @@ public abstract class ClientBase extends ZKTestCase {
          */
         OSMXBean osMbean = new OSMXBean();
         if (osMbean.getUnix() == true) {
-            initialFdCount = osMbean.getOpenFileDescriptorCount();     
+            initialFdCount = osMbean.getOpenFileDescriptorCount();
             LOG.info("Initial fdcount is: "
                     + initialFdCount);
         }
@@ -568,7 +568,7 @@ public abstract class ClientBase extends ZKTestCase {
 
     /**
      * Returns a string representation of the given long value session id
-     * 
+     *
      * @param sessionId
      *            long value of session id
      * @return string representation of session id
@@ -630,7 +630,7 @@ public abstract class ClientBase extends ZKTestCase {
          */
         OSMXBean osMbean = new OSMXBean();
         if (osMbean.getUnix() == true) {
-            long fdCount = osMbean.getOpenFileDescriptorCount();     
+            long fdCount = osMbean.getOpenFileDescriptorCount();
             String message = "fdcount after test is: "
                     + fdCount + " at start it was " + initialFdCount;
             LOG.info(message);

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/test/org/apache/zookeeper/test/QuorumBase.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/test/QuorumBase.java 
b/src/java/test/org/apache/zookeeper/test/QuorumBase.java
index fb2bb87..fcaa9b6 100644
--- a/src/java/test/org/apache/zookeeper/test/QuorumBase.java
+++ b/src/java/test/org/apache/zookeeper/test/QuorumBase.java
@@ -53,13 +53,13 @@ public class QuorumBase extends ClientBase {
     protected int port3;
     protected int port4;
     protected int port5;
-    
+
     protected int portLE1;
     protected int portLE2;
     protected int portLE3;
     protected int portLE4;
     protected int portLE5;
-    
+
     protected int portClient1;
     protected int portClient2;
     protected int portClient3;
@@ -73,12 +73,12 @@ public class QuorumBase extends ClientBase {
     // This just avoids complaints by junit
     public void testNull() {
     }
-    
+
     @Override
     public void setUp() throws Exception {
         setUp(false);
     }
-        
+
     protected void setUp(boolean withObservers) throws Exception {
         LOG.info("QuorumBase.setup " + getTestName());
         setupTestEnv();
@@ -92,19 +92,19 @@ public class QuorumBase extends ClientBase {
         port3 = PortAssignment.unique();
         port4 = PortAssignment.unique();
         port5 = PortAssignment.unique();
-        
+
         portLE1 = PortAssignment.unique();
         portLE2 = PortAssignment.unique();
         portLE3 = PortAssignment.unique();
         portLE4 = PortAssignment.unique();
         portLE5 = PortAssignment.unique();
-        
+
         portClient1 = PortAssignment.unique();
         portClient2 = PortAssignment.unique();
         portClient3 = PortAssignment.unique();
         portClient4 = PortAssignment.unique();
         portClient5 = PortAssignment.unique();
-        
+
         hostPort = "127.0.0.1:" + portClient1
             + ",127.0.0.1:" + portClient2
             + ",127.0.0.1:" + portClient3
@@ -128,44 +128,44 @@ public class QuorumBase extends ClientBase {
 
         LOG.info("Setup finished");
     }
-    
+
     void startServers() throws Exception {
-        startServers(false);        
+        startServers(false);
     }
-    
+
     void startServers(boolean withObservers) throws Exception {
         int tickTime = 2000;
         int initLimit = 3;
         int syncLimit = 3;
         Map<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
-        peers.put(Long.valueOf(1), new QuorumServer(1, 
+        peers.put(Long.valueOf(1), new QuorumServer(1,
                 new InetSocketAddress(LOCALADDR, port1),
                 new InetSocketAddress(LOCALADDR, portLE1),
                 new InetSocketAddress(LOCALADDR, portClient1),
                 LearnerType.PARTICIPANT));
-        peers.put(Long.valueOf(2), new QuorumServer(2, 
+        peers.put(Long.valueOf(2), new QuorumServer(2,
                 new InetSocketAddress(LOCALADDR, port2),
                 new InetSocketAddress(LOCALADDR, portLE2),
                 new InetSocketAddress(LOCALADDR, portClient2),
                 LearnerType.PARTICIPANT));
-        peers.put(Long.valueOf(3), new QuorumServer(3, 
+        peers.put(Long.valueOf(3), new QuorumServer(3,
                 new InetSocketAddress(LOCALADDR, port3),
                 new InetSocketAddress(LOCALADDR, portLE3),
                 new InetSocketAddress(LOCALADDR, portClient3),
                 LearnerType.PARTICIPANT));
-        peers.put(Long.valueOf(4), new QuorumServer(4, 
+        peers.put(Long.valueOf(4), new QuorumServer(4,
                 new InetSocketAddress(LOCALADDR, port4),
                 new InetSocketAddress(LOCALADDR, portLE4),
                 new InetSocketAddress(LOCALADDR, portClient4),
                 LearnerType.PARTICIPANT));
-        peers.put(Long.valueOf(5), new QuorumServer(5, 
+        peers.put(Long.valueOf(5), new QuorumServer(5,
                 new InetSocketAddress(LOCALADDR, port5),
                 new InetSocketAddress(LOCALADDR, portLE5),
                 new InetSocketAddress(LOCALADDR, portClient5),
                 LearnerType.PARTICIPANT));
-        
+
         if (withObservers) {
-            peers.get(Long.valueOf(4)).type = LearnerType.OBSERVER;        
+            peers.get(Long.valueOf(4)).type = LearnerType.OBSERVER;
             peers.get(Long.valueOf(5)).type = LearnerType.OBSERVER;
         }
 
@@ -184,18 +184,18 @@ public class QuorumBase extends ClientBase {
         LOG.info("creating QuorumPeer 5 port " + portClient5);
         s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, 
initLimit, syncLimit);
         Assert.assertEquals(portClient5, s5.getClientPort());
-        
+
         if (withObservers) {
             s4.setLearnerType(LearnerType.OBSERVER);
             s5.setLearnerType(LearnerType.OBSERVER);
         }
-        
+
         LOG.info("QuorumPeer 1 voting view: " + s1.getVotingView());
         LOG.info("QuorumPeer 2 voting view: " + s2.getVotingView());
         LOG.info("QuorumPeer 3 voting view: " + s3.getVotingView());
         LOG.info("QuorumPeer 4 voting view: " + s4.getVotingView());
-        LOG.info("QuorumPeer 5 voting view: " + s5.getVotingView());       
-        
+        LOG.info("QuorumPeer 5 voting view: " + s5.getVotingView());
+
         s1.enableLocalSessions(localSessionsEnabled);
         s2.enableLocalSessions(localSessionsEnabled);
         s3.enableLocalSessions(localSessionsEnabled);
@@ -299,37 +299,37 @@ public class QuorumBase extends ClientBase {
         int tickTime = 2000;
         int initLimit = 3;
         int syncLimit = 3;
-        
+
         if(peers == null){
             peers = new HashMap<Long,QuorumServer>();
 
-            peers.put(Long.valueOf(1), new QuorumServer(1, 
+            peers.put(Long.valueOf(1), new QuorumServer(1,
                 new InetSocketAddress(LOCALADDR, port1),
                 new InetSocketAddress(LOCALADDR, portLE1),
                 new InetSocketAddress(LOCALADDR, portClient1),
                 LearnerType.PARTICIPANT));
-            peers.put(Long.valueOf(2), new QuorumServer(2, 
+            peers.put(Long.valueOf(2), new QuorumServer(2,
                 new InetSocketAddress(LOCALADDR, port2),
                 new InetSocketAddress(LOCALADDR, portLE2),
                 new InetSocketAddress(LOCALADDR, portClient2),
                 LearnerType.PARTICIPANT));
-            peers.put(Long.valueOf(3), new QuorumServer(3, 
+            peers.put(Long.valueOf(3), new QuorumServer(3,
                 new InetSocketAddress(LOCALADDR, port3),
                 new InetSocketAddress(LOCALADDR, portLE3),
                 new InetSocketAddress(LOCALADDR, portClient3),
                 LearnerType.PARTICIPANT));
-            peers.put(Long.valueOf(4), new QuorumServer(4, 
+            peers.put(Long.valueOf(4), new QuorumServer(4,
                 new InetSocketAddress(LOCALADDR, port4),
                 new InetSocketAddress(LOCALADDR, portLE4),
                 new InetSocketAddress(LOCALADDR, portClient4),
                 LearnerType.PARTICIPANT));
-            peers.put(Long.valueOf(5), new QuorumServer(5, 
+            peers.put(Long.valueOf(5), new QuorumServer(5,
                 new InetSocketAddress(LOCALADDR, port5),
                 new InetSocketAddress(LOCALADDR, portLE5),
                 new InetSocketAddress(LOCALADDR, portClient5),
                 LearnerType.PARTICIPANT));
         }
-        
+
         switch(i){
         case 1:
             LOG.info("creating QuorumPeer 1 port " + portClient1);
@@ -341,7 +341,7 @@ public class QuorumBase extends ClientBase {
             s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, 
tickTime, initLimit, syncLimit);
             Assert.assertEquals(portClient2, s2.getClientPort());
             break;
-        case 3:  
+        case 3:
             LOG.info("creating QuorumPeer 3 port " + portClient3);
             s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, 
tickTime, initLimit, syncLimit);
             Assert.assertEquals(portClient3, s3.getClientPort());
@@ -361,7 +361,7 @@ public class QuorumBase extends ClientBase {
     @Override
     public void tearDown() throws Exception {
         LOG.info("TearDown started");
-        
+
         OSMXBean osMbean = new OSMXBean();
         if (osMbean.getUnix() == true) {
             LOG.info("fdcount after test is: "

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/cd209456/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java
----------------------------------------------------------------------
diff --git 
a/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java 
b/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java
index b484452..d8c9bb9 100644
--- a/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java
+++ b/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java
@@ -98,8 +98,7 @@ public class SessionTrackerCheckTest extends ZKTestCase {
         }
 
         // Local session
-        sessionId = 0xf005ba11;
-        tracker.addSession(sessionId, CONNECTION_TIMEOUT);
+        sessionId = tracker.createSession(CONNECTION_TIMEOUT);
         try {
             tracker.checkSession(sessionId, null);
         } catch (Exception e) {
@@ -144,8 +143,8 @@ public class SessionTrackerCheckTest extends ZKTestCase {
             Assert.fail("local session from other server should not fail");
         }
 
-        // Global session
-        tracker.addGlobalSession(sessionId, CONNECTION_TIMEOUT);
+        // Track global session
+        tracker.trackSession(sessionId, CONNECTION_TIMEOUT);
         try {
             tracker.checkSession(sessionId, null);
         } catch (Exception e) {
@@ -158,9 +157,7 @@ public class SessionTrackerCheckTest extends ZKTestCase {
         }
 
         // Local session from the leader
-        sessionId = (expirer.sid << 56) + 1;
-        ;
-        tracker.addSession(sessionId, CONNECTION_TIMEOUT);
+        sessionId = tracker.createSession(CONNECTION_TIMEOUT);
         try {
             tracker.checkSession(sessionId, null);
         } catch (Exception e) {
@@ -168,7 +165,7 @@ public class SessionTrackerCheckTest extends ZKTestCase {
         }
 
         // During session upgrade
-        tracker.addGlobalSession(sessionId, CONNECTION_TIMEOUT);
+        tracker.trackSession(sessionId, CONNECTION_TIMEOUT);
         try {
             tracker.checkSession(sessionId, null);
         } catch (Exception e) {
@@ -186,7 +183,7 @@ public class SessionTrackerCheckTest extends ZKTestCase {
 
         // Global session
         sessionId = 0xdeadbeef;
-        tracker.addSession(sessionId, CONNECTION_TIMEOUT);
+        tracker.trackSession(sessionId, CONNECTION_TIMEOUT);
         try {
             tracker.checkSession(sessionId, null);
         } catch (Exception e) {

Reply via email to