This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new fe030bd  RATIS-714. Limit the byte size used by the message in pending 
requests.
fe030bd is described below

commit fe030bd3e97ef98c1b034d01a0c875065b073169
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Oct 28 15:21:55 2019 -0700

    RATIS-714. Limit the byte size used by the message in pending requests.
---
 .../java/org/apache/ratis/protocol/Message.java    |  11 +-
 .../org/apache/ratis/util/ResourceSemaphore.java   | 117 +++++++++++++++++++--
 .../apache/ratis/server/RaftServerConfigKeys.java  |  10 ++
 .../org/apache/ratis/server/impl/LeaderState.java  |   4 +-
 .../apache/ratis/server/impl/PendingRequests.java  |  53 +++++++---
 .../apache/ratis/server/impl/RaftServerImpl.java   |   2 +-
 .../ratis/server/impl/RaftServerMetrics.java       |   6 +-
 .../apache/ratis/util/TestResourceSemaphore.java   |  65 ++++++++++++
 8 files changed, 240 insertions(+), 28 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
index 6ab04ae..1ed25eb 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,6 +21,7 @@ import 
org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.StringUtils;
 
+import java.util.Optional;
 import java.util.function.Supplier;
 
 /**
@@ -51,10 +52,18 @@ public interface Message {
     return valueOf(ByteString.copyFromUtf8(string), () -> "Message:" + string);
   }
 
+  static int getSize(Message message) {
+    return Optional.ofNullable(message).map(Message::size).orElse(0);
+  }
+
   Message EMPTY = valueOf(ByteString.EMPTY);
 
   /**
    * @return the content of the message
    */
   ByteString getContent();
+
+  default int size() {
+    return Optional.ofNullable(getContent()).map(ByteString::size).orElse(0);
+  }
 }
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java 
b/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
index b76fd77..f16f5bd 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java
@@ -17,27 +17,62 @@
  */
 package org.apache.ratis.util;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * A {@link Semaphore} with an element limit for tracking resources.
