This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new c47ef90  ZOOKEEPER-3863: Do not track global sessions in ReadOnlyZooKeeperServer
c47ef90 is described below

commit c47ef905e077184bc5b7f555a3e2dfeb6dc046e1
Author: Jie Huang <[email protected]>
AuthorDate: Thu Sep 17 07:35:11 2020 +0200

    ZOOKEEPER-3863: Do not track global sessions in ReadOnlyZooKeeperServer
    
    Author: Jie Huang <[email protected]>
    
    Reviewers: Enrico Olivelli <[email protected]>, Michael Han <[email protected]>
    
    Closes #1380 from jhuan31/ZOOKEEPER-3863
---
 .../org/apache/zookeeper/server/ServerCnxn.java    |  3 +-
 .../apache/zookeeper/server/ZooKeeperServer.java   | 25 +++++++---
 .../server/quorum/ReadOnlyRequestProcessor.java    | 28 +++++++----
 .../server/quorum/ReadOnlyZooKeeperServer.java     | 47 ++++++++++++++++++
 .../apache/zookeeper/test/ReadOnlyModeTest.java    | 57 +++++++++++++++++++++-
 5 files changed, 142 insertions(+), 18 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
index 78dc8fa..0d17ce2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
@@ -101,6 +101,7 @@ public abstract class ServerCnxn implements Stats, Watcher {
         CLOSE_CONNECTION_COMMAND("close_connection_command"),
         CLEAN_UP("clean_up"),
         CONNECTION_MODE_CHANGED("connection_mode_changed"),
+        RENEW_GLOBAL_SESSION_IN_RO_MODE("renew a global session in readonly 
mode"),
         // Below reasons are NettyServerCnxnFactory only
         CHANNEL_DISCONNECTED("channel disconnected"),
         CHANNEL_CLOSED_EXCEPTION("channel_closed_exception"),
@@ -298,7 +299,7 @@ public abstract class ServerCnxn implements Stats, Watcher {
 
     protected ZooKeeperSaslServer zooKeeperSaslServer = null;
 
-    protected static class CloseRequestException extends IOException {
+    public static class CloseRequestException extends IOException {
 
         private static final long serialVersionUID = -7854505709816442681L;
         private DisconnectReason reason;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 6543842..b016724 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -1412,13 +1412,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
                 connReq.getTimeOut(),
                 cnxn.getRemoteSocketAddress());
         } else {
-            long clientSessionId = connReq.getSessionId();
-                LOG.debug(
-                    "Client attempting to renew session: session = 0x{}, zxid 
= 0x{}, timeout = {}, address = {}",
-                    Long.toHexString(clientSessionId),
-                    Long.toHexString(connReq.getLastZxidSeen()),
-                    connReq.getTimeOut(),
-                    cnxn.getRemoteSocketAddress());
+            validateSession(cnxn, sessionId);
+            LOG.debug(
+                "Client attempting to renew session: session = 0x{}, zxid = 
0x{}, timeout = {}, address = {}",
+                Long.toHexString(sessionId),
+                Long.toHexString(connReq.getLastZxidSeen()),
+                connReq.getTimeOut(),
+                cnxn.getRemoteSocketAddress());
             if (serverCnxnFactory != null) {
                 serverCnxnFactory.closeSession(sessionId, 
ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
             }
@@ -1432,6 +1432,17 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         }
     }
 
+    /**
+     * Validate if a particular session can be reestablished.
+     *
+     * @param cnxn
+     * @param sessionId
+     */
+    protected void validateSession(ServerCnxn cnxn, long sessionId)
+            throws IOException {
+        // do nothing
+    }
+
     public boolean shouldThrottle(long outStandingCount) {
         int globalOutstandingLimit = getGlobalOutstandingLimit();
         if (globalOutstandingLimit < getInflight() || globalOutstandingLimit < 
getInProcess()) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java
index c50dd53..3aec97f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java
@@ -86,16 +86,14 @@ public class ReadOnlyRequestProcessor extends ZooKeeperCriticalThread implements
                 case OpCode.setACL:
                 case OpCode.multi:
                 case OpCode.check:
-                    ReplyHeader hdr = new ReplyHeader(
-                        request.cxid,
-                        zks.getZKDatabase().getDataTreeLastProcessedZxid(),
-                        Code.NOTREADONLY.intValue());
-                    try {
-                        request.cnxn.sendResponse(hdr, null, null);
-                    } catch (IOException e) {
-                        LOG.error("IO exception while sending response", e);
-                    }
+                    sendErrorResponse(request);
                     continue;
+                case OpCode.closeSession:
+                case OpCode.createSession:
+                    if (!request.isLocalSession()) {
+                        sendErrorResponse(request);
+                        continue;
+                    }
                 }
 
                 // proceed to the next processor
@@ -109,6 +107,18 @@ public class ReadOnlyRequestProcessor extends ZooKeeperCriticalThread implements
         LOG.info("ReadOnlyRequestProcessor exited loop!");
     }
 
+    private void sendErrorResponse(Request request) {
+        ReplyHeader hdr = new ReplyHeader(
+                request.cxid,
+                zks.getZKDatabase().getDataTreeLastProcessedZxid(),
+                Code.NOTREADONLY.intValue());
+        try {
+            request.cnxn.sendResponse(hdr, null, null);
+        } catch (IOException e) {
+            LOG.error("IO exception while sending response", e);
+        }
+    }
+
     @Override
     public void processRequest(Request request) {
         if (!finished) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
index f8517eb..d2f6b39 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
@@ -18,14 +18,18 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.server.DataTreeBean;
 import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooKeeperServerBean;
@@ -81,6 +85,49 @@ public class ReadOnlyZooKeeperServer extends ZooKeeperServer {
     }
 
     @Override
+    public void createSessionTracker() {
+        sessionTracker = new LearnerSessionTracker(
+                this, getZKDatabase().getSessionWithTimeOuts(),
+                this.tickTime, self.getId(), self.areLocalSessionsEnabled(),
+                getZooKeeperServerListener());
+    }
+
+    @Override
+    protected void startSessionTracker() {
+        ((LearnerSessionTracker) sessionTracker).start();
+    }
+
+    @Override
+    protected void setLocalSessionFlag(Request si) {
+        switch (si.type) {
+            case OpCode.createSession:
+                if (self.areLocalSessionsEnabled()) {
+                    si.setLocalSession(true);
+                }
+                break;
+            case OpCode.closeSession:
+                if (((UpgradeableSessionTracker) 
sessionTracker).isLocalSession(si.sessionId)) {
+                    si.setLocalSession(true);
+                } else {
+                    LOG.warn("Submitting global closeSession request for 
session 0x{} in ReadOnly mode",
+                            Long.toHexString(si.sessionId));
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
+    @Override
+    protected void validateSession(ServerCnxn cnxn, long sessionId) throws 
IOException {
+        if (((LearnerSessionTracker) 
sessionTracker).isGlobalSession(sessionId)) {
+            String msg = "Refusing global session reconnection in RO mode " + 
cnxn.getRemoteSocketAddress();
+            LOG.info(msg);
+            throw new ServerCnxn.CloseRequestException(msg, 
ServerCnxn.DisconnectReason.RENEW_GLOBAL_SESSION_IN_RO_MODE);
+        }
+    }
+
+    @Override
     protected void registerJMX() {
         // register with JMX
         try {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java
index 06c3734..4becd4a 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java
@@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 import java.io.ByteArrayOutputStream;
 import java.io.LineNumberReader;
 import java.io.StringReader;
+import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
 import org.apache.log4j.Layout;
 import org.apache.log4j.Level;
@@ -57,7 +58,6 @@ public class ReadOnlyModeTest extends ZKTestCase {
     @BeforeEach
     public void setUp() throws Exception {
         System.setProperty("readonlymode.enabled", "true");
-        qu.startQuorum();
     }
 
     @AfterEach
@@ -72,6 +72,9 @@ public class ReadOnlyModeTest extends ZKTestCase {
     @Test
     @Timeout(value = 90)
     public void testMultiTransaction() throws Exception {
+        qu.enableLocalSession(true);
+        qu.startQuorum();
+
         CountdownWatcher watcher = new CountdownWatcher();
         ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, 
watcher, true);
         watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got 
connected
@@ -80,9 +83,12 @@ public class ReadOnlyModeTest extends ZKTestCase {
         final String node1 = "/tnode1";
         final String node2 = "/tnode2";
         zk.create(node1, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT);
+        zk.close();
+        watcher.waitForDisconnected(CONNECTION_TIMEOUT);
 
         watcher.reset();
         qu.shutdown(2);
+        zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, 
true);
         watcher.waitForConnected(CONNECTION_TIMEOUT);
         assertEquals(States.CONNECTEDREADONLY, zk.getState(), "Should be in 
r-o mode");
 
@@ -110,6 +116,9 @@ public class ReadOnlyModeTest extends ZKTestCase {
     @Test
     @Timeout(value = 90)
     public void testReadOnlyClient() throws Exception {
+        qu.enableLocalSession(true);
+        qu.startQuorum();
+
         CountdownWatcher watcher = new CountdownWatcher();
         ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, 
watcher, true);
         watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got 
connected
@@ -161,6 +170,9 @@ public class ReadOnlyModeTest extends ZKTestCase {
     @Test
     @Timeout(value = 90)
     public void testConnectionEvents() throws Exception {
+        qu.enableLocalSession(true);
+        qu.startQuorum();
+
         CountdownWatcher watcher = new CountdownWatcher();
         ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, 
watcher, true);
         boolean success = false;
@@ -202,6 +214,9 @@ public class ReadOnlyModeTest extends ZKTestCase {
     @Test
     @Timeout(value = 90)
     public void testSessionEstablishment() throws Exception {
+        qu.enableLocalSession(true);
+        qu.startQuorum();
+
         qu.shutdown(2);
 
         CountdownWatcher watcher = new CountdownWatcher();
@@ -230,6 +245,43 @@ public class ReadOnlyModeTest extends ZKTestCase {
         zk.close();
     }
 
+    @Test(timeout = 90000)
+    public void testGlobalSessionInRO() throws Exception {
+        qu.startQuorum();
+
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, 
watcher, true);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+        LOG.info("global session created 0x{}", 
Long.toHexString(zk.getSessionId()));
+
+        watcher.reset();
+        qu.shutdown(2);
+        try {
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+            fail("Should not be able to renew a global session");
+        } catch (TimeoutException e) {
+        }
+        zk.close();
+
+        watcher.reset();
+        zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, 
true);
+        try {
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+            fail("Should not be able to create a global session");
+        } catch (TimeoutException e) {
+        }
+        zk.close();
+
+        qu.getPeer(1).peer.enableLocalSessions(true);
+        zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, 
true);
+        try {
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+        } catch (TimeoutException e) {
+            fail("Should be able to create a local session");
+        }
+        zk.close();
+    }
+
     /**
      * Ensures that client seeks for r/w servers while it's connected to r/o
      * server.
@@ -238,6 +290,9 @@ public class ReadOnlyModeTest extends ZKTestCase {
     @Test
     @Timeout(value = 90)
     public void testSeekForRwServer() throws Exception {
+        qu.enableLocalSession(true);
+        qu.startQuorum();
+
         // setup the logger to capture all logs
         Layout layout = 
Logger.getRootLogger().getAppender("CONSOLE").getLayout();
         ByteArrayOutputStream os = new ByteArrayOutputStream();

Reply via email to