This is an automated email from the ASF dual-hosted git repository. szetszwo pushed a commit to branch branch-2_readIndex in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 5a996a42f4901fa9155b76dd639e84213224098a Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Tue Mar 14 18:52:16 2023 +0800 RATIS-1812. Enforce a outstanding request limit in StreamObserverWithTimeout. (#850) (cherry picked from commit 917671cf35cec5357f078158873d98a13d29157c) --- .../org/apache/ratis/util/ResourceSemaphore.java | 28 +++++----------- .../apache/ratis/grpc/server/GrpcLogAppender.java | 4 +-- .../grpc/server/GrpcServerProtocolClient.java | 4 +-- .../ratis/grpc/util/StreamObserverWithTimeout.java | 37 +++++++++++++++++++--- .../apache/ratis/server/impl/PendingRequests.java | 17 +++++++--- .../server/leader/InstallSnapshotRequests.java | 2 +- .../org/apache/ratis/grpc/util/GrpcTestClient.java | 2 +- .../grpc/util/TestStreamObserverWithTimeout.java | 3 -- .../apache/ratis/util/TestResourceSemaphore.java | 13 ++++---- 9 files changed, 64 insertions(+), 46 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java b/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java index b9e0ff5c7..fb75feeaa 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ResourceSemaphore.java @@ -88,14 +88,9 @@ public class ResourceSemaphore extends Semaphore { /** * Track a group of resources with a list of {@link ResourceSemaphore}s. */ - - public enum ResourceAcquireStatus { - SUCCESS, - FAILED_IN_ELEMENT_LIMIT, - FAILED_IN_BYTE_SIZE_LIMIT - } - public static class Group { + public static final int SUCCESS = -1; + private final List<ResourceSemaphore> resources; public Group(int... limits) { @@ -115,7 +110,8 @@ public class ResourceSemaphore extends Semaphore { return resources.get(i); } - public ResourceAcquireStatus tryAcquire(int... permits) { + /** @return {@link #SUCCESS} if successfully acquired; otherwise, return the failed index. */ + public int tryAcquire(int... permits) { Preconditions.assertTrue(permits.length == resources.size(), () -> "items.length = " + permits.length + " != resources.size() = " + resources.size()); int i = 0; @@ -126,24 +122,16 @@ public class ResourceSemaphore extends Semaphore { } } - if (i == permits.length) { - return ResourceAcquireStatus.SUCCESS; // successfully acquired all resources - } - - ResourceAcquireStatus acquireStatus; - if (i == 0) { - acquireStatus = ResourceAcquireStatus.FAILED_IN_ELEMENT_LIMIT; - } else { - acquireStatus = ResourceAcquireStatus.FAILED_IN_BYTE_SIZE_LIMIT; + return SUCCESS; // successfully acquired all resources } // failed at i, releasing all previous resources - for(i--; i >= 0; i--) { - resources.get(i).release(permits[i]); + for(int k = i - 1; k >= 0; k--) { + resources.get(k).release(permits[k]); } - return acquireStatus; + return i; } public void acquire(int... permits) throws InterruptedException { diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 0f975bf62..c688b66cd 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -598,7 +598,7 @@ public class GrpcLogAppender extends LogAppenderBase { final String requestId = UUID.randomUUID().toString(); try { snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-installSnapshot", - requestTimeoutDuration, responseHandler); + requestTimeoutDuration, 8, responseHandler); //FIXME: RATIS-1809 for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) { if (isRunning()) { snapshotRequestObserver.onNext(request); @@ -649,7 +649,7 @@ public class GrpcLogAppender extends LogAppenderBase { } try { snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-notifyInstallSnapshot", - requestTimeoutDuration, responseHandler); + requestTimeoutDuration, 0, responseHandler); snapshotRequestObserver.onNext(request); getFollower().updateLastRpcSendTime(false); diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java index c3f8730e7..d1bb70728 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java @@ -137,8 +137,8 @@ public class GrpcServerProtocolClient implements Closeable { } StreamObserver<InstallSnapshotRequestProto> installSnapshot( - String name, TimeDuration timeout, StreamObserver<InstallSnapshotReplyProto> responseHandler) { - return StreamObserverWithTimeout.newInstance(name, timeout, + String name, TimeDuration timeout, int limit, StreamObserver<InstallSnapshotReplyProto> responseHandler) { + return StreamObserverWithTimeout.newInstance(name, timeout, limit, i -> asyncStub.withInterceptors(i).installSnapshot(responseHandler)); } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java index 2b875f3ed..8fa30b1cc 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/util/StreamObserverWithTimeout.java @@ -21,6 +21,7 @@ import org.apache.ratis.protocol.exceptions.TimeoutIOException; import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.apache.ratis.util.JavaUtils; +import org.apache.ratis.util.ResourceSemaphore; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.TimeoutExecutor; import org.slf4j.Logger; @@ -34,13 +35,19 @@ import java.util.function.IntSupplier; public final class StreamObserverWithTimeout<T> implements StreamObserver<T> { public static final Logger LOG = LoggerFactory.getLogger(StreamObserverWithTimeout.class); - public static <T> StreamObserverWithTimeout<T> newInstance(String name, TimeDuration timeout, + public static <T> StreamObserverWithTimeout<T> newInstance( + String name, TimeDuration timeout, int outstandingLimit, Function<ClientInterceptor, StreamObserver<T>> newStreamObserver) { final AtomicInteger responseCount = new AtomicInteger(); - final ResponseNotifyClientInterceptor interceptor = new ResponseNotifyClientInterceptor( - r -> responseCount.getAndIncrement()); + final ResourceSemaphore semaphore = outstandingLimit > 0? new ResourceSemaphore(outstandingLimit): null; + final ResponseNotifyClientInterceptor interceptor = new ResponseNotifyClientInterceptor(r -> { + responseCount.getAndIncrement(); + if (semaphore != null) { + semaphore.release(); + } + }); return new StreamObserverWithTimeout<>( - name, timeout, responseCount::get, newStreamObserver.apply(interceptor)); + name, timeout, responseCount::get, semaphore, newStreamObserver.apply(interceptor)); } private final String name; @@ -51,17 +58,37 @@ public final class StreamObserverWithTimeout<T> implements StreamObserver<T> { private final AtomicBoolean isClose = new AtomicBoolean(); private final AtomicInteger requestCount = new AtomicInteger(); private final IntSupplier responseCount; + private final ResourceSemaphore semaphore; private StreamObserverWithTimeout(String name, TimeDuration timeout, IntSupplier responseCount, - StreamObserver<T> observer) { + ResourceSemaphore semaphore, StreamObserver<T> observer) { this.name = JavaUtils.getClassSimpleName(getClass()) + "-" + name; this.timeout = timeout; this.responseCount = responseCount; + this.semaphore = semaphore; this.observer = observer; } + private void acquire(T request) { + if (semaphore == null) { + return; + } + boolean acquired = false; + for (; !acquired && !isClose.get(); ) { + try { + acquired = semaphore.tryAcquire(timeout.getDuration(), timeout.getUnit()); + } catch (InterruptedException e) { + throw new IllegalStateException("Interrupted onNext " + request, e); + } + } + if (!acquired) { + throw new IllegalStateException("Failed onNext " + request + ": already closed."); + } + } + @Override public void onNext(T request) { + acquire(request); observer.onNext(request); final int id = requestCount.incrementAndGet(); scheduler.onTimeout(timeout, () -> handleTimeout(id, request), diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java index 0812c29fd..2f2ca8f7d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java @@ -61,6 +61,12 @@ class PendingRequests { static class Permit {} + /** + * The return type of {@link RequestLimits#tryAcquire(int)}. + * The order of the enum value must match the order in {@link RequestLimits}. + */ + enum Acquired { FAILED_IN_ELEMENT_LIMIT, FAILED_IN_BYTE_SIZE_LIMIT, SUCCESS } + static class RequestLimits extends ResourceSemaphore.Group { RequestLimits(int elementLimit, int megabyteLimit) { super(elementLimit, megabyteLimit); @@ -74,8 +80,9 @@ class PendingRequests { return get(1).used(); } - ResourceSemaphore.ResourceAcquireStatus tryAcquire(int messageSizeMb) { - return tryAcquire(1, messageSizeMb); + Acquired tryAcquire(int messageSizeMb) { + final int acquired = tryAcquire(1, messageSizeMb); + return acquired == SUCCESS? PendingRequests.Acquired.SUCCESS: PendingRequests.Acquired.values()[acquired]; } void releaseExtraMb(int extraMb) { @@ -112,13 +119,13 @@ class PendingRequests { Permit tryAcquire(Message message) { final int messageSize = Message.getSize(message); final int messageSizeMb = roundUpMb(messageSize ); - final ResourceSemaphore.ResourceAcquireStatus acquired = resource.tryAcquire(messageSizeMb); + final Acquired acquired = resource.tryAcquire(messageSizeMb); LOG.trace("tryAcquire {} MB? {}", messageSizeMb, acquired); - if (acquired == ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_ELEMENT_LIMIT) { + if (acquired == Acquired.FAILED_IN_ELEMENT_LIMIT) { raftServerMetrics.onRequestQueueLimitHit(); raftServerMetrics.onResourceLimitHit(); return null; - } else if (acquired == ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_BYTE_SIZE_LIMIT) { + } else if (acquired == Acquired.FAILED_IN_BYTE_SIZE_LIMIT) { raftServerMetrics.onRequestByteSizeLimitHit(); raftServerMetrics.onResourceLimitHit(); return null; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java index f52253b24..cdb6603c2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java @@ -113,7 +113,7 @@ class InstallSnapshotRequests implements Iterable<InstallSnapshotRequestProto> { private InstallSnapshotRequestProto nextInstallSnapshotRequestProto() { final int numFiles = snapshot.getFiles().size(); if (fileIndex >= numFiles) { - throw new NoSuchElementException(); + throw new NoSuchElementException("fileIndex = " + fileIndex + " >= numFiles = " + numFiles); } final FileInfo info = snapshot.getFiles().get(fileIndex); try { diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java index 0923b27fe..7434b2d79 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/GrpcTestClient.java @@ -53,7 +53,7 @@ class GrpcTestClient implements Closeable { } static StreamObserverFactory withTimeout(TimeDuration timeout) { - return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test", timeout, + return (stub, responseHandler) -> StreamObserverWithTimeout.newInstance("test", timeout, 2, i -> stub.withInterceptors(i).hello(responseHandler)); } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java index dac58812d..1439f9b9d 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/util/TestStreamObserverWithTimeout.java @@ -92,9 +92,6 @@ public class TestStreamObserverWithTimeout extends BaseTest { final List<CompletableFuture<String>> futures = new ArrayList<>(); for (String m : messages) { - if (type == Type.WithTimeout) { - timeout.sleep(); - } futures.add(client.send(m)); } diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java b/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java index d085161cf..6fe1aed7e 100644 --- a/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java +++ b/ratis-test/src/test/java/org/apache/ratis/util/TestResourceSemaphore.java @@ -24,17 +24,17 @@ import org.junit.Test; import java.util.concurrent.TimeoutException; -import static org.apache.ratis.util.ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_BYTE_SIZE_LIMIT; -import static org.apache.ratis.util.ResourceSemaphore.ResourceAcquireStatus.FAILED_IN_ELEMENT_LIMIT; -import static org.apache.ratis.util.ResourceSemaphore.ResourceAcquireStatus.SUCCESS; +import static org.apache.ratis.util.ResourceSemaphore.Group.SUCCESS; public class TestResourceSemaphore extends BaseTest { @Test(timeout = 5000) public void testGroup() throws InterruptedException, TimeoutException { + final int FAILED_IN_ELEMENT_LIMIT = 0; + final int FAILED_IN_BYTE_SIZE_LIMIT = 1; final ResourceSemaphore.Group g = new ResourceSemaphore.Group(3, 1); assertUsed(g, 0, 0); - assertAcquire(g, ResourceSemaphore.ResourceAcquireStatus.SUCCESS, 1, 1); + assertAcquire(g, SUCCESS, 1, 1); assertUsed(g, 1, 1); assertAcquire(g, FAILED_IN_BYTE_SIZE_LIMIT, 1, 1); assertUsed(g, 1, 1); @@ -86,9 +86,8 @@ public class TestResourceSemaphore extends BaseTest { } } - static void assertAcquire(ResourceSemaphore.Group g, ResourceSemaphore.ResourceAcquireStatus expected, - int... permits) { - final ResourceSemaphore.ResourceAcquireStatus computed = g.tryAcquire(permits); + static void assertAcquire(ResourceSemaphore.Group g, int expected, int... permits) { + final int computed = g.tryAcquire(permits); Assert.assertEquals(expected, computed); }
