This is an automated email from the ASF dual-hosted git repository. dragonyliu pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 0c2ecd615773cbb0bb6f3a475c93a79ea5982d7d Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Fri Aug 5 18:51:29 2022 -0700 RATIS-1603. TimeoutScheduler can have a huge amount of threads and cause OOM. (#666) (cherry picked from commit 34867f25afc1329d722253e94e008f7616b9eb39) --- .../ratis/client/impl/OrderedStreamAsync.java | 4 +- .../apache/ratis/client/impl/RaftClientImpl.java | 7 +- .../org/apache/ratis/util/TimeoutExecutor.java | 50 ++++++++++ .../org/apache/ratis/util/TimeoutScheduler.java | 22 +---- .../java/org/apache/ratis/util/TimeoutTimer.java | 109 +++++++++++++++++++++ .../grpc/client/GrpcClientProtocolClient.java | 5 +- .../apache/ratis/grpc/server/GrpcLogAppender.java | 2 +- .../ratis/netty/client/NettyClientStreamRpc.java | 4 +- .../apache/ratis/server/impl/PendingStepDown.java | 4 +- .../impl/SnapshotManagementRequestHandler.java | 4 +- .../ratis/server/impl/TransferLeadership.java | 4 +- .../apache/ratis/server/impl/WatchRequests.java | 2 +- .../apache/ratis/util/TestTimeoutScheduler.java | 4 +- 13 files changed, 183 insertions(+), 38 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java index ed4a20c03..8683a7f20 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java @@ -33,7 +33,7 @@ import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.SlidingWindow; import org.apache.ratis.util.TimeDuration; -import org.apache.ratis.util.TimeoutScheduler; +import org.apache.ratis.util.TimeoutExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,7 +107,7 @@ public class OrderedStreamAsync { private final Semaphore requestSemaphore; private final TimeDuration requestTimeout; private final TimeDuration closeTimeout; - private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance(); + private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); OrderedStreamAsync(DataStreamClientRpc dataStreamClientRpc, RaftProperties properties){ this.dataStreamClientRpc = dataStreamClientRpc; diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index 7313c9149..a333fe9a7 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -48,7 +48,7 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.TimeDuration; -import org.apache.ratis.util.TimeoutScheduler; +import org.apache.ratis.util.TimeoutExecutor; import java.io.IOException; import java.util.ArrayList; @@ -135,7 +135,7 @@ public final class RaftClientImpl implements RaftClient { private volatile RaftPeerId leaderId; - private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance(); + private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); private final Supplier<OrderedAsync> orderedAsync; private final Supplier<AsyncImpl> asyncApi; @@ -227,7 +227,7 @@ public final class RaftClientImpl implements RaftClient { TimeDuration.ZERO : sleepDefault; } - TimeoutScheduler getScheduler() { + TimeoutExecutor getScheduler() { return scheduler; } @@ -404,7 +404,6 @@ public final class RaftClientImpl implements RaftClient { @Override public void close() throws IOException { - scheduler.close(); clientRpc.close(); if (dataStreamApi.isInitialized()) { dataStreamApi.get().close(); diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutExecutor.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutExecutor.java new file mode 100644 index 000000000..aace13b88 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutExecutor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import org.apache.ratis.util.function.CheckedRunnable; +import org.slf4j.Logger; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** Execute timeout tasks. */ +public interface TimeoutExecutor { + int MAXIMUM_POOL_SIZE = 8; + static TimeoutExecutor getInstance() { + return TimeoutTimer.getInstance(); + } + + /** @return the number of scheduled but not completed timeout tasks. */ + int getTaskCount(); + + /** + * Schedule a timeout task. + * + * @param timeout the timeout value. + * @param task the task to run when timeout. + * @param errorHandler to handle the error, if there is any. + */ + <THROWABLE extends Throwable> void onTimeout( + TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler); + + /** When timeout, run the task. Log the error, if there is any. */ + default void onTimeout(TimeDuration timeout, CheckedRunnable<?> task, Logger log, Supplier<String> errorMessage) { + onTimeout(timeout, task, t -> log.error(errorMessage.get(), t)); + } +} diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java index 9c428f2f4..cba2851f4 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java @@ -21,7 +21,6 @@ import org.apache.ratis.util.function.CheckedRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.util.Collection; import java.util.Optional; import java.util.concurrent.ScheduledFuture; @@ -32,7 +31,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; -public final class TimeoutScheduler implements Closeable { +public final class TimeoutScheduler implements TimeoutExecutor { public static final Logger LOG = LoggerFactory.getLogger(TimeoutScheduler.class); static final TimeDuration DEFAULT_GRACE_PERIOD = TimeDuration.valueOf(1, TimeUnit.MINUTES); @@ -110,7 +109,8 @@ public final class TimeoutScheduler implements Closeable { private TimeoutScheduler() { } - int getQueueSize() { + @Override + public int getTaskCount() { return scheduler.getQueueSize(); } @@ -126,13 +126,7 @@ public final class TimeoutScheduler implements Closeable { return scheduler.hasExecutor(); } - /** - * Schedule a timeout task. - * - * @param timeout the timeout value. - * @param task the task to run when timeout. - * @param errorHandler to handle the error, if there is any. - */ + @Override public <THROWABLE extends Throwable> void onTimeout( TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler) { onTimeout(timeout, sid -> { @@ -186,13 +180,7 @@ public final class TimeoutScheduler implements Closeable { } } - /** When timeout, run the task. Log the error, if there is any. */ - public void onTimeout(TimeDuration timeout, CheckedRunnable<?> task, Logger log, Supplier<String> errorMessage) { - onTimeout(timeout, task, t -> log.error(errorMessage.get(), t)); - } - - @Override - public synchronized void close() { + public synchronized void tryShutdownScheduler() { tryShutdownScheduler(scheduleID); } } diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutTimer.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutTimer.java new file mode 100644 index 000000000..36cbb2b29 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutTimer.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import org.apache.ratis.util.function.CheckedRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public final class TimeoutTimer implements TimeoutExecutor { + public static final Logger LOG = LoggerFactory.getLogger(TimeoutTimer.class); + + private static final Supplier<TimeoutTimer> INSTANCE = JavaUtils.memoize(() -> new TimeoutTimer(MAXIMUM_POOL_SIZE)); + + public static TimeoutTimer getInstance() { + return INSTANCE.get(); + } + + static class Task extends TimerTask { + private final int id; + private final Runnable runnable; + + Task(int id, Runnable runnable) { + this.id = id; + this.runnable = LogUtils.newRunnable(LOG, runnable, this::toString); + } + + @Override + public void run() { + LOG.debug("run {}", this); + runnable.run(); + } + + @Override + public String toString() { + return "task #" + id; + } + } + + /** The number of scheduled tasks. */ + private final AtomicInteger numTasks = new AtomicInteger(); + /** A unique ID for each task. */ + private final AtomicInteger taskId = new AtomicInteger(); + + private final List<MemoizedSupplier<Timer>> timers; + + private TimeoutTimer(int numTimers) { + final List<MemoizedSupplier<Timer>> list = new ArrayList<>(numTimers); + for(int i = 0; i < numTimers; i++) { + final String name = "timer" + i; + list.add(JavaUtils.memoize(() -> new Timer(name, true))); + } + this.timers = Collections.unmodifiableList(list); + } + + @Override + public int getTaskCount() { + return numTasks.get(); + } + + private Timer getTimer(int tid) { + return timers.get(Math.toIntExact(Integer.toUnsignedLong(tid) % timers.size())).get(); + } + + private void schedule(TimeDuration timeout, Runnable toSchedule) { + final int tid = taskId.incrementAndGet(); + final int n = numTasks.incrementAndGet(); + LOG.debug("schedule a task #{} with timeout {}, numTasks={}", tid, timeout, n); + getTimer(n).schedule(new Task(tid, toSchedule), timeout.toLong(TimeUnit.MILLISECONDS)); + } + + @Override + public <THROWABLE extends Throwable> void onTimeout( + TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler) { + schedule(timeout, () -> { + try { + task.run(); + } catch(Throwable t) { + errorHandler.accept(JavaUtils.cast(t)); + } finally { + numTasks.decrementAndGet(); + } + }); + } +} diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java index c2ee1f247..d8b128a43 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java @@ -58,7 +58,7 @@ import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; -import org.apache.ratis.util.TimeoutScheduler; +import org.apache.ratis.util.TimeoutExecutor; import org.apache.ratis.util.function.CheckedSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,7 +86,7 @@ public class GrpcClientProtocolClient implements Closeable { private final TimeDuration requestTimeoutDuration; private final TimeDuration watchRequestTimeoutDuration; - private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance(); + private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); private final RaftClientProtocolServiceStub asyncStub; private final AdminProtocolServiceBlockingStub adminBlockingStub; @@ -173,7 +173,6 @@ public class GrpcClientProtocolClient implements Closeable { if (clientChannel != adminChannel) { GrpcUtil.shutdownManagedChannel(adminChannel); } - scheduler.close(); metricClientInterceptor.close(); } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 69a3795ca..3f01ea299 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -63,7 +63,7 @@ public class GrpcLogAppender extends LogAppenderBase { private final boolean installSnapshotEnabled; private final TimeDuration requestTimeoutDuration; - private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance(); + private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); private volatile StreamObservers appendLogRequestObserver; private final boolean useSeparateHBChannel; diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java index 37f09b8da..a03013748 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java @@ -52,7 +52,7 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.NetUtils; import org.apache.ratis.util.TimeDuration; -import org.apache.ratis.util.TimeoutScheduler; +import org.apache.ratis.util.TimeoutExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -243,7 +243,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc { private final ConcurrentMap<ClientInvocationId, ReplyQueue> replies = new ConcurrentHashMap<>(); private final TimeDuration replyQueueGracePeriod; - private final TimeoutScheduler timeoutScheduler = TimeoutScheduler.getInstance(); + private final TimeoutExecutor timeoutScheduler = TimeoutExecutor.getInstance(); public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties properties) { this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingStepDown.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingStepDown.java index 5b928a69e..b7bfde3f6 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingStepDown.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingStepDown.java @@ -25,7 +25,7 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.TimeDuration; -import org.apache.ratis.util.TimeoutScheduler; +import org.apache.ratis.util.TimeoutExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +81,7 @@ public class PendingStepDown { } private final LeaderStateImpl leader; - private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance(); + private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); private final PendingRequestReference pending = new PendingRequestReference(); PendingStepDown(LeaderStateImpl leaderState) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java index 416e501ad..b899752e0 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotManagementRequestHandler.java @@ -23,7 +23,7 @@ import org.apache.ratis.protocol.exceptions.TimeoutIOException; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.TimeDuration; -import org.apache.ratis.util.TimeoutScheduler; +import org.apache.ratis.util.TimeoutExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,7 +89,7 @@ class SnapshotManagementRequestHandler { } private final RaftServerImpl server; - private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance(); + private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); private final PendingRequestReference pending = new PendingRequestReference(); SnapshotManagementRequestHandler(RaftServerImpl server) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java index 03b1dfc80..3aed1a10d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java @@ -24,7 +24,7 @@ import org.apache.ratis.protocol.exceptions.TransferLeadershipException; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.TimeDuration; -import org.apache.ratis.util.TimeoutScheduler; +import org.apache.ratis.util.TimeoutExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +74,7 @@ public class TransferLeadership { } private final RaftServerImpl server; - private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance(); + private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); private final AtomicReference<PendingRequest> pending = new AtomicReference<>(); TransferLeadership(RaftServerImpl server) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java index 3b95d4bf1..f4c6200b9 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WatchRequests.java @@ -169,7 +169,7 @@ class WatchRequests { private final TimeDuration watchTimeoutNanos; private final TimeDuration watchTimeoutDenominationNanos; - private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance(); + private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); WatchRequests(Object name, RaftProperties properties) { this.name = name + "-" + JavaUtils.getClassSimpleName(getClass()); diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java index dbe0e943f..cca1cfdea 100644 --- a/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java +++ b/ratis-test/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java @@ -223,7 +223,7 @@ public class TestTimeoutScheduler extends BaseTest { } HUNDRED_MILLIS.sleep(); HUNDRED_MILLIS.sleep(); - JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getQueueSize()), + JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getTaskCount()), 10, HUNDRED_MILLIS, "only 1 shutdown task is scheduled", LOG); final TimeDuration oneMillis = TimeDuration.valueOf(1, TimeUnit.MILLISECONDS); @@ -234,7 +234,7 @@ public class TestTimeoutScheduler extends BaseTest { oneMillis.sleep(); } HUNDRED_MILLIS.sleep(); - JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getQueueSize()), + JavaUtils.attempt(() -> Assert.assertEquals(1, scheduler.getTaskCount()), 10, HUNDRED_MILLIS, "only 1 shutdown task is scheduled", LOG); errorHandler.assertNoError();
