This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 6688285 ZOOKEEPER-3242: Add server side connecting throttling
6688285 is described below
commit 668828503389f3e542b846019bace8b9842fa428
Author: Jie Huang <[email protected]>
AuthorDate: Wed Jan 23 14:00:13 2019 +0100
ZOOKEEPER-3242: Add server side connecting throttling
Author: Jie Huang <[email protected]>
Reviewers: [email protected], [email protected]
Closes #769 from jhuan31/ZOOKEEPER-3242 and squashes the following commits:
c3ec81f4e [Jie Huang] refactoring
86cad39c4 [Jie Huang] Use a mock random number generator to make the unit
test flaky-proof
a278504d4 [Jie Huang] Add unit tests for server-side connection throttling
fd966502b [Jie Huang] update doc for server-side connection throttling
2f1ed0b87 [Jie Huang] Fix FindBugs Warnings
a48b0fcb1 [Jie Huang] ZOOKEEPER-3242: Add server side connecting throttling
---
.../src/main/resources/markdown/zookeeperAdmin.md | 68 ++++++
.../org/apache/zookeeper/server/BlueThrottle.java | 268 +++++++++++++++++++++
.../zookeeper/server/ClientCnxnLimitException.java | 30 +++
.../org/apache/zookeeper/server/NIOServerCnxn.java | 12 +-
.../zookeeper/server/NIOServerCnxnFactory.java | 1 +
.../apache/zookeeper/server/NettyServerCnxn.java | 7 +
.../zookeeper/server/NettyServerCnxnFactory.java | 1 +
.../org/apache/zookeeper/server/ServerMetrics.java | 4 +
.../apache/zookeeper/server/ZooKeeperServer.java | 24 +-
.../zookeeper/server/ZooKeeperServerBean.java | 73 +++++-
.../zookeeper/server/ZooKeeperServerMXBean.java | 22 ++
.../apache/zookeeper/server/admin/Commands.java | 1 +
.../apache/zookeeper/server/BlueThrottleTest.java | 157 ++++++++++++
.../zookeeper/server/admin/CommandsTest.java | 3 +-
14 files changed, 666 insertions(+), 5 deletions(-)
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 95787b7..57df547 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -738,6 +738,74 @@ property, when available, is noted below.
strategy from the configured minimum (fastleader.minNotificationInterval)
and the configured maximum (this) for long elections.
+* *connectionMaxTokens* :
+ (Java system property: **zookeeper.connection_throttle_tokens**)
+ **New in 3.6.0:**
+ This is one of the parameters to tune the server-side connection throttler,
+ which is a token-based rate limiting mechanism with optional probabilistic
+ dropping.
+ This parameter defines the maximum number of tokens in the token-bucket.
+ When set to 0, throttling is disabled. Default is 0.
+
+* *connectionTokenFillTime* :
+ (Java system property: **zookeeper.connection_throttle_fill_time**)
+ **New in 3.6.0:**
+ This is one of the parameters to tune the server-side connection throttler,
+ which is a token-based rate limiting mechanism with optional probabilistic
+ dropping.
+ This parameter defines the interval in milliseconds when the token bucket
is re-filled with
+ *connectionTokenFillCount* tokens. Default is 1.
+
+* *connectionTokenFillCount* :
+ (Java system property: **zookeeper.connection_throttle_fill_count**)
+ **New in 3.6.0:**
+ This is one of the parameters to tune the server-side connection throttler,
+ which is a token-based rate limiting mechanism with optional probabilistic
+ dropping.
+ This parameter defines the number of tokens to add to the token bucket
every
+ *connectionTokenFillTime* milliseconds. Default is 1.
+
+* *connectionFreezeTime* :
+ (Java system property: **zookeeper.connection_throttle_freeze_time**)
+ **New in 3.6.0:**
+ This is one of the parameters to tune the server-side connection throttler,
+ which is a token-based rate limiting mechanism with optional probabilistic
+ dropping.
+ This parameter defines the interval in milliseconds when the dropping
+ probability is adjusted. When set to -1, probabilistic dropping is
disabled.
+ Default is -1.
+
+* *connectionDropIncrease* :
+ (Java system property: **zookeeper.connection_throttle_drop_increase**)
+ **New in 3.6.0:**
+ This is one of the parameters to tune the server-side connection throttler,
+ which is a token-based rate limiting mechanism with optional probabilistic
+ dropping.
+ This parameter defines the dropping probability to increase. The throttler
+ checks every *connectionFreezeTime* milliseconds and if the token bucket is
+ empty, the dropping probability will be increased by
*connectionDropIncrease*.
+ The default is 0.02.
+
+* *connectionDropDecrease* :
+ (Java system property: **zookeeper.connection_throttle_drop_decrease**)
+ **New in 3.6.0:**
+ This is one of the parameters to tune the server-side connection throttler,
+ which is a token-based rate limiting mechanism with optional probabilistic
+ dropping.
+ This parameter defines the dropping probability to decrease. The throttler
+ checks every *connectionFreezeTime* milliseconds and if the token bucket
has
+ more tokens than a threshold, the dropping probability will be decreased by
+ *connectionDropDecrease*. The threshold is *connectionMaxTokens* \*
+ *connectionDecreaseRatio*. The default is 0.002.
+
+* *connectionDecreaseRatio* :
+ (Java system property: **zookeeper.connection_throttle_decrease_ratio**)
+ **New in 3.6.0:**
+ This is one of the parameters to tune the server-side connection throttler,
+ which is a token-based rate limiting mechanism with optional probabilistic
+ dropping. This parameter defines the threshold to decrease the dropping
+ probability. The default is 0.
+
<a name="sc_clusterOptions"></a>
#### Cluster Options
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java
new file mode 100644
index 0000000..1aa9e5b
--- /dev/null
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/BlueThrottle.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.util.Random;
+
+import org.apache.zookeeper.common.Time;
+
+/**
+ * Implements a token-bucket based rate limiting mechanism with optional
+ * probabilistic dropping inspired by the BLUE queue management algorithm [1].
+ *
+ * The throttle provides the {@link #checkLimit(int)} method which provides
+ * a binary yes/no decision.
+ *
+ * The core token bucket algorithm starts with an initial set of tokens based
+ * on the <code>maxTokens</code> setting. Tokens are dispensed each
+ * {@link #checkLimit(int)} call, which fails if there are not enough tokens to
+ * satisfy a given request.
+ *
+ * The token bucket refills over time, providing <code>fillCount</code> tokens
+ * every <code>fillTime</code> milliseconds, capping at <code>maxTokens</code>.
+ *
+ * This design allows the throttle to allow short bursts to pass, while still
+ * capping the total number of requests per time interval.
+ *
+ * One issue with a pure token bucket approach for something like request or
+ * connection throttling is that the wall clock arrival time of requests
affects
+ * the probability of a request being allowed to pass or not. Under constant
+ * load this can lead to request starvation for requests that constantly arrive
+ * later than the majority.
+ *
+ * In an attempt to combat this, this throttle can also provide probabilistic
+ * dropping. This is enabled anytime <code>freezeTime</code> is set to a value
+ * other than <code>-1</code>.
+ *
+ * The probabilistic algorithm starts with an initial drop probability of 0,
and
+ * adjusts this probability roughly every <code>freezeTime</code> milliseconds.
+ * The first request after <code>freezeTime</code>, the algorithm checks the
+ * token bucket. If the token bucket is empty, the drop probability is
increased
+ * by <code>dropIncrease</code> up to a maximum of <code>1</code>. Otherwise,
if
+ * the bucket has a token deficit less than <code>decreasePoint *
maxTokens</code>,
+ * the probability is decreased by <code>dropDecrease</code>.
+ *
+ * Given a call to {@link #checkLimit(int)}, requests are first dropped
randomly
+ * based on the current drop probability, and only surviving requests are then
+ * checked against the token bucket.
+ *
+ * When under constant load, the probabilistic algorithm will adapt to a drop
+ * frequency that should keep requests within the token limit. When load drops,
+ * the drop probability will decrease, eventually returning to zero if
possible.
+ *
+ * [1] "BLUE: A New Class of Active Queue Management Algorithms"
+ **/
+
+public class BlueThrottle {
+ private int maxTokens;
+ private int fillTime;
+ private int fillCount;
+ private int tokens;
+ private long lastTime;
+
+ private int freezeTime;
+ private long lastFreeze;
+ private double dropIncrease;
+ private double dropDecrease;
+ private double decreasePoint;
+ private double drop;
+
+ Random rng;
+
+ public static final String CONNECTION_THROTTLE_TOKENS =
"zookeeper.connection_throttle_tokens";
+ public static final int DEFAULT_CONNECTION_THROTTLE_TOKENS;
+
+ public static final String CONNECTION_THROTTLE_FILL_TIME =
"zookeeper.connection_throttle_fill_time";
+ public static final int DEFAULT_CONNECTION_THROTTLE_FILL_TIME;
+
+ public static final String CONNECTION_THROTTLE_FILL_COUNT =
"zookeeper.connection_throttle_fill_count";
+ public static final int DEFAULT_CONNECTION_THROTTLE_FILL_COUNT;
+
+ public static final String CONNECTION_THROTTLE_FREEZE_TIME =
"zookeeper.connection_throttle_freeze_time";
+ public static final int DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME;
+
+ public static final String CONNECTION_THROTTLE_DROP_INCREASE =
"zookeeper.connection_throttle_drop_increase";
+ public static final double DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE;
+
+ public static final String CONNECTION_THROTTLE_DROP_DECREASE =
"zookeeper.connection_throttle_drop_decrease";
+ public static final double DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE;
+
+ public static final String CONNECTION_THROTTLE_DECREASE_RATIO =
"zookeeper.connection_throttle_decrease_ratio";
+ public static final double DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO;
+
+
+ static {
+ DEFAULT_CONNECTION_THROTTLE_TOKENS =
Integer.getInteger(CONNECTION_THROTTLE_TOKENS, 0);
+ DEFAULT_CONNECTION_THROTTLE_FILL_TIME =
Integer.getInteger(CONNECTION_THROTTLE_FILL_TIME, 1);
+ DEFAULT_CONNECTION_THROTTLE_FILL_COUNT =
Integer.getInteger(CONNECTION_THROTTLE_FILL_COUNT, 1);
+
+ DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME =
Integer.getInteger(CONNECTION_THROTTLE_FREEZE_TIME, -1);
+ DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE =
getDoubleProp(CONNECTION_THROTTLE_DROP_INCREASE, 0.02);
+ DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE =
getDoubleProp(CONNECTION_THROTTLE_DROP_DECREASE, 0.002);
+ DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO =
getDoubleProp(CONNECTION_THROTTLE_DECREASE_RATIO, 0);
+ }
+
+ /* Varation of Integer.getInteger for real number properties */
+ private static double getDoubleProp(String name, double def) {
+ String val = System.getProperty(name);
+ if(val != null) {
+ return Double.parseDouble(val);
+ }
+ else {
+ return def;
+ }
+ }
+
+
+ public BlueThrottle() {
+ // Disable throttling by default (maxTokens = 0)
+ this.maxTokens = DEFAULT_CONNECTION_THROTTLE_TOKENS;
+ this.fillTime = DEFAULT_CONNECTION_THROTTLE_FILL_TIME;
+ this.fillCount = DEFAULT_CONNECTION_THROTTLE_FILL_COUNT;
+ this.tokens = maxTokens;
+ this.lastTime = Time.currentElapsedTime();
+
+ // Disable BLUE throttling by default (freezeTime = -1)
+ this.freezeTime = DEFAULT_CONNECTION_THROTTLE_FREEZE_TIME;
+ this.lastFreeze = Time.currentElapsedTime();
+ this.dropIncrease = DEFAULT_CONNECTION_THROTTLE_DROP_INCREASE;
+ this.dropDecrease = DEFAULT_CONNECTION_THROTTLE_DROP_DECREASE;
+ this.decreasePoint = DEFAULT_CONNECTION_THROTTLE_DECREASE_RATIO;
+ this.drop = 0;
+
+ this.rng = new Random();
+ }
+
+ public synchronized void setMaxTokens(int max) {
+ int deficit = maxTokens - tokens;
+ maxTokens = max;
+ tokens = max - deficit;
+ }
+
+ public synchronized void setFillTime(int time) {
+ fillTime = time;
+ }
+
+ public synchronized void setFillCount(int count) {
+ fillCount = count;
+ }
+
+ public synchronized void setFreezeTime(int time) {
+ freezeTime = time;
+ }
+
+ public synchronized void setDropIncrease(double increase) {
+ dropIncrease = increase;
+ }
+
+ public synchronized void setDropDecrease(double decrease) {
+ dropDecrease = decrease;
+ }
+
+ public synchronized void setDecreasePoint(double ratio) {
+ decreasePoint = ratio;
+ }
+
+ public synchronized int getMaxTokens() {
+ return maxTokens;
+ }
+
+ public synchronized int getFillTime() {
+ return fillTime;
+ }
+
+ public synchronized int getFillCount() {
+ return fillCount;
+ }
+
+ public synchronized int getFreezeTime() {
+ return freezeTime;
+ }
+
+ public synchronized double getDropIncrease() {
+ return dropIncrease;
+ }
+
+ public synchronized double getDropDecrease() {
+ return dropDecrease;
+ }
+
+ public synchronized double getDecreasePoint() {
+ return decreasePoint;
+ }
+
+ public synchronized double getDropChance() {
+ return drop;
+ }
+
+ public synchronized int getDeficit() {
+ return maxTokens - tokens;
+ }
+
+ public synchronized boolean checkLimit(int need) {
+ // A maxTokens setting of zero disables throttling
+ if (maxTokens == 0)
+ return true;
+
+ long now = Time.currentElapsedTime();
+ long diff = now - lastTime;
+
+ if (diff > fillTime) {
+ int refill = (int)(diff * fillCount / fillTime);
+ tokens = Math.min(tokens + refill, maxTokens);
+ lastTime = now;
+ }
+
+ // A freeze time of -1 disables BLUE randomized throttling
+ if(freezeTime != -1) {
+ if(!checkBlue(now)) {
+ return false;
+ }
+ }
+
+ if (tokens < need) {
+ return false;
+ }
+
+ tokens -= need;
+ return true;
+ }
+
+ public synchronized boolean checkBlue(long now) {
+ int length = maxTokens - tokens;
+ int limit = maxTokens;
+ long diff = now - lastFreeze;
+ long threshold = Math.round(maxTokens * decreasePoint);
+
+ if (diff > freezeTime) {
+ if((length == limit) && (drop < 1)) {
+ drop = Math.min(drop + dropIncrease, 1);
+ }
+ else if ((length <= threshold) && (drop > 0)) {
+ drop = Math.max(drop - dropDecrease, 0);
+ }
+ lastFreeze = now;
+ }
+
+ if (rng.nextDouble() < drop) {
+ return false;
+ }
+ return true;
+ }
+};
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ClientCnxnLimitException.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ClientCnxnLimitException.java
new file mode 100644
index 0000000..38f8995
--- /dev/null
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ClientCnxnLimitException.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+/**
+ * Indicates that the number of client connections has exceeded some limit.
+ * @see org.apache.zookeeper.server.ClientCnxnLimit#checkLimit()
+ * @see org.apache.zookeeper.server.ClientCnxnLimit#checkLimit(int)
+ */
+public class ClientCnxnLimitException extends Exception {
+ public ClientCnxnLimitException() {
+ super("Connection throttle rejected connection");
+ }
+}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
index c2ab784..f7e382f 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -151,7 +151,7 @@ public class NIOServerCnxn extends ServerCnxn {
}
/** Read the request payload (everything following the length prefix) */
- private void readPayload() throws IOException, InterruptedException {
+ private void readPayload() throws IOException, InterruptedException,
ClientCnxnLimitException {
if (incomingBuffer.remaining() != 0) { // have we read length bytes?
int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
if (rc < 0) {
@@ -360,6 +360,14 @@ public class NIOServerCnxn extends ServerCnxn {
LOG.warn(e.getMessage());
// expecting close to log session closure
close();
+ } catch (ClientCnxnLimitException e) {
+ // Common case exception, print at debug level
+ ServerMetrics.CONNECTION_REJECTED.add(1);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exception causing close of session 0x"
+ + Long.toHexString(sessionId) + ": " +
e.getMessage());
+ }
+ close();
} catch (IOException e) {
LOG.warn("Exception causing close of session 0x"
+ Long.toHexString(sessionId) + ": " + e.getMessage());
@@ -407,7 +415,7 @@ public class NIOServerCnxn extends ServerCnxn {
}
}
- private void readConnectRequest() throws IOException, InterruptedException
{
+ private void readConnectRequest() throws IOException,
InterruptedException, ClientCnxnLimitException {
if (!isZKServerRunning()) {
throw new IOException("ZooKeeperServer not running");
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java
index 4e7e5db..090ee7b 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java
@@ -310,6 +310,7 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory
{
acceptErrorLogger.flush();
} catch (IOException e) {
// accept, maxClientCnxns, configureBlocking
+ ServerMetrics.CONNECTION_REJECTED.add(1);
acceptErrorLogger.rateLimitLog(
"Error accepting new connection: " + e.getMessage());
fastCloseSock(sc);
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index b6bb343..8b4f70f 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -486,6 +486,13 @@ public class NettyServerCnxn extends ServerCnxn {
} catch(IOException e) {
LOG.warn("Closing connection to " + getRemoteSocketAddress(), e);
close();
+ } catch(ClientCnxnLimitException e) {
+ // Common case exception, print at debug level
+ ServerMetrics.CONNECTION_REJECTED.add(1);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing connection to " + getRemoteSocketAddress(),
e);
+ }
+ close();
}
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
index d96f56d..e0d55a4 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
@@ -108,6 +108,7 @@ public class NettyServerCnxnFactory extends
ServerCnxnFactory {
InetAddress addr = ((InetSocketAddress) channel.remoteAddress())
.getAddress();
if (maxClientCnxns > 0 && getClientCnxnCount(addr) >=
maxClientCnxns) {
+ ServerMetrics.CONNECTION_REJECTED.add(1);
LOG.warn("Too many connections from {} - max is {}", addr,
maxClientCnxns);
channel.close();
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 3420b88..dcc04a3 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -67,6 +67,10 @@ public enum ServerMetrics {
SNAP_COUNT(new SimpleCounter("snap_count")),
COMMIT_COUNT(new SimpleCounter("commit_count")),
CONNECTION_REQUEST_COUNT(new SimpleCounter("connection_request_count")),
+ // Connection throttling related
+ CONNECTION_TOKEN_DEFICIT(new AvgMinMaxCounter("connection_token_deficit")),
+ CONNECTION_REJECTED(new SimpleCounter("connection_rejected")),
+
BYTES_RECEIVED_COUNT(new SimpleCounter("bytes_received_count")),
RESPONSE_PACKET_CACHE_HITS(new
SimpleCounter("response_packet_cache_hits")),
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 833c79b..ee0e4c2 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -164,6 +164,9 @@ public class ZooKeeperServer implements SessionExpirer,
ServerStats.Provider {
LOG.info(INT_BUFFER_STARTING_SIZE_BYTES + " = " +
intBufferStartingSizeBytes);
}
+ // Connection throttling
+ private BlueThrottle connThrottle;
+
void removeCnxn(ServerCnxn cnxn) {
zkDb.removeCnxn(cnxn);
}
@@ -196,7 +199,11 @@ public class ZooKeeperServer implements SessionExpirer,
ServerStats.Provider {
setMinSessionTimeout(minSessionTimeout);
setMaxSessionTimeout(maxSessionTimeout);
listener = new ZooKeeperServerListenerImpl(this);
+
readResponseCache = new ResponseCache();
+
+ connThrottle = new BlueThrottle();
+
LOG.info("Created server with tickTime " + tickTime
+ " minSessionTimeout " + getMinSessionTimeout()
+ " maxSessionTimeout " + getMaxSessionTimeout()
@@ -219,6 +226,10 @@ public class ZooKeeperServer implements SessionExpirer,
ServerStats.Provider {
return serverStats;
}
+ public BlueThrottle connThrottle() {
+ return connThrottle;
+ }
+
public void dumpConf(PrintWriter pwriter) {
pwriter.print("clientPort=");
pwriter.println(getClientPort());
@@ -1043,7 +1054,18 @@ public class ZooKeeperServer implements SessionExpirer,
ServerStats.Provider {
return zkDb.getEphemerals();
}
- public void processConnectRequest(ServerCnxn cnxn, ByteBuffer
incomingBuffer) throws IOException {
+ public double getConnectionDropChance() {
+ return connThrottle.getDropChance();
+ }
+
+ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer
incomingBuffer)
+ throws IOException, ClientCnxnLimitException {
+
+ if (connThrottle.checkLimit(1) == false) {
+ throw new ClientCnxnLimitException();
+ }
+ ServerMetrics.CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
+
BinaryInputArchive bia = BinaryInputArchive.getArchive(new
ByteBufferInputStream(incomingBuffer));
ConnectRequest connReq = new ConnectRequest();
connReq.deserialize(bia, "connect");
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
index deae98d..b8cf706 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
@@ -167,7 +167,7 @@ public class ZooKeeperServerBean implements
ZooKeeperServerMXBean, ZKMBeanInfo {
public String getSecureClientAddress() {
if (zks.secureServerCnxnFactory != null) {
return String.format("%s:%d", zks.secureServerCnxnFactory
- .getLocalAddress().getHostString(),
+ .getLocalAddress().getHostString(),
zks.secureServerCnxnFactory.getLocalPort());
}
return "";
@@ -207,4 +207,75 @@ public class ZooKeeperServerBean implements
ZooKeeperServerMXBean, ZKMBeanInfo {
public void setResponseCachingEnabled(boolean isEnabled) {
zks.setResponseCachingEnabled(isEnabled);
}
+
+ // Connection throttling settings
+ ///////////////////////////////////////////////////////////////////////////
+
+ public int getConnectionMaxTokens() {
+ return zks.connThrottle().getMaxTokens();
+ }
+
+ public void setConnectionMaxTokens(int val) {
+ zks.connThrottle().setMaxTokens(val);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+
+ public int getConnectionTokenFillTime() {
+ return zks.connThrottle().getFillTime();
+ }
+
+ public void setConnectionTokenFillTime(int val) {
+ zks.connThrottle().setFillTime(val);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+
+ public int getConnectionTokenFillCount() {
+ return zks.connThrottle().getFillCount();
+ }
+
+ public void setConnectionTokenFillCount(int val) {
+ zks.connThrottle().setFillCount(val);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+
+ public int getConnectionFreezeTime() {
+ return zks.connThrottle().getFreezeTime();
+ }
+
+ public void setConnectionFreezeTime(int val) {
+ zks.connThrottle().setFreezeTime(val);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+
+ public double getConnectionDropIncrease() {
+ return zks.connThrottle().getDropIncrease();
+ }
+
+ public void setConnectionDropIncrease(double val) {
+ zks.connThrottle().setDropIncrease(val);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+
+ public double getConnectionDropDecrease() {
+ return zks.connThrottle().getDropDecrease();
+ }
+
+ public void setConnectionDropDecrease(double val) {
+ zks.connThrottle().setDropDecrease(val);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+
+ public double getConnectionDecreaseRatio() {
+ return zks.connThrottle().getDecreasePoint();
+ }
+
+ public void setConnectionDecreaseRatio(double val) {
+ zks.connThrottle().setDecreasePoint(val);
+ }
}
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
index bd4d349..91c8c82 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
@@ -98,6 +98,28 @@ public interface ZooKeeperServerMXBean {
public boolean getResponseCachingEnabled();
public void setResponseCachingEnabled(boolean isEnabled);
+ /* Connection throttling settings */
+ public int getConnectionMaxTokens();
+ public void setConnectionMaxTokens(int val);
+
+ public int getConnectionTokenFillTime();
+ public void setConnectionTokenFillTime(int val);
+
+ public int getConnectionTokenFillCount();
+ public void setConnectionTokenFillCount(int val);
+
+ public int getConnectionFreezeTime();
+ public void setConnectionFreezeTime(int val);
+
+ public double getConnectionDropIncrease();
+ public void setConnectionDropIncrease(double val);
+
+ public double getConnectionDropDecrease();
+ public void setConnectionDropDecrease(double val);
+
+ public double getConnectionDecreaseRatio();
+ public void setConnectionDecreaseRatio(double val);
+
/**
* Reset packet and latency statistics
*/
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
index f1e5500..b0fa4ff 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
@@ -354,6 +354,7 @@ public class Commands {
OSMXBean osMbean = new OSMXBean();
response.put("open_file_descriptor_count",
osMbean.getOpenFileDescriptorCount());
response.put("max_file_descriptor_count",
osMbean.getMaxFileDescriptorCount());
+ response.put("connection_drop_probability",
zkServer.getConnectionDropChance());
response.put("last_client_response_size",
stats.getClientResponseStats().getLastBufferSize());
response.put("max_client_response_size",
stats.getClientResponseStats().getMaxBufferSize());
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java
new file mode 100644
index 0000000..aa27a15
--- /dev/null
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/BlueThrottleTest.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+public class BlueThrottleTest extends ZKTestCase {
+ private static final Logger LOG =
LoggerFactory.getLogger(BlueThrottleTest.class);
+
+ class MockRandom extends Random {
+ int flag = 0;
+ BlueThrottle throttle;
+
+ @Override
+ public double nextDouble() {
+ if (throttle.getDropChance() > 0) {
+ flag = 1 - flag;
+ return flag;
+ } else {
+ return 1;
+ }
+ }
+ }
+
+ class BlueThrottleWithMockRandom extends BlueThrottle {
+ public BlueThrottleWithMockRandom(MockRandom random) {
+ super();
+ this.rng = random;
+ random.throttle = this;
+ }
+ }
+
+ @Test
+ public void testThrottleDisabled() {
+ BlueThrottle throttler = new BlueThrottle();
+ Assert.assertTrue("Throttle should be disabled by default",
throttler.checkLimit(1));
+ }
+
+ @Test
+ public void testThrottleWithoutRefill() {
+ BlueThrottle throttler = new BlueThrottle();
+ throttler.setMaxTokens(1);
+ throttler.setFillTime(2000);
+ Assert.assertTrue("First request should be allowed",
throttler.checkLimit(1));
+ Assert.assertFalse("Second request should be denied",
throttler.checkLimit(1));
+ }
+
+ @Test
+ public void testThrottleWithRefill() throws InterruptedException {
+ BlueThrottle throttler = new BlueThrottle();
+ throttler.setMaxTokens(1);
+ throttler.setFillTime(500);
+ Assert.assertTrue("First request should be allowed",
throttler.checkLimit(1));
+ Assert.assertFalse("Second request should be denied",
throttler.checkLimit(1));
+
+ //wait for the bucket to be refilled
+ Thread.sleep(750);
+ Assert.assertTrue("Third request should be allowed since we've got a
new token", throttler.checkLimit(1));
+ }
+
+ @Test
+ public void testThrottleWithoutRandomDropping() throws
InterruptedException {
+ int maxTokens = 5;
+ BlueThrottle throttler = new BlueThrottleWithMockRandom(new
MockRandom());
+ throttler.setMaxTokens(maxTokens);
+ throttler.setFillCount(maxTokens);
+ throttler.setFillTime(1000);
+
+ for (int i=0;i<maxTokens;i++) {
+ throttler.checkLimit(1);
+ }
+ Assert.assertEquals("All tokens should be used up by now",
throttler.getMaxTokens(), throttler.getDeficit());
+
+ Thread.sleep(110);
+ throttler.checkLimit(1);
+ Assert.assertFalse("Dropping probability should still be zero",
throttler.getDropChance()>0);
+
+ //allow bucket to be refilled
+ Thread.sleep(1500);
+
+ for (int i=0;i<maxTokens;i++) {
+ Assert.assertTrue("The first " + maxTokens + " requests should be
allowed", throttler.checkLimit(1));
+ }
+
+ for (int i=0;i<maxTokens;i++) {
+ Assert.assertFalse("The latter " + maxTokens + " requests should
be denied", throttler.checkLimit(1));
+ }
+ }
+
+ @Test
+ public void testThrottleWithRandomDropping() throws InterruptedException {
+ int maxTokens = 5;
+ BlueThrottle throttler = new BlueThrottleWithMockRandom(new
MockRandom());
+ throttler.setMaxTokens(maxTokens);
+ throttler.setFillCount(maxTokens);
+ throttler.setFillTime(1000);
+ throttler.setFreezeTime(100);
+ throttler.setDropIncrease(0.5);
+
+ for (int i=0;i<maxTokens;i++)
+ throttler.checkLimit(1);
+ Assert.assertEquals("All tokens should be used up by now",
throttler.getMaxTokens(), throttler.getDeficit());
+
+ Thread.sleep(120);
+ //this will trigger dropping probability being increased
+ throttler.checkLimit(1);
+ Assert.assertTrue("Dropping probability should be increased",
throttler.getDropChance()>0);
+ LOG.info("Dropping probability is {}", throttler.getDropChance());
+
+ //allow bucket to be refilled
+ Thread.sleep(1100);
+ LOG.info("Bucket is refilled with {} tokens.", maxTokens);
+
+ int accepted = 0;
+ for (int i=0;i<maxTokens;i++) {
+ if (throttler.checkLimit(1)) {
+ accepted ++;
+ }
+ }
+
+ LOG.info("Send {} requests, {} are accepted", maxTokens, accepted);
+ Assert.assertTrue("The dropping should be distributed",
accepted<maxTokens);
+
+ accepted = 0;
+
+ for (int i=0;i<maxTokens;i++) {
+ if (throttler.checkLimit(1)) {
+ accepted ++;
+ }
+ }
+
+ LOG.info("Send another {} requests, {} are accepted", maxTokens,
accepted);
+ Assert.assertTrue("Later requests should have a chance", accepted > 0);
+ }
+}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
index 9b30c55..000b3ce 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
@@ -190,7 +190,8 @@ public class CommandsTest extends ClientBase {
new Field("min_client_response_size", Integer.class),
new Field("uptime", Long.class),
new Field("global_sessions", Long.class),
- new Field("local_sessions", Long.class)
+ new Field("local_sessions", Long.class),
+ new Field("connection_drop_probability", Double.class)
));
for (String metric : ServerMetrics.getAllValues().keySet()) {
if (metric.startsWith("avg_")) {