This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new c47ef90 ZOOKEEPER-3863: Do not track global sessions in
ReadOnlyZooKeeperServer
c47ef90 is described below
commit c47ef905e077184bc5b7f555a3e2dfeb6dc046e1
Author: Jie Huang <[email protected]>
AuthorDate: Thu Sep 17 07:35:11 2020 +0200
ZOOKEEPER-3863: Do not track global sessions in ReadOnlyZooKeeperServer
Author: Jie Huang <[email protected]>
Reviewers: Enrico Olivelli <[email protected]>, Michael Han
<[email protected]>
Closes #1380 from jhuan31/ZOOKEEPER-3863
---
.../org/apache/zookeeper/server/ServerCnxn.java | 3 +-
.../apache/zookeeper/server/ZooKeeperServer.java | 25 +++++++---
.../server/quorum/ReadOnlyRequestProcessor.java | 28 +++++++----
.../server/quorum/ReadOnlyZooKeeperServer.java | 47 ++++++++++++++++++
.../apache/zookeeper/test/ReadOnlyModeTest.java | 57 +++++++++++++++++++++-
5 files changed, 142 insertions(+), 18 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
index 78dc8fa..0d17ce2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
@@ -101,6 +101,7 @@ public abstract class ServerCnxn implements Stats, Watcher {
CLOSE_CONNECTION_COMMAND("close_connection_command"),
CLEAN_UP("clean_up"),
CONNECTION_MODE_CHANGED("connection_mode_changed"),
+ RENEW_GLOBAL_SESSION_IN_RO_MODE("renew a global session in readonly
mode"),
// Below reasons are NettyServerCnxnFactory only
CHANNEL_DISCONNECTED("channel disconnected"),
CHANNEL_CLOSED_EXCEPTION("channel_closed_exception"),
@@ -298,7 +299,7 @@ public abstract class ServerCnxn implements Stats, Watcher {
protected ZooKeeperSaslServer zooKeeperSaslServer = null;
- protected static class CloseRequestException extends IOException {
+ public static class CloseRequestException extends IOException {
private static final long serialVersionUID = -7854505709816442681L;
private DisconnectReason reason;
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 6543842..b016724 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -1412,13 +1412,13 @@ public class ZooKeeperServer implements SessionExpirer,
ServerStats.Provider {
connReq.getTimeOut(),
cnxn.getRemoteSocketAddress());
} else {
- long clientSessionId = connReq.getSessionId();
- LOG.debug(
- "Client attempting to renew session: session = 0x{}, zxid
= 0x{}, timeout = {}, address = {}",
- Long.toHexString(clientSessionId),
- Long.toHexString(connReq.getLastZxidSeen()),
- connReq.getTimeOut(),
- cnxn.getRemoteSocketAddress());
+ validateSession(cnxn, sessionId);
+ LOG.debug(
+ "Client attempting to renew session: session = 0x{}, zxid =
0x{}, timeout = {}, address = {}",
+ Long.toHexString(sessionId),
+ Long.toHexString(connReq.getLastZxidSeen()),
+ connReq.getTimeOut(),
+ cnxn.getRemoteSocketAddress());
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId,
ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
}
@@ -1432,6 +1432,17 @@ public class ZooKeeperServer implements SessionExpirer,
ServerStats.Provider {
}
}
+ /**
+ * Validate if a particular session can be reestablished.
+ *
+ * <p>Invoked on the session-renewal path, before the server closes any
+ * existing connection that belongs to the session. This base implementation
+ * accepts every session; a subclass can override it and throw to veto the
+ * renewal.
+ *
+ * @param cnxn the connection on which the client asks to renew the session
+ * @param sessionId the id of the session the client wants to reestablish
+ * @throws IOException if the session must not be reestablished; never thrown
+ * by this base implementation
+ */
+ protected void validateSession(ServerCnxn cnxn, long sessionId)
+ throws IOException {
+ // do nothing: by default every session may be reestablished
+ }
+
public boolean shouldThrottle(long outStandingCount) {
int globalOutstandingLimit = getGlobalOutstandingLimit();
if (globalOutstandingLimit < getInflight() || globalOutstandingLimit <
getInProcess()) {
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java
index c50dd53..3aec97f 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java
@@ -86,16 +86,14 @@ public class ReadOnlyRequestProcessor extends
ZooKeeperCriticalThread implements
case OpCode.setACL:
case OpCode.multi:
case OpCode.check:
- ReplyHeader hdr = new ReplyHeader(
- request.cxid,
- zks.getZKDatabase().getDataTreeLastProcessedZxid(),
- Code.NOTREADONLY.intValue());
- try {
- request.cnxn.sendResponse(hdr, null, null);
- } catch (IOException e) {
- LOG.error("IO exception while sending response", e);
- }
+ sendErrorResponse(request);
continue;
+ case OpCode.closeSession:
+ case OpCode.createSession:
+ if (!request.isLocalSession()) {
+ sendErrorResponse(request);
+ continue;
+ }
}
// proceed to the next processor
@@ -109,6 +107,18 @@ public class ReadOnlyRequestProcessor extends
ZooKeeperCriticalThread implements
LOG.info("ReadOnlyRequestProcessor exited loop!");
}
+ private void sendErrorResponse(Request request) {
+ ReplyHeader hdr = new ReplyHeader(
+ request.cxid,
+ zks.getZKDatabase().getDataTreeLastProcessedZxid(),
+ Code.NOTREADONLY.intValue());
+ try {
+ request.cnxn.sendResponse(hdr, null, null);
+ } catch (IOException e) {
+ LOG.error("IO exception while sending response", e);
+ }
+ }
+
@Override
public void processRequest(Request request) {
if (!finished) {
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
index f8517eb..d2f6b39 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
@@ -18,14 +18,18 @@
package org.apache.zookeeper.server.quorum;
+import java.io.IOException;
import java.io.PrintWriter;
import java.util.Objects;
import java.util.stream.Collectors;
+import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.DataTreeBean;
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerBean;
@@ -81,6 +85,49 @@ public class ReadOnlyZooKeeperServer extends ZooKeeperServer
{
}
+ /**
+ * Installs a {@link LearnerSessionTracker} as this server's session tracker.
+ *
+ * <p>The tracker is built from this server, the database's
+ * session-to-timeout table, the tick time, the id of {@code self}, the
+ * local-session setting of {@code self}, and the server listener.
+ */
@Override
+ public void createSessionTracker() {
+ final LearnerSessionTracker tracker = new LearnerSessionTracker(
+ this,
+ getZKDatabase().getSessionWithTimeOuts(),
+ tickTime,
+ self.getId(),
+ self.areLocalSessionsEnabled(),
+ getZooKeeperServerListener());
+ sessionTracker = tracker;
+ }
+
+ @Override
+ protected void startSessionTracker() {
+ ((LearnerSessionTracker) sessionTracker).start();
+ }
+
+ @Override
+ protected void setLocalSessionFlag(Request si) {
+ switch (si.type) {
+ case OpCode.createSession:
+ if (self.areLocalSessionsEnabled()) {
+ si.setLocalSession(true);
+ }
+ break;
+ case OpCode.closeSession:
+ if (((UpgradeableSessionTracker)
sessionTracker).isLocalSession(si.sessionId)) {
+ si.setLocalSession(true);
+ } else {
+ LOG.warn("Submitting global closeSession request for
session 0x{} in ReadOnly mode",
+ Long.toHexString(si.sessionId));
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ @Override
+ protected void validateSession(ServerCnxn cnxn, long sessionId) throws
IOException {
+ if (((LearnerSessionTracker)
sessionTracker).isGlobalSession(sessionId)) {
+ String msg = "Refusing global session reconnection in RO mode " +
cnxn.getRemoteSocketAddress();
+ LOG.info(msg);
+ throw new ServerCnxn.CloseRequestException(msg,
ServerCnxn.DisconnectReason.RENEW_GLOBAL_SESSION_IN_RO_MODE);
+ }
+ }
+
+ @Override
protected void registerJMX() {
// register with JMX
try {
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java
index 06c3734..4becd4a 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java
@@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.fail;
import java.io.ByteArrayOutputStream;
import java.io.LineNumberReader;
import java.io.StringReader;
+import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.apache.log4j.Layout;
import org.apache.log4j.Level;
@@ -57,7 +58,6 @@ public class ReadOnlyModeTest extends ZKTestCase {
@BeforeEach
public void setUp() throws Exception {
System.setProperty("readonlymode.enabled", "true");
- qu.startQuorum();
}
@AfterEach
@@ -72,6 +72,9 @@ public class ReadOnlyModeTest extends ZKTestCase {
@Test
@Timeout(value = 90)
public void testMultiTransaction() throws Exception {
+ qu.enableLocalSession(true);
+ qu.startQuorum();
+
CountdownWatcher watcher = new CountdownWatcher();
ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
watcher, true);
watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got
connected
@@ -80,9 +83,12 @@ public class ReadOnlyModeTest extends ZKTestCase {
final String node1 = "/tnode1";
final String node2 = "/tnode2";
zk.create(node1, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
+ zk.close();
+ watcher.waitForDisconnected(CONNECTION_TIMEOUT);
watcher.reset();
qu.shutdown(2);
+ zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher,
true);
watcher.waitForConnected(CONNECTION_TIMEOUT);
assertEquals(States.CONNECTEDREADONLY, zk.getState(), "Should be in
r-o mode");
@@ -110,6 +116,9 @@ public class ReadOnlyModeTest extends ZKTestCase {
@Test
@Timeout(value = 90)
public void testReadOnlyClient() throws Exception {
+ qu.enableLocalSession(true);
+ qu.startQuorum();
+
CountdownWatcher watcher = new CountdownWatcher();
ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
watcher, true);
watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got
connected
@@ -161,6 +170,9 @@ public class ReadOnlyModeTest extends ZKTestCase {
@Test
@Timeout(value = 90)
public void testConnectionEvents() throws Exception {
+ qu.enableLocalSession(true);
+ qu.startQuorum();
+
CountdownWatcher watcher = new CountdownWatcher();
ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
watcher, true);
boolean success = false;
@@ -202,6 +214,9 @@ public class ReadOnlyModeTest extends ZKTestCase {
@Test
@Timeout(value = 90)
public void testSessionEstablishment() throws Exception {
+ qu.enableLocalSession(true);
+ qu.startQuorum();
+
qu.shutdown(2);
CountdownWatcher watcher = new CountdownWatcher();
@@ -230,6 +245,43 @@ public class ReadOnlyModeTest extends ZKTestCase {
zk.close();
}
+ /**
+ * Verifies how global sessions are handled after two quorum members are shut
+ * down: an existing global session must not be renewed, a new global session
+ * must not be created, and a connection must succeed once local sessions are
+ * enabled on the remaining peer.
+ *
+ * <p>Unlike the other tests in this class, local sessions are deliberately
+ * not enabled before the quorum starts, so the first session is global.
+ */
+ @Test
+ @Timeout(value = 90)
+ public void testGlobalSessionInRO() throws Exception {
+ qu.startQuorum();
+
+ CountdownWatcher watcher = new CountdownWatcher();
+ ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
+ LOG.info("global session created 0x{}", Long.toHexString(zk.getSessionId()));
+
+ watcher.reset();
+ // presumably leaves the surviving server without a quorum, i.e. in read-only mode
+ qu.shutdown(2);
+ try {
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
+ fail("Should not be able to renew a global session");
+ } catch (TimeoutException expected) {
+ // expected: the existing global session must not reconnect
+ }
+ zk.close();
+
+ watcher.reset();
+ zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
+ try {
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
+ fail("Should not be able to create a global session");
+ } catch (TimeoutException expected) {
+ // expected: local sessions are still disabled, so the new session would be global
+ }
+ zk.close();
+
+ // with local sessions enabled on the remaining peer, a new client must connect
+ qu.getPeer(1).peer.enableLocalSessions(true);
+ zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
+ try {
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
+ } catch (TimeoutException e) {
+ fail("Should be able to create a local session");
+ }
+ zk.close();
+ }
+
/**
* Ensures that client seeks for r/w servers while it's connected to r/o
* server.
@@ -238,6 +290,9 @@ public class ReadOnlyModeTest extends ZKTestCase {
@Test
@Timeout(value = 90)
public void testSeekForRwServer() throws Exception {
+ qu.enableLocalSession(true);
+ qu.startQuorum();
+
// setup the logger to capture all logs
Layout layout =
Logger.getRootLogger().getAppender("CONSOLE").getLayout();
ByteArrayOutputStream os = new ByteArrayOutputStream();