+ * A {@link Semaphore} with a limit for a resource.
  *
  * After {@link #close()}, the resource becomes unavailable, i.e. any acquire 
will not succeed.
  */
 public class ResourceSemaphore extends Semaphore {
-  private final int elementLimit;
+  private final int limit;
+  private final AtomicBoolean reducePermits = new AtomicBoolean();
   private final AtomicBoolean isClosed = new AtomicBoolean();
 
-  public ResourceSemaphore(int elementLimit) {
-    super(elementLimit, true);
-    this.elementLimit = elementLimit;
+  public ResourceSemaphore(int limit) {
+    super(limit, true);
+    Preconditions.assertTrue(limit > 0, () -> "limit = " + limit + " <= 0");
+    this.limit = limit;
+  }
+
+  @Override
+  public void release() {
+    release(1);
+  }
+
+  @Override
+  public void release(int permits) {
+    assertRelease(permits);
+    super.release(permits);
+    assertAvailable();
+  }
+
+  private void assertRelease(int toRelease) {
+    Preconditions.assertTrue(toRelease >= 0, () -> "toRelease = " + toRelease 
+ " < 0");
+    final int available = assertAvailable();
+    final int permits = Math.addExact(available, toRelease);
+    Preconditions.assertTrue(permits <= limit, () -> "permits = " + permits + 
" > limit = " + limit);
+  }
+
+  private int assertAvailable() {
+    final int available = availablePermits();
+    Preconditions.assertTrue(available >= 0, () -> "available = " + available 
+ " < 0");
+    return available;
+  }
+
+  public int used() {
+    return limit - availablePermits();
   }
 
   /** Close the resource. */
   public void close() {
-    if (isClosed.compareAndSet(false, true)) {
-      reducePermits(elementLimit);
+    if (reducePermits.compareAndSet(false, true)) {
+      reducePermits(limit);
+      isClosed.set(true);
     }
   }
 
@@ -47,6 +82,72 @@ public class ResourceSemaphore extends Semaphore {
 
   @Override
   public String toString() {
-    return getClass().getSimpleName() + ":elementLimit=" + elementLimit + 
",closed?" + isClosed;
+    return (isClosed()? "closed/": availablePermits() + "/") + limit;
+  }
+
+  /**
+   * Track a group of resources with a list of {@link ResourceSemaphore}s.
+   */
+  public static class Group {
+    private final List<ResourceSemaphore> resources;
+
+    public Group(int... limits) {
+      Preconditions.assertTrue(limits.length > 1, () -> "limits.length = " + 
limits.length + " < 2");
+      final List<ResourceSemaphore> list = new ArrayList<>(limits.length);
+      for(int limit : limits) {
+        list.add(new ResourceSemaphore(limit));
+      }
+      this.resources = Collections.unmodifiableList(list);
+    }
+
+    public int resourceSize() {
+      return resources.size();
+    }
+
+    protected ResourceSemaphore get(int i) {
+      return resources.get(i);
+    }
+
+    protected boolean tryAcquire(int... permits) {
+      Preconditions.assertTrue(permits.length == resources.size(),
+          () -> "items.length = " + permits.length + " != resources.size() = " 
+ resources.size());
+      int i = 0;
+      // try acquiring all resources
+      for(; i < permits.length; i++) {
+        if (!resources.get(i).tryAcquire(permits[i])) {
+          break;
+        }
+      }
+      if (i == permits.length) {
+        return true; // successfully acquired all resources
+      }
+
+      // failed at i, releasing all previous resources
+      for(i--; i >= 0; i--) {
+        resources.get(i).release(permits[i]);
+      }
+      return false;
+    }
+
+    protected void release(int... permits) {
+      for(int i = resources.size() - 1; i >= 0; i--) {
+        resources.get(i).release(permits[i]);
+      }
+    }
+
+    public void close() {
+      for(int i = resources.size() - 1; i >= 0; i--) {
+        resources.get(i).close();
+      }
+    }
+
+    public boolean isClosed() {
+      return resources.get(resources.size() - 1).isClosed();
+    }
+
+    @Override
+    public String toString() {
+      return resources + ",size=" + resources.size();
+    }
   }
 }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index c5e9504..2839d7e 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -88,6 +88,16 @@ public interface RaftServerConfigKeys {
     static void setElementLimit(RaftProperties properties, int limit) {
       setInt(properties::setInt, ELEMENT_LIMIT_KEY, limit, requireMin(1));
     }
+
+    String BYTE_LIMIT_KEY = PREFIX + ".byte-limit";
+    SizeInBytes BYTE_LIMIT_DEFAULT = SizeInBytes.valueOf("64MB");
+    static SizeInBytes byteLimit(RaftProperties properties) {
+      return getSizeInBytes(properties::getSizeInBytes,
+          BYTE_LIMIT_KEY, BYTE_LIMIT_DEFAULT, getDefaultLog());
+    }
+    static void setByteLimit(RaftProperties properties, int byteLimit) {
+      setInt(properties::setInt, BYTE_LIMIT_KEY, byteLimit, requireMin(1));
+    }
   }
 
   interface Watch {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 8f3e54b..b955352 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -306,8 +306,8 @@ public class LeaderState {
     return pending;
   }
 
-  PendingRequests.Permit tryAcquirePendingRequest() {
-    return pendingRequests.tryAcquire();
+  PendingRequests.Permit tryAcquirePendingRequest(Message message) {
+    return pendingRequests.tryAcquire(message);
   }
 
   PendingRequest addPendingRequest(PendingRequests.Permit permit, 
RaftClientRequest request, TransactionContext entry) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index b37b4f0..f9421f3 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.impl;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.NotLeaderException;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
@@ -28,8 +29,9 @@ import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.statemachine.TransactionContext;
-import org.apache.ratis.util.ResourceSemaphore;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ResourceSemaphore;
+import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +50,29 @@ class PendingRequests {
 
   static class Permit {}
 
+  static class RequestLimits extends ResourceSemaphore.Group {
+    RequestLimits(int elementLimit, SizeInBytes byteLimit) {
+      super(elementLimit, byteLimit.getSizeInt());
+    }
+
+    int getElementCount() {
+      return get(0).used();
+    }
+
+    // TODO: add metrics
+    int getByteSize() {
+      return get(1).used();
+    }
+
+    boolean tryAcquire(Message message) {
+      return tryAcquire(1, Message.getSize(message));
+    }
+
+    void release(Message message) {
+      release(1, Message.getSize(message));
+    }
+  }
+
   private static class RequestMap {
     private final Object name;
     private final ConcurrentMap<Long, PendingRequest> map = new 
ConcurrentHashMap<>();
@@ -55,18 +80,19 @@ class PendingRequests {
 
     /** Permits to put new requests, always synchronized. */
     private final Map<Permit, Permit> permits = new HashMap<>();
-    /** Track and limit the number of requests. */
-    private final ResourceSemaphore resource;
+    /** Track and limit the number of requests and the total message size. */
+    private final RequestLimits resource;
 
-    RequestMap(Object name, int capacity, RaftServerMetrics raftServerMetrics) 
{
+    RequestMap(Object name, int elementLimit, SizeInBytes byteLimit, 
RaftServerMetrics raftServerMetrics) {
       this.name = name;
-      this.resource = new ResourceSemaphore(capacity);
+      this.resource = new RequestLimits(elementLimit, byteLimit);
       this.raftServerMetrics = raftServerMetrics;
-      raftServerMetrics.addNumPendingRequestsGauge(resource, capacity);
+
+      raftServerMetrics.addNumPendingRequestsGauge(resource::getElementCount);
     }
 
-    Permit tryAcquire() {
-      final boolean acquired = resource.tryAcquire();
+    Permit tryAcquire(Message message) {
+      final boolean acquired = resource.tryAcquire(message);
       LOG.trace("tryAcquire? {}", acquired);
       if (!acquired) {
         raftServerMetrics.onRequestQueueLimitHit();
@@ -108,7 +134,7 @@ class PendingRequests {
       if (r == null) {
         return null;
       }
-      resource.release();
+      resource.release(r.getRequest().getMessage());
       LOG.trace("release");
       return r;
     }
@@ -141,11 +167,14 @@ class PendingRequests {
 
   PendingRequests(RaftGroupMemberId id, RaftProperties properties, 
RaftServerMetrics raftServerMetrics) {
     this.name = id + "-" + getClass().getSimpleName();
-    this.pendingRequests = new RequestMap(id, 
RaftServerConfigKeys.Write.elementLimit(properties), raftServerMetrics);
+    this.pendingRequests = new RequestMap(id,
+        RaftServerConfigKeys.Write.elementLimit(properties),
+        RaftServerConfigKeys.Write.byteLimit(properties),
+        raftServerMetrics);
   }
 
-  Permit tryAcquire() {
-    return pendingRequests.tryAcquire();
+  Permit tryAcquire(Message message) {
+    return pendingRequests.tryAcquire(message);
   }
 
   PendingRequest add(Permit permit, RaftClientRequest request, 
TransactionContext entry) {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 5434698..dc6a5a7 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -507,7 +507,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
 
       // append the message to its local log
       final LeaderState leaderState = role.getLeaderStateNonNull();
-      final PendingRequests.Permit permit = 
leaderState.tryAcquirePendingRequest();
+      final PendingRequests.Permit permit = 
leaderState.tryAcquirePendingRequest(request.getMessage());
       if (permit == null) {
         return JavaUtils.completeExceptionally(new 
ResourceUnavailableException(
             getMemberId() + ": Failed to acquire a pending write request for " 
+ request));
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
index 192c959..8834263 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
@@ -44,7 +44,6 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.metrics.RatisMetrics;
 import 
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.ResourceSemaphore;
 
 /**
  * Metric Registry for Raft Group Server. One instance per leader/follower.
@@ -171,8 +170,7 @@ public final class RaftServerMetrics {
     registry.counter(REQUEST_QUEUE_LIMIT_HIT_COUNTER).inc();
   }
 
-  public void addNumPendingRequestsGauge(ResourceSemaphore resourceSemaphore, 
int capacity) {
-    registry.gauge(REQUEST_QUEUE_SIZE,
-        () -> () -> (capacity - resourceSemaphore.availablePermits()));
+  void addNumPendingRequestsGauge(Gauge queueSize) {
+    registry.gauge(REQUEST_QUEUE_SIZE, () -> queueSize);
   }
 }
\ No newline at end of file
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java 
b/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java
new file mode 100644
index 0000000..8b210ea
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.apache.ratis.BaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestResourceSemaphore extends BaseTest {
+  @Test(timeout = 1000)
+  public void testGroup() {
+    final ResourceSemaphore.Group g = new ResourceSemaphore.Group(3, 1);
+
+    assertUsed(g, 0, 0);
+    assertAcquire(g, true, 1, 1);
+    assertUsed(g, 1, 1);
+    assertAcquire(g, false, 1, 1);
+    assertUsed(g, 1, 1);
+    assertAcquire(g, false, 0, 1);
+    assertUsed(g, 1, 1);
+    assertAcquire(g, true, 1, 0);
+    assertUsed(g, 2, 1);
+    assertAcquire(g, true, 1, 0);
+    assertUsed(g, 3, 1);
+    assertAcquire(g, false, 1, 0);
+    assertUsed(g, 3, 1);
+
+    g.release(1, 1);
+    assertUsed(g, 2, 0);
+    g.release(2, 0);
+    assertUsed(g, 0, 0);
+    g.release(0, 0);
+    assertUsed(g, 0, 0);
+
+    testFailureCase("release over limit-0", () -> g.release(1, 0), 
IllegalStateException.class);
+    testFailureCase("release over limit-1", () -> g.release(0, 1), 
IllegalStateException.class);
+  }
+
+  static void assertUsed(ResourceSemaphore.Group g, int... expected) {
+    Assert.assertEquals(expected.length, g.resourceSize());
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertEquals(expected[i], g.get(i).used());
+    }
+  }
+
+  static void assertAcquire(ResourceSemaphore.Group g, boolean expected, 
int... permits) {
+    final boolean computed = g.tryAcquire(permits);
+    Assert.assertEquals(expected, computed);
+  }
+}

Reply via email to