Repository: incubator-ratis Updated Branches: refs/heads/master c692bf201 -> e37ab2ee1
RATIS-224. In RpcTimeout, it should shut down the scheduler if there are no tasks. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/e37ab2ee Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/e37ab2ee Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/e37ab2ee Branch: refs/heads/master Commit: e37ab2ee164fab84172513a72aa7175826b8dfd0 Parents: c692bf2 Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Authored: Wed Apr 11 20:34:18 2018 +0800 Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com> Committed: Wed Apr 11 20:34:18 2018 +0800 ---------------------------------------------------------------------- .../java/org/apache/ratis/rpc/RpcTimeout.java | 70 ------- .../org/apache/ratis/util/TimeDuration.java | 4 + .../org/apache/ratis/util/TimeoutScheduler.java | 123 +++++++++++ .../grpc/client/RaftClientProtocolClient.java | 13 +- .../ratis/grpc/server/GRpcLogAppender.java | 14 +- .../apache/ratis/util/TestTimeoutScheduler.java | 210 +++++++++++++++++++ 6 files changed, 344 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java b/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java deleted file mode 100644 index 03d7eab..0000000 --- a/ratis-common/src/main/java/org/apache/ratis/rpc/RpcTimeout.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.ratis.rpc; - -import org.apache.ratis.shaded.com.google.common.base.Supplier; -import org.apache.ratis.util.Preconditions; -import org.apache.ratis.util.TimeDuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class RpcTimeout { - private static final Logger LOG = LoggerFactory.getLogger(RpcTimeout.class); - private ScheduledExecutorService timeoutScheduler = null; - private final TimeDuration callTimeout; - private int numUsers = 0; - - public RpcTimeout(TimeDuration callTimeout) { - this.callTimeout = callTimeout; - } - - public synchronized void addUser() { - if (timeoutScheduler == null) { - timeoutScheduler = Executors.newScheduledThreadPool(1); - } - numUsers++; - } - - public synchronized void removeUser() { - numUsers--; - if (timeoutScheduler != null && numUsers == 0) { - timeoutScheduler.shutdown(); - timeoutScheduler = null; - } - } - - public synchronized void onTimeout(Runnable task, Supplier<String> errorMsg) { - Preconditions.assertTrue(timeoutScheduler != null); - TimeUnit unit = callTimeout.getUnit(); - timeoutScheduler.schedule(() -> { - try { - task.run(); - } catch (Throwable t) { - LOG.error(errorMsg.get(), t); - } - }, callTimeout.toInt(unit), unit); - } - - public TimeDuration 
getCallTimeout() { - return callTimeout; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java index 8a3dc18..8a7c44a 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java @@ -103,6 +103,10 @@ public class TimeDuration implements Comparable<TimeDuration> { this.unit = Objects.requireNonNull(unit, "unit = null"); } + public long getDuration() { + return duration; + } + public TimeUnit getUnit() { return unit; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java new file mode 100644 index 0000000..7a7d16c --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeoutScheduler.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public final class TimeoutScheduler { + public static final Logger LOG = LoggerFactory.getLogger(TimeoutScheduler.class); + + private static final TimeDuration DEFAULT_GRACE_PERIOD = TimeDuration.valueOf(1, TimeUnit.MINUTES); + + private static final Supplier<TimeoutScheduler> INSTANCE = JavaUtils.memoize(TimeoutScheduler::new); + + public static TimeoutScheduler getInstance() { + return INSTANCE.get(); + } + + private TimeoutScheduler() {} + + /** When there is no tasks, the time period to wait before shutting down the scheduler. */ + private final AtomicReference<TimeDuration> gracePeriod = new AtomicReference<>(DEFAULT_GRACE_PERIOD); + + /** The number of scheduled tasks. */ + private int numTasks = 0; + /** The scheduleID for each task */ + private int scheduleID = 0; + private ScheduledExecutorService scheduler = null; + + TimeDuration getGracePeriod() { + return gracePeriod.get(); + } + + void setGracePeriod(TimeDuration gracePeriod) { + this.gracePeriod.set(gracePeriod); + } + + synchronized boolean hasScheduler() { + return scheduler != null; + } + + /** + * Schedule a timeout task. + * + * @param timeout the timeout value. + * @param task the task to run when timeout. 
+ * @param errorHandler to handle the error, if there is any. + */ + public <THROWABLE extends Throwable> void onTimeout( + TimeDuration timeout, CheckedRunnable<THROWABLE> task, Consumer<THROWABLE> errorHandler) { + onTimeout(timeout, sid -> { + LOG.debug("run a task: sid {}", sid); + try { + task.run(); + } catch(Throwable t) { + errorHandler.accept((THROWABLE) t); + } finally { + onTaskCompleted(); + } + }); + } + + private synchronized void onTimeout(TimeDuration timeout, Consumer<Integer> toSchedule) { + if (scheduler == null) { + Preconditions.assertTrue(numTasks == 0); + LOG.debug("Initialize scheduler"); + scheduler = Executors.newScheduledThreadPool(1); + } + numTasks++; + final int sid = scheduleID++; + + LOG.debug("schedule a task: timeout {}, sid {}", timeout, sid); + scheduler.schedule(() -> toSchedule.accept(sid), timeout.getDuration(), timeout.getUnit()); + } + + private synchronized void onTaskCompleted() { + if (--numTasks == 0) { + final int sid = scheduleID; + final TimeDuration grace = getGracePeriod(); + LOG.debug("Schedule a shutdown task: grace {}, sid {}", grace, sid); + scheduler.schedule(() -> tryShutdownScheduler(sid), grace.getDuration(), grace.getUnit()); + } + } + + private synchronized void tryShutdownScheduler(int sid) { + if (sid == scheduleID) { + // No new tasks submitted, shutdown the scheduler. + LOG.debug("shutdown scheduler: sid {}", sid); + scheduler.shutdown(); + scheduler = null; + } else { + LOG.debug("shutdown cancelled: scheduleID has changed from {} to {}", sid, scheduleID); + } + } + + /** When timeout, run the task. Log the error, if there is any. 
*/ + public static <THROWABLE extends Throwable> void onTimeout( + TimeDuration timeout, CheckedRunnable<THROWABLE> task, Logger log, Supplier<String> errorMessage) { + getInstance().onTimeout(timeout, task, t -> log.error(errorMessage.get(), t)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java index e54cbbd..e90dad4 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java @@ -20,7 +20,7 @@ package org.apache.ratis.grpc.client; import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.grpc.RaftGrpcUtil; import org.apache.ratis.protocol.*; -import org.apache.ratis.rpc.RpcTimeout; +import org.apache.ratis.util.TimeoutScheduler; import org.apache.ratis.shaded.io.grpc.ManagedChannel; import org.apache.ratis.shaded.io.grpc.StatusRuntimeException; import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder; @@ -139,7 +139,6 @@ public class RaftClientProtocolClient implements Closeable { class AsyncStreamObservers implements Closeable { /** Request map: callId -> future */ private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> replies = new AtomicReference<>(new ConcurrentHashMap<>()); - private final RpcTimeout rpcTimeout = new RpcTimeout(timeout); private final StreamObserver<RaftClientReplyProto> replyStreamObserver = new StreamObserver<RaftClientReplyProto>() { @Override public void onNext(RaftClientReplyProto proto) { @@ -170,10 +169,6 @@ public class RaftClientProtocolClient implements Closeable { }; private final 
StreamObserver<RaftClientRequestProto> requestStreamObserver = append(replyStreamObserver); - private AsyncStreamObservers() { - rpcTimeout.addUser(); - } - CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) { final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get(); if (map == null) { @@ -184,7 +179,8 @@ public class RaftClientProtocolClient implements Closeable { () -> getName() + ":" + getClass().getSimpleName()); try { requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request)); - rpcTimeout.onTimeout(() -> timeoutCheck(request), () -> "Timeout check failed for client request: " + request); + TimeoutScheduler.onTimeout(timeout, () -> timeoutCheck(request), LOG, + () -> "Timeout check failed for client request: " + request); } catch(Throwable t) { handleReplyFuture(request.getCallId(), future -> future.completeExceptionally(t)); } @@ -193,7 +189,7 @@ public class RaftClientProtocolClient implements Closeable { private void timeoutCheck(RaftClientRequest request) { handleReplyFuture(request.getCallId(), f -> f.completeExceptionally( - new IOException("Request timeout " + rpcTimeout.getCallTimeout() + ": " + request))); + new IOException("Request timeout " + timeout + ": " + request))); } private void handleReplyFuture(long callId, Consumer<CompletableFuture<RaftClientReply>> handler) { @@ -206,7 +202,6 @@ public class RaftClientProtocolClient implements Closeable { public void close() { requestStreamObserver.onCompleted(); completeReplyExceptionally(null, "close"); - rpcTimeout.removeUser(); } private void completeReplyExceptionally(Throwable t, String event) { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java index eb2db91..64d0b23 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java @@ -20,7 +20,7 @@ package org.apache.ratis.grpc.server; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.RaftGRpcService; import org.apache.ratis.grpc.RaftGrpcUtil; -import org.apache.ratis.rpc.RpcTimeout; +import org.apache.ratis.util.TimeoutScheduler; import org.apache.ratis.server.impl.FollowerInfo; import org.apache.ratis.server.impl.LeaderState; import org.apache.ratis.server.impl.LogAppender; @@ -58,8 +58,7 @@ public class GRpcLogAppender extends LogAppender { private final AppendLogResponseHandler appendResponseHandler; private final InstallSnapshotResponseHandler snapshotResponseHandler; - private static RpcTimeout rpcTimeout = new RpcTimeout( - TimeDuration.valueOf(3, TimeUnit.SECONDS)); + private static TimeDuration rpcTimeout = TimeDuration.valueOf(3, TimeUnit.SECONDS); private volatile StreamObserver<AppendEntriesRequestProto> appendLogRequestObserver; private StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver; @@ -76,7 +75,6 @@ public class GRpcLogAppender extends LogAppender { appendResponseHandler = new AppendLogResponseHandler(); snapshotResponseHandler = new InstallSnapshotResponseHandler(); - rpcTimeout.addUser(); } @Override @@ -162,7 +160,7 @@ public class GRpcLogAppender extends LogAppender { server.getId(), null, request); s.onNext(request); - rpcTimeout.onTimeout(() -> timeoutAppendRequest(request), + TimeoutScheduler.onTimeout(rpcTimeout, () -> timeoutAppendRequest(request), LOG, () -> "Timeout check failed for append entry request: " + request); follower.updateLastRpcSendTime(); } @@ -328,12 +326,6 @@ public class GRpcLogAppender extends LogAppender { } } - @Override - public LogAppender stopSender() { - rpcTimeout.removeUser(); - 
return super.stopSender(); - } - private class InstallSnapshotResponseHandler implements StreamObserver<InstallSnapshotReplyProto> { private final Queue<Integer> pending; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/e37ab2ee/ratis-server/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java b/ratis-server/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java new file mode 100644 index 0000000..7c4ef4f --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/util/TestTimeoutScheduler.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis.util; + +import org.apache.log4j.Level; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +public class TestTimeoutScheduler { + { + LogUtils.setLogLevel(TimeoutScheduler.LOG, Level.ALL); + } + + static class ErrorHandler implements Consumer<RuntimeException> { + private final AtomicBoolean hasError = new AtomicBoolean(false); + + @Override + public void accept(RuntimeException e) { + hasError.set(true); + TimeoutScheduler.LOG.error("Failed", e); + } + + void assertNoError() { + Assert.assertFalse(hasError.get()); + } + } + + @Test(timeout = 1000) + public void testSingleTask() throws Exception { + final TimeoutScheduler scheduler = TimeoutScheduler.getInstance(); + final TimeDuration grace = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS); + scheduler.setGracePeriod(grace); + Assert.assertFalse(scheduler.hasScheduler()); + + final ErrorHandler errorHandler = new ErrorHandler(); + + final AtomicBoolean fired = new AtomicBoolean(false); + scheduler.onTimeout(TimeDuration.valueOf(250, TimeUnit.MILLISECONDS), () -> { + Assert.assertFalse(fired.get()); + fired.set(true); + }, errorHandler); + Assert.assertTrue(scheduler.hasScheduler()); + + Thread.sleep(100); + Assert.assertFalse(fired.get()); + Assert.assertTrue(scheduler.hasScheduler()); + + Thread.sleep(100); + Assert.assertFalse(fired.get()); + Assert.assertTrue(scheduler.hasScheduler()); + + Thread.sleep(100); + Assert.assertTrue(fired.get()); + Assert.assertTrue(scheduler.hasScheduler()); + + Thread.sleep(100); + Assert.assertTrue(fired.get()); + Assert.assertFalse(scheduler.hasScheduler()); + + errorHandler.assertNoError(); + } + + @Test(timeout = 1000) + public void testMultipleTasks() throws Exception { + final TimeoutScheduler scheduler = TimeoutScheduler.getInstance(); + final TimeDuration grace = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS); + 
scheduler.setGracePeriod(grace); + Assert.assertFalse(scheduler.hasScheduler()); + + final ErrorHandler errorHandler = new ErrorHandler(); + + final AtomicBoolean[] fired = new AtomicBoolean[3]; + for(int i = 0; i < fired.length; i++) { + final AtomicBoolean f = fired[i] = new AtomicBoolean(false); + scheduler.onTimeout(TimeDuration.valueOf(100*i + 50, TimeUnit.MILLISECONDS), () -> { + Assert.assertFalse(f.get()); + f.set(true); + }, errorHandler); + Assert.assertTrue(scheduler.hasScheduler()); + } + + Thread.sleep(100); + Assert.assertTrue(fired[0].get()); + Assert.assertFalse(fired[1].get()); + Assert.assertFalse(fired[2].get()); + Assert.assertTrue(scheduler.hasScheduler()); + + Thread.sleep(100); + Assert.assertTrue(fired[0].get()); + Assert.assertTrue(fired[1].get()); + Assert.assertFalse(fired[2].get()); + Assert.assertTrue(scheduler.hasScheduler()); + + Thread.sleep(100); + Assert.assertTrue(fired[0].get()); + Assert.assertTrue(fired[1].get()); + Assert.assertTrue(fired[2].get()); + Assert.assertTrue(scheduler.hasScheduler()); + + Thread.sleep(100); + Assert.assertTrue(fired[0].get()); + Assert.assertTrue(fired[1].get()); + Assert.assertTrue(fired[2].get()); + Assert.assertFalse(scheduler.hasScheduler()); + + errorHandler.assertNoError(); + } + + @Test(timeout = 1000) + public void testExtendingGracePeriod() throws Exception { + final TimeoutScheduler scheduler = TimeoutScheduler.getInstance(); + final TimeDuration grace = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS); + scheduler.setGracePeriod(grace); + Assert.assertFalse(scheduler.hasScheduler()); + + final ErrorHandler errorHandler = new ErrorHandler(); + + { + final AtomicBoolean fired = new AtomicBoolean(false); + scheduler.onTimeout(TimeDuration.valueOf(150, TimeUnit.MILLISECONDS), () -> { + Assert.assertFalse(fired.get()); + fired.set(true); + }, errorHandler); + Assert.assertTrue(scheduler.hasScheduler()); + + Thread.sleep(100); + Assert.assertFalse(fired.get()); + 
Assert.assertTrue(scheduler.hasScheduler()); + + Thread.sleep(100); + Assert.assertTrue(fired.get()); + Assert.assertTrue(scheduler.hasScheduler()); + } + + { + // submit another task during grace period + final AtomicBoolean fired2 = new AtomicBoolean(false); + scheduler.onTimeout(TimeDuration.valueOf(150, TimeUnit.MILLISECONDS), () -> { + Assert.assertFalse(fired2.get()); + fired2.set(true); + }, errorHandler); + + Thread.sleep(100); + Assert.assertFalse(fired2.get()); + Assert.assertTrue(scheduler.hasScheduler()); + + Thread.sleep(100); + Assert.assertTrue(fired2.get()); + Assert.assertTrue(scheduler.hasScheduler()); + + Thread.sleep(100); + Assert.assertTrue(fired2.get()); + Assert.assertFalse(scheduler.hasScheduler()); + } + + errorHandler.assertNoError(); + } + + @Test(timeout = 1000) + public void testRestartingScheduler() throws Exception { + final TimeoutScheduler scheduler = TimeoutScheduler.getInstance(); + final TimeDuration grace = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS); + scheduler.setGracePeriod(grace); + Assert.assertFalse(scheduler.hasScheduler()); + + final ErrorHandler errorHandler = new ErrorHandler(); + + for(int i = 0; i < 2; i++) { + final AtomicBoolean fired = new AtomicBoolean(false); + scheduler.onTimeout(TimeDuration.valueOf(150, TimeUnit.MILLISECONDS), () -> { + Assert.assertFalse(fired.get()); + fired.set(true); + }, errorHandler); + Assert.assertTrue(scheduler.hasScheduler()); + + Thread.sleep(100); + Assert.assertFalse(fired.get()); + Assert.assertTrue(scheduler.hasScheduler()); + + Thread.sleep(100); + Assert.assertTrue(fired.get()); + Assert.assertTrue(scheduler.hasScheduler()); + + Thread.sleep(100); + Assert.assertTrue(fired.get()); + Assert.assertFalse(scheduler.hasScheduler()); + } + + errorHandler.assertNoError(); + } +}