This is an automated email from the ASF dual-hosted git repository.
ddiederen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new e41dc7d ZOOKEEPER-3970: Enable ZooKeeperServerController to expire
session.
e41dc7d is described below
commit e41dc7dbb0541745afdc768bf757e29c938a9ab8
Author: Michael Han <[email protected]>
AuthorDate: Thu May 20 09:27:40 2021 +0000
ZOOKEEPER-3970: Enable ZooKeeperServerController to expire session.
This is a follow up of ZOOKEEPER-3948. Here we enable
ZooKeeperServerController to be able to expire a global or local session. This
is very useful in our experience in integration testing when we want a
controlled session expiration mechanism. This is done by having session tracker
exposing both global and local session stats, so a zookeeper server can expire
the sessions in the controller.
Author: Michael Han <[email protected]>
Reviewers: Damien Diederen <[email protected]>
Closes #1505 from hanm/ZOOKEEPER-3970
---
.../apache/zookeeper/server/SessionTracker.java | 10 +++++++++
.../zookeeper/server/SessionTrackerImpl.java | 11 +++++++++-
.../apache/zookeeper/server/ZooKeeperServer.java | 6 ++++++
.../controller/ZooKeeperServerController.java | 24 +++++++++++++++++++---
.../server/quorum/LeaderSessionTracker.java | 3 +++
.../server/quorum/LearnerSessionTracker.java | 3 +++
.../server/quorum/LocalSessionTracker.java | 5 ++++-
.../server/quorum/UpgradeableSessionTracker.java | 9 +++++++-
.../zookeeper/server/PrepRequestProcessorTest.java | 10 ++++++++-
.../controller/ControllerClientServerTest.java | 6 +-----
.../ZooKeeperServerControllerEndToEndTest.java | 4 ++--
11 files changed, 77 insertions(+), 14 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java
index 9cf4774..0d7ce81 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java
@@ -138,4 +138,14 @@ public interface SessionTracker {
long getLocalSessionCount();
boolean isLocalSessionsEnabled();
+
+ /**
+ * Get a set of global session IDs
+ */
+ Set<Long> globalSessions();
+
+ /**
+ * Get a set of local session IDs
+ */
+ Set<Long> localSessions();
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java
index cedd0c0..e871198 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java
@@ -21,6 +21,7 @@ package org.apache.zookeeper.server;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.MessageFormat;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
@@ -49,7 +50,7 @@ public class SessionTrackerImpl extends
ZooKeeperCriticalThread implements Sessi
private final ExpiryQueue<SessionImpl> sessionExpiryQueue;
- private final ConcurrentMap<Long, Integer> sessionsWithTimeout;
+ protected final ConcurrentMap<Long, Integer> sessionsWithTimeout;
private final AtomicLong nextSessionId = new AtomicLong();
public static class SessionImpl implements Session {
@@ -347,4 +348,12 @@ public class SessionTrackerImpl extends
ZooKeeperCriticalThread implements Sessi
public boolean isLocalSessionsEnabled() {
return false;
}
+
+ public Set<Long> globalSessions() {
+ return sessionsById.keySet();
+ }
+
+ public Set<Long> localSessions() {
+ return Collections.emptySet();
+ }
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 72e5577..eed74a7 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -632,6 +632,12 @@ public class ZooKeeperServer implements SessionExpirer,
ServerStats.Provider {
close(sessionId);
}
+ public void expire(long sessionId) {
+ LOG.info("forcibly expiring session 0x{}",
Long.toHexString(sessionId));
+
+ close(sessionId);
+ }
+
public static class MissingSessionException extends IOException {
private static final long serialVersionUID = 7467414635467261007L;
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ZooKeeperServerController.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ZooKeeperServerController.java
index 615db87..3f85ef8 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ZooKeeperServerController.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ZooKeeperServerController.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.zookeeper.server.ExitCode;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.util.ServiceUtils;
@@ -122,13 +123,12 @@ public class ZooKeeperServerController {
}
break;
case EXPIRESESSION:
- // TODO: (hanm) implement once dependent feature is ready.
if (command.getParameter() == null) {
- // expireAllSessions();
+ expireAllSessions();
} else {
// A single parameter should be a session id as long.
// Parse failure exceptions will be sent to the caller
- // expireSession(Long.decode(command.getParameter()));
+ expireSession(Long.decode(command.getParameter()));
}
break;
case REJECTCONNECTIONS:
@@ -164,5 +164,23 @@ public class ZooKeeperServerController {
}
}
+ private ZooKeeperServer getServer() {
+ return quorumPeer.getActiveServer();
+ }
+
+ private void expireSession(long sessionId) {
+ getServer().expire(sessionId);
+ }
+
+ private void expireAllSessions() {
+ for (Long sessionId : getServer().getSessionTracker().localSessions())
{
+ expireSession(sessionId);
+ }
+
+ for (Long sessionId :
getServer().getSessionTracker().globalSessions()) {
+ expireSession(sessionId);
+ }
+ }
+
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
index e750b4d..3f9789f 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
@@ -223,4 +223,7 @@ public class LeaderSessionTracker extends
UpgradeableSessionTracker {
return sessionExpiryMap;
}
+ public Set<Long> globalSessions() {
+ return globalSessionTracker.globalSessions();
+ }
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java
index b12a62c..78c927a 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java
@@ -225,4 +225,7 @@ public class LearnerSessionTracker extends
UpgradeableSessionTracker {
return new HashMap<Long, Set<Long>>();
}
+ public Set<Long> globalSessions() {
+ return globalSessionsWithTimeouts.keySet();
+ }
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LocalSessionTracker.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LocalSessionTracker.java
index 009fe03..b323122 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LocalSessionTracker.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LocalSessionTracker.java
@@ -18,6 +18,7 @@
package org.apache.zookeeper.server.quorum;
+import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.zookeeper.server.SessionTrackerImpl;
import org.apache.zookeeper.server.ZooKeeperServerListener;
@@ -26,7 +27,6 @@ import org.apache.zookeeper.server.ZooKeeperServerListener;
* Local session tracker.
*/
public class LocalSessionTracker extends SessionTrackerImpl {
-
public LocalSessionTracker(SessionExpirer expirer, ConcurrentMap<Long,
Integer> sessionsWithTimeouts, int tickTime, long id, ZooKeeperServerListener
listener) {
super(expirer, sessionsWithTimeouts, tickTime, id, listener);
}
@@ -45,4 +45,7 @@ public class LocalSessionTracker extends SessionTrackerImpl {
return sessionId;
}
+ public Set<Long> localSessions() {
+ return sessionsWithTimeout.keySet();
+ }
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
index 69dfd47..fb5dbcc 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
@@ -18,6 +18,8 @@
package org.apache.zookeeper.server.quorum;
+import java.util.Collections;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.zookeeper.KeeperException;
@@ -111,7 +113,8 @@ public abstract class UpgradeableSessionTracker implements
SessionTracker {
localSessionTracker.removeSession(sessionId);
}
- public void checkGlobalSession(long sessionId, Object owner) throws
KeeperException.SessionExpiredException, KeeperException.SessionMovedException {
+ public void checkGlobalSession(long sessionId, Object owner)
+ throws KeeperException.SessionExpiredException,
KeeperException.SessionMovedException {
throw new UnsupportedOperationException();
}
@@ -122,4 +125,8 @@ public abstract class UpgradeableSessionTracker implements
SessionTracker {
return localSessionsWithTimeouts.size();
}
+ public Set<Long> localSessions() {
+ return (localSessionTracker == null) ? Collections.<Long>emptySet()
+ : localSessionTracker.localSessions();
+ }
}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
index 0bf248a..9e71205 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -394,6 +395,13 @@ public class PrepRequestProcessorTest extends ClientBase {
public boolean isLocalSessionsEnabled() {
return false;
}
- }
+ public Set<Long> globalSessions() {
+ return Collections.emptySet();
+ }
+
+ public Set<Long> localSessions() {
+ return Collections.emptySet();
+ }
+ }
}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ControllerClientServerTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ControllerClientServerTest.java
index 3b9b92a..d6fd986 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ControllerClientServerTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ControllerClientServerTest.java
@@ -19,7 +19,6 @@
package org.apache.zookeeper.server.controller;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
public class ControllerClientServerTest extends ControllerTestBase {
@@ -41,10 +40,7 @@ public class ControllerClientServerTest extends
ControllerTestBase {
Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.CLOSECONNECTION));
}
- // TODO (hanm): this depends on the expiration session feature which
- // is not part of this patch. This test will be enabled once that
- // feature is upstreamed.
- @Ignore
+ @Test
public void verifyExpireSessionCommand() {
// Valid long session ids should be accepted.
Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.EXPIRESESSION,
"0x1234"));
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ZooKeeperServerControllerEndToEndTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ZooKeeperServerControllerEndToEndTest.java
index 2895afd..06b3626 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ZooKeeperServerControllerEndToEndTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ZooKeeperServerControllerEndToEndTest.java
@@ -84,7 +84,7 @@ public class ZooKeeperServerControllerEndToEndTest extends
ControllerTestBase {
watcher.waitForEvent();
}
- @Ignore
+ @Test
public void verifySessionExpiration() throws Exception {
// Setup: First connect to the server and wait for connected.
BlockingStateWatcher watcher = new
BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
@@ -104,7 +104,7 @@ public class ZooKeeperServerControllerEndToEndTest extends
ControllerTestBase {
watcher.waitForEvent();
}
- @Ignore
+ @Test
public void verifyGlobalSessionExpiration() throws Exception {
// Step 1: Connect.
BlockingStateWatcher stateWatcher = new
BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);