This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new fe030bd RATIS-714. Limit the byte size used by the message in pending requests.
fe030bd is described below
commit fe030bd3e97ef98c1b034d01a0c875065b073169
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Oct 28 15:21:55 2019 -0700
RATIS-714. Limit the byte size used by the message in pending requests.
---
.../java/org/apache/ratis/protocol/Message.java | 11 +-
.../org/apache/ratis/util/ResourceSemaphore.java | 117 +++++++++++++++++++--
.../apache/ratis/server/RaftServerConfigKeys.java | 10 ++
.../org/apache/ratis/server/impl/LeaderState.java | 4 +-
.../apache/ratis/server/impl/PendingRequests.java | 53 +++++++---
.../apache/ratis/server/impl/RaftServerImpl.java | 2 +-
.../ratis/server/impl/RaftServerMetrics.java | 6 +-
.../apache/ratis/util/TestResourceSemaphore.java | 65 ++++++++++++
8 files changed, 240 insertions(+), 28 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
index 6ab04ae..1ed25eb 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,6 +21,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.StringUtils;
+import java.util.Optional;
import java.util.function.Supplier;
/**
@@ -51,10 +52,18 @@ public interface Message {
return valueOf(ByteString.copyFromUtf8(string), () -> "Message:" + string);
}
+ static int getSize(Message message) {
+ return Optional.ofNullable(message).map(Message::size).orElse(0);
+ }
+
Message EMPTY = valueOf(ByteString.EMPTY);
/**
* @return the content of the message
*/
ByteString getContent();
+
+ default int size() {
+ return Optional.ofNullable(getContent()).map(ByteString::size).orElse(0);
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java b/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
index b76fd77..f16f5bd 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
@@ -17,27 +17,62 @@
*/
package org.apache.ratis.util;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
/**
- * A {@link Semaphore} with an element limit for tracking resources.
+ * A {@link Semaphore} with a limit for a resource.
*
* After {@link #close()}, the resource becomes unavailable, i.e. any acquire
will not succeed.
*/
public class ResourceSemaphore extends Semaphore {
- private final int elementLimit;
+ private final int limit;
+ private final AtomicBoolean reducePermits = new AtomicBoolean();
private final AtomicBoolean isClosed = new AtomicBoolean();
- public ResourceSemaphore(int elementLimit) {
- super(elementLimit, true);
- this.elementLimit = elementLimit;
+ public ResourceSemaphore(int limit) {
+ super(limit, true);
+ Preconditions.assertTrue(limit > 0, () -> "limit = " + limit + " <= 0");
+ this.limit = limit;
+ }
+
+ @Override
+ public void release() {
+ release(1);
+ }
+
+ @Override
+ public void release(int permits) {
+ assertRelease(permits);
+ super.release(permits);
+ assertAvailable();
+ }
+
+ private void assertRelease(int toRelease) {
+ Preconditions.assertTrue(toRelease >= 0, () -> "toRelease = " + toRelease
+ " < 0");
+ final int available = assertAvailable();
+ final int permits = Math.addExact(available, toRelease);
+ Preconditions.assertTrue(permits <= limit, () -> "permits = " + permits +
" > limit = " + limit);
+ }
+
+ private int assertAvailable() {
+ final int available = availablePermits();
+ Preconditions.assertTrue(available >= 0, () -> "available = " + available
+ " < 0");
+ return available;
+ }
+
+ public int used() {
+ return limit - availablePermits();
}
/** Close the resource. */
public void close() {
- if (isClosed.compareAndSet(false, true)) {
- reducePermits(elementLimit);
+ if (reducePermits.compareAndSet(false, true)) {
+ reducePermits(limit);
+ isClosed.set(true);
}
}
@@ -47,6 +82,72 @@ public class ResourceSemaphore extends Semaphore {
@Override
public String toString() {
- return getClass().getSimpleName() + ":elementLimit=" + elementLimit +
",closed?" + isClosed;
+ return (isClosed()? "closed/": availablePermits() + "/") + limit;
+ }
+
+ /**
+ * Track a group of resources with a list of {@link ResourceSemaphore}s.
+ */
+ public static class Group {
+ private final List<ResourceSemaphore> resources;
+
+ public Group(int... limits) {
+ Preconditions.assertTrue(limits.length > 1, () -> "limits.length = " +
limits.length + " < 2");
+ final List<ResourceSemaphore> list = new ArrayList<>(limits.length);
+ for(int limit : limits) {
+ list.add(new ResourceSemaphore(limit));
+ }
+ this.resources = Collections.unmodifiableList(list);
+ }
+
+ public int resourceSize() {
+ return resources.size();
+ }
+
+ protected ResourceSemaphore get(int i) {
+ return resources.get(i);
+ }
+
+ protected boolean tryAcquire(int... permits) {
+ Preconditions.assertTrue(permits.length == resources.size(),
+ () -> "items.length = " + permits.length + " != resources.size() = "
+ resources.size());
+ int i = 0;
+ // try acquiring all resources
+ for(; i < permits.length; i++) {
+ if (!resources.get(i).tryAcquire(permits[i])) {
+ break;
+ }
+ }
+ if (i == permits.length) {
+ return true; // successfully acquired all resources
+ }
+
+ // failed at i, releasing all previous resources
+ for(i--; i >= 0; i--) {
+ resources.get(i).release(permits[i]);
+ }
+ return false;
+ }
+
+ protected void release(int... permits) {
+ for(int i = resources.size() - 1; i >= 0; i--) {
+ resources.get(i).release(permits[i]);
+ }
+ }
+
+ public void close() {
+ for(int i = resources.size() - 1; i >= 0; i--) {
+ resources.get(i).close();
+ }
+ }
+
+ public boolean isClosed() {
+ return resources.get(resources.size() - 1).isClosed();
+ }
+
+ @Override
+ public String toString() {
+ return resources + ",size=" + resources.size();
+ }
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index c5e9504..2839d7e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -88,6 +88,16 @@ public interface RaftServerConfigKeys {
static void setElementLimit(RaftProperties properties, int limit) {
setInt(properties::setInt, ELEMENT_LIMIT_KEY, limit, requireMin(1));
}
+
+ String BYTE_LIMIT_KEY = PREFIX + ".byte-limit";
+ SizeInBytes BYTE_LIMIT_DEFAULT = SizeInBytes.valueOf("64MB");
+ static SizeInBytes byteLimit(RaftProperties properties) {
+ return getSizeInBytes(properties::getSizeInBytes,
+ BYTE_LIMIT_KEY, BYTE_LIMIT_DEFAULT, getDefaultLog());
+ }
+ static void setByteLimit(RaftProperties properties, int byteLimit) {
+ setInt(properties::setInt, BYTE_LIMIT_KEY, byteLimit, requireMin(1));
+ }
}
interface Watch {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 8f3e54b..b955352 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -306,8 +306,8 @@ public class LeaderState {
return pending;
}
- PendingRequests.Permit tryAcquirePendingRequest() {
- return pendingRequests.tryAcquire();
+ PendingRequests.Permit tryAcquirePendingRequest(Message message) {
+ return pendingRequests.tryAcquire(message);
}
PendingRequest addPendingRequest(PendingRequests.Permit permit,
RaftClientRequest request, TransactionContext entry) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index b37b4f0..f9421f3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
@@ -28,8 +29,9 @@ import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.TransactionContext;
-import org.apache.ratis.util.ResourceSemaphore;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ResourceSemaphore;
+import org.apache.ratis.util.SizeInBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +50,29 @@ class PendingRequests {
static class Permit {}
+ static class RequestLimits extends ResourceSemaphore.Group {
+ RequestLimits(int elementLimit, SizeInBytes byteLimit) {
+ super(elementLimit, byteLimit.getSizeInt());
+ }
+
+ int getElementCount() {
+ return get(0).used();
+ }
+
+ // TODO: add metrics
+ int getByteSize() {
+ return get(1).used();
+ }
+
+ boolean tryAcquire(Message message) {
+ return tryAcquire(1, Message.getSize(message));
+ }
+
+ void release(Message message) {
+ release(1, Message.getSize(message));
+ }
+ }
+
private static class RequestMap {
private final Object name;
private final ConcurrentMap<Long, PendingRequest> map = new
ConcurrentHashMap<>();
@@ -55,18 +80,19 @@ class PendingRequests {
/** Permits to put new requests, always synchronized. */
private final Map<Permit, Permit> permits = new HashMap<>();
- /** Track and limit the number of requests. */
- private final ResourceSemaphore resource;
+ /** Track and limit the number of requests and the total message size. */
+ private final RequestLimits resource;
- RequestMap(Object name, int capacity, RaftServerMetrics raftServerMetrics)
{
+ RequestMap(Object name, int elementLimit, SizeInBytes byteLimit,
RaftServerMetrics raftServerMetrics) {
this.name = name;
- this.resource = new ResourceSemaphore(capacity);
+ this.resource = new RequestLimits(elementLimit, byteLimit);
this.raftServerMetrics = raftServerMetrics;
- raftServerMetrics.addNumPendingRequestsGauge(resource, capacity);
+
+ raftServerMetrics.addNumPendingRequestsGauge(resource::getElementCount);
}
- Permit tryAcquire() {
- final boolean acquired = resource.tryAcquire();
+ Permit tryAcquire(Message message) {
+ final boolean acquired = resource.tryAcquire(message);
LOG.trace("tryAcquire? {}", acquired);
if (!acquired) {
raftServerMetrics.onRequestQueueLimitHit();
@@ -108,7 +134,7 @@ class PendingRequests {
if (r == null) {
return null;
}
- resource.release();
+ resource.release(r.getRequest().getMessage());
LOG.trace("release");
return r;
}
@@ -141,11 +167,14 @@ class PendingRequests {
PendingRequests(RaftGroupMemberId id, RaftProperties properties,
RaftServerMetrics raftServerMetrics) {
this.name = id + "-" + getClass().getSimpleName();
- this.pendingRequests = new RequestMap(id,
RaftServerConfigKeys.Write.elementLimit(properties), raftServerMetrics);
+ this.pendingRequests = new RequestMap(id,
+ RaftServerConfigKeys.Write.elementLimit(properties),
+ RaftServerConfigKeys.Write.byteLimit(properties),
+ raftServerMetrics);
}
- Permit tryAcquire() {
- return pendingRequests.tryAcquire();
+ Permit tryAcquire(Message message) {
+ return pendingRequests.tryAcquire(message);
}
PendingRequest add(Permit permit, RaftClientRequest request,
TransactionContext entry) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 5434698..dc6a5a7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -507,7 +507,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
// append the message to its local log
final LeaderState leaderState = role.getLeaderStateNonNull();
- final PendingRequests.Permit permit =
leaderState.tryAcquirePendingRequest();
+ final PendingRequests.Permit permit =
leaderState.tryAcquirePendingRequest(request.getMessage());
if (permit == null) {
return JavaUtils.completeExceptionally(new
ResourceUnavailableException(
getMemberId() + ": Failed to acquire a pending write request for "
+ request));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
index 192c959..8834263 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
@@ -44,7 +44,6 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.metrics.RatisMetrics;
import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.ResourceSemaphore;
/**
* Metric Registry for Raft Group Server. One instance per leader/follower.
@@ -171,8 +170,7 @@ public final class RaftServerMetrics {
registry.counter(REQUEST_QUEUE_LIMIT_HIT_COUNTER).inc();
}
- public void addNumPendingRequestsGauge(ResourceSemaphore resourceSemaphore,
int capacity) {
- registry.gauge(REQUEST_QUEUE_SIZE,
- () -> () -> (capacity - resourceSemaphore.availablePermits()));
+ void addNumPendingRequestsGauge(Gauge queueSize) {
+ registry.gauge(REQUEST_QUEUE_SIZE, () -> queueSize);
}
}
\ No newline at end of file
diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java b/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java
new file mode 100644
index 0000000..8b210ea
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.BaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestResourceSemaphore extends BaseTest {
+ @Test(timeout = 1000)
+ public void testGroup() {
+ final ResourceSemaphore.Group g = new ResourceSemaphore.Group(3, 1);
+
+ assertUsed(g, 0, 0);
+ assertAcquire(g, true, 1, 1);
+ assertUsed(g, 1, 1);
+ assertAcquire(g, false, 1, 1);
+ assertUsed(g, 1, 1);
+ assertAcquire(g, false, 0, 1);
+ assertUsed(g, 1, 1);
+ assertAcquire(g, true, 1, 0);
+ assertUsed(g, 2, 1);
+ assertAcquire(g, true, 1, 0);
+ assertUsed(g, 3, 1);
+ assertAcquire(g, false, 1, 0);
+ assertUsed(g, 3, 1);
+
+ g.release(1, 1);
+ assertUsed(g, 2, 0);
+ g.release(2, 0);
+ assertUsed(g, 0, 0);
+ g.release(0, 0);
+ assertUsed(g, 0, 0);
+
+ testFailureCase("release over limit-0", () -> g.release(1, 0),
IllegalStateException.class);
+ testFailureCase("release over limit-1", () -> g.release(0, 1),
IllegalStateException.class);
+ }
+
+ static void assertUsed(ResourceSemaphore.Group g, int... expected) {
+ Assert.assertEquals(expected.length, g.resourceSize());
+ for(int i = 0; i < expected.length; i++) {
+ Assert.assertEquals(expected[i], g.get(i).used());
+ }
+ }
+
+ static void assertAcquire(ResourceSemaphore.Group g, boolean expected,
int... permits) {
+ final boolean computed = g.tryAcquire(permits);
+ Assert.assertEquals(expected, computed);
+ }
+}