This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new a14bd54  [ZOOKEEPER-3690] Improving leader efficiency via not 
processing learner forwarded requests in commit processor
a14bd54 is described below

commit a14bd54f7112df96622c04fe6951bb25d6eddcf0
Author: Fangmin Lyu <fang...@apache.org>
AuthorDate: Fri May 15 09:10:25 2020 +0200

    [ZOOKEEPER-3690] Improving leader efficiency via not processing learner 
forwarded requests in commit processor
    
    Author: Fangmin Lyu <fang...@apache.org>
    
    Reviewers: Enrico Olivelli <eolive...@apache.org>, Michael Han 
<h...@apache.org>
    
    Closes #1223 from lvfangmin/ZOOKEEPER-3690
---
 .../src/main/resources/markdown/zookeeperAdmin.md  |  8 +++++
 .../org/apache/zookeeper/server/ServerMetrics.java |  3 ++
 .../server/quorum/ProposalRequestProcessor.java    | 26 +++++++++++++++-
 .../server/quorum/QuorumPeerMainTest.java          | 35 ++++++++++++++++++++++
 4 files changed, 71 insertions(+), 1 deletion(-)

diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md 
b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 8d5f11b..6dbfa27 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -1148,6 +1148,14 @@ property, when available, is noted below.
     (Jave system property only: **learner.closeSocketAsync**)
     When enabled, a learner will close the quorum socket asynchronously. This 
is useful for TLS connections where closing a socket might take a long time, 
block the shutdown process, potentially delay a new leader election, and leave 
the quorum unavailabe. Closing the socket asynchronously avoids blocking the 
shutdown process despite the long socket closing time and a new leader election 
can be started while the socket being closed. The default is false.
 
+* *forward_learner_requests_to_commit_processor_disabled*
+    (Java system property: 
**zookeeper.forward_learner_requests_to_commit_processor_disabled**)
+    When this property is set to true, requests from learners won't be enqueued
+    to the CommitProcessor queue, which helps save resources and GC time on 
+    the leader.
+
+    The default value is false.
+
 
 <a name="sc_clusterOptions"></a>
 
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 36a65df..3962bb9 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -241,6 +241,8 @@ public final class ServerMetrics {
 
         SOCKET_CLOSING_TIME = metricsContext.getSummary("socket_closing_time", 
DetailLevel.BASIC);
 
+        REQUESTS_NOT_FORWARDED_TO_COMMIT_PROCESSOR = metricsContext.getCounter(
+                "requests_not_forwarded_to_commit_processor");
     }
 
     /**
@@ -465,6 +467,7 @@ public final class ServerMetrics {
 
     public final Summary SOCKET_CLOSING_TIME;
 
+    public final Counter REQUESTS_NOT_FORWARDED_TO_COMMIT_PROCESSOR;
 
     private final MetricsProvider metricsProvider;
 
diff --git 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
index c4b5a6e..a3f1b29 100644
--- 
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
+++ 
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
@@ -20,6 +20,7 @@ package org.apache.zookeeper.server.quorum;
 
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.SyncRequestProcessor;
 import org.apache.zookeeper.server.quorum.Leader.XidRolloverException;
 import org.slf4j.Logger;
@@ -39,11 +40,22 @@ public class ProposalRequestProcessor implements 
RequestProcessor {
 
     SyncRequestProcessor syncProcessor;
 
+    // If this property is set, requests from Learners won't be forwarded
+    // to the CommitProcessor in order to save resources
+    public static final String 
FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED =
+          "zookeeper.forward_learner_requests_to_commit_processor_disabled";
+    private final boolean forwardLearnerRequestsToCommitProcessorDisabled;
+
     public ProposalRequestProcessor(LeaderZooKeeperServer zks, 
RequestProcessor nextProcessor) {
         this.zks = zks;
         this.nextProcessor = nextProcessor;
         AckRequestProcessor ackProcessor = new 
AckRequestProcessor(zks.getLeader());
         syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
+
+        forwardLearnerRequestsToCommitProcessorDisabled = Boolean.getBoolean(
+                FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED);
+        LOG.info("{} = {}", 
FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED,
+                forwardLearnerRequestsToCommitProcessorDisabled);
     }
 
     /**
@@ -70,7 +82,9 @@ public class ProposalRequestProcessor implements 
RequestProcessor {
         if (request instanceof LearnerSyncRequest) {
             zks.getLeader().processSync((LearnerSyncRequest) request);
         } else {
-            nextProcessor.processRequest(request);
+            if (shouldForwardToNextProcessor(request)) {
+                nextProcessor.processRequest(request);
+            }
             if (request.getHdr() != null) {
                 // We need to sync and get consensus on any transactions
                 try {
@@ -89,4 +103,14 @@ public class ProposalRequestProcessor implements 
RequestProcessor {
         syncProcessor.shutdown();
     }
 
+    private boolean shouldForwardToNextProcessor(Request request) {
+        if (!forwardLearnerRequestsToCommitProcessorDisabled) {
+            return true;
+        }
+        if (request.getOwner() instanceof LearnerHandler) {
+            
ServerMetrics.getMetrics().REQUESTS_NOT_FORWARDED_TO_COMMIT_PROCESSOR.add(1);
+            return false;
+        }
+        return true;
+    }
 }
diff --git 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
index beb3bb9..487507b 100644
--- 
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
+++ 
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
@@ -62,6 +62,7 @@ import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.common.X509Exception;
 import org.apache.zookeeper.metrics.BaseTestMetricsProvider;
 import org.apache.zookeeper.metrics.impl.NullMetricsProvider;
+import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.test.ClientBase;
@@ -1620,6 +1621,40 @@ public class QuorumPeerMainTest extends 
QuorumPeerTestBase {
         assertTrue("complains about metrics provider 
MetricsProviderLifeCycleException", found);
     }
 
+    /**
+     * Test the behavior to skip processing the learner forwarded requests in
+     * Leader's CommitProcessor.
+     */
+    @Test
+    public void testLearnerRequestForwardBehavior() throws Exception {
+        
System.setProperty(ProposalRequestProcessor.FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED,
 "true");
+
+        try {
+            // 1. set up an ensemble with 3 servers
+            final int numServers = 3;
+            servers = LaunchServers(numServers);
+            int leaderId =  servers.findLeader();
+
+            int followerA = (leaderId + 1) % numServers;
+            waitForOne(servers.zk[followerA], States.CONNECTED);
+
+            // 2. reset all metrics
+            ServerMetrics.getMetrics().resetAll();
+
+            // 3. issue a request
+            final String node = "/testLearnerRequestForwardBehavior";
+            servers.zk[followerA].create(node, new byte[0], 
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+            assertNotNull("node " + node + " should exist",
+                    
servers.zk[followerA].exists("/testLearnerRequestForwardBehavior", false));
+
+            assertEquals(1L, 
ServerMetrics.getMetrics().REQUESTS_NOT_FORWARDED_TO_COMMIT_PROCESSOR.get());
+        } finally {
+            //clean up
+            
System.setProperty(ProposalRequestProcessor.FORWARD_LEARNER_REQUESTS_TO_COMMIT_PROCESSOR_DISABLED,
 "false");
+        }
+    }
+
     static class Context {
 
         boolean quitFollowing = false;

Reply via email to