This is an automated email from the ASF dual-hosted git repository. williamsong pushed a commit to branch snapshot-3 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 1691a3c0f700868e009f5c23393589c6e0d8dac1 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Tue Jan 23 10:01:01 2024 -0800 RATIS-2012. Client should not retry after close. (#1025) --- .../org/apache/ratis/client/impl/BlockingImpl.java | 12 +- .../org/apache/ratis/client/impl/OrderedAsync.java | 101 ++++------ .../apache/ratis/client/impl/RaftClientImpl.java | 42 ++-- .../apache/ratis/client/impl/UnorderedAsync.java | 14 +- .../ratis/client/retry/ClientRetryEvent.java | 24 +-- .../java/org/apache/ratis/util/PeerProxyMap.java | 17 +- .../test/java/org/apache/ratis/RaftAsyncTests.java | 2 +- .../test/java/org/apache/ratis/RaftBasicTests.java | 14 +- .../test/java/org/apache/ratis/RaftTestUtil.java | 30 ++- .../org/apache/ratis/RaftLogTruncateTests.java | 216 +++++++++++++++++++++ .../ratis/grpc/TestRaftLogTruncateWithGrpc.java | 24 +++ .../org/apache/ratis/grpc/TestRaftWithGrpc.java | 6 - .../ratis/retry/TestExceptionDependentRetry.java | 2 +- .../org/apache/ratis/retry/TestRetryPolicy.java | 20 +- .../apache/ratis/server/ServerRestartTests.java | 2 +- 15 files changed, 380 insertions(+), 146 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java index 4be9fa327..76987801b 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java @@ -119,16 +119,18 @@ class BlockingImpl implements BlockingApi { ioe = e; } - pending.incrementExceptionCount(ioe); - ClientRetryEvent event = new ClientRetryEvent(request, ioe, pending); + if (client.isClosed()) { + throw new AlreadyClosedException(this + " is closed."); + } + + final ClientRetryEvent event = pending.newClientRetryEvent(request, ioe); final RetryPolicy retryPolicy = client.getRetryPolicy(); final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event); - TimeDuration sleepTime = client.getEffectiveSleepTime(ioe, action.getSleepTime()); - if (!action.shouldRetry()) { - throw (IOException)client.noMoreRetries(event); + throw client.noMoreRetries(event); } + final TimeDuration sleepTime = client.getEffectiveSleepTime(ioe, action.getSleepTime()); try { sleepTime.sleep(); } catch (InterruptedException e) { diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java index a1aa58681..34dc3be11 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java @@ -149,10 +149,6 @@ public final class OrderedAsync { getSlidingWindow(request).fail(request.getSlidingWindowEntry().getSeqNum(), t); } - private void handleAsyncRetryFailure(ClientRetryEvent event) { - failAllAsyncRequests(event.getRequest(), client.noMoreRetries(event)); - } - CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, Message message, RaftPeerId server) { if (!type.is(TypeCase.WATCH) && !type.is(TypeCase.MESSAGESTREAM)) { Objects.requireNonNull(message, "message == null"); @@ -187,85 +183,68 @@ public final class OrderedAsync { if (pending == null) { return; } - - final CompletableFuture<RaftClientReply> f = pending.getReplyFuture(); - if (f.isDone()) { + if (pending.getReplyFuture().isDone()) { return; } - final RaftClientRequest request = pending.newRequestImpl(); + final RaftClientRequest request = pending.newRequest(); if (request == null) { // already done - LOG.debug("{} newRequestImpl returns null", pending); + LOG.debug("{} newRequest returns null", pending); return; } - final RetryPolicy retryPolicy = client.getRetryPolicy(); - sendRequest(pending).exceptionally(e -> { - if (e instanceof CompletionException) { - e = JavaUtils.unwrapCompletionException(e); - scheduleWithTimeout(pending, request, retryPolicy, e); - return null; - } - f.completeExceptionally(e); - return null; - }); - } - - private void scheduleWithTimeout(PendingOrderedRequest pending, - RaftClientRequest request, RetryPolicy retryPolicy, Throwable e) { - final int attempt = pending.getAttemptCount(); - final ClientRetryEvent event = new ClientRetryEvent(request, e, pending); - final TimeDuration sleepTime = client.getEffectiveSleepTime(e, - retryPolicy.handleAttemptFailure(event).getSleepTime()); - LOG.debug("schedule* attempt #{} with sleep {} and policy {} for {}", attempt, sleepTime, retryPolicy, request); - scheduleWithTimeout(pending, sleepTime, getSlidingWindow(request)); - } - - private void scheduleWithTimeout(PendingOrderedRequest pending, TimeDuration sleepTime, - SlidingWindow.Client<PendingOrderedRequest, RaftClientReply> slidingWindow) { - client.getScheduler().onTimeout(sleepTime, - () -> slidingWindow.retry(pending, this::sendRequestWithRetry), - LOG, () -> "Failed* to retry " + pending); - } - - private CompletableFuture<RaftClientReply> sendRequest(PendingOrderedRequest pending) { - final RetryPolicy retryPolicy = client.getRetryPolicy(); - final RaftClientRequest request; if (getSlidingWindow((RaftPeerId) null).isFirst(pending.getSeqNum())) { pending.setFirstRequest(); } - request = pending.newRequest(); LOG.debug("{}: send* {}", client.getId(), request); - return client.getClientRpc().sendRequestAsync(request).thenApply(reply -> { + client.getClientRpc().sendRequestAsync(request).thenAccept(reply -> { LOG.debug("{}: receive* {}", client.getId(), reply); Objects.requireNonNull(reply, "reply == null"); client.handleReply(request, reply); getSlidingWindow(request).receiveReply( request.getSlidingWindowEntry().getSeqNum(), reply, this::sendRequestWithRetry); - return reply; }).exceptionally(e -> { LOG.error(client.getId() + ": Failed* " + request, e); - e = JavaUtils.unwrapCompletionException(e); - if (e instanceof IOException && !(e instanceof GroupMismatchException)) { - pending.incrementExceptionCount(e); - final ClientRetryEvent event = new ClientRetryEvent(request, e, pending); - if (!retryPolicy.handleAttemptFailure(event).shouldRetry()) { - handleAsyncRetryFailure(event); - } else { - if (e instanceof NotLeaderException) { - NotLeaderException nle = (NotLeaderException)e; - client.handleNotLeaderException(request, nle, this::resetSlidingWindow); - } else { - client.handleIOException(request, (IOException) e, null, this::resetSlidingWindow); - } - } - throw new CompletionException(e); - } - failAllAsyncRequests(request, e); + handleException(pending, request, e); return null; }); } + private void handleException(PendingOrderedRequest pending, RaftClientRequest request, Throwable e) { + final RetryPolicy retryPolicy = client.getRetryPolicy(); + if (client.isClosed()) { + failAllAsyncRequests(request, new AlreadyClosedException(client + " is closed.")); + return; + } + + e = JavaUtils.unwrapCompletionException(e); + if (!(e instanceof IOException) || e instanceof GroupMismatchException) { + // non-retryable exceptions + failAllAsyncRequests(request, e); + return; + } + + final ClientRetryEvent event = pending.newClientRetryEvent(request, e); + final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event); + if (!action.shouldRetry()) { + failAllAsyncRequests(request, client.noMoreRetries(event)); + return; + } + + if (e instanceof NotLeaderException) { + client.handleNotLeaderException(request, (NotLeaderException) e, this::resetSlidingWindow); + } else { + client.handleIOException(request, (IOException) e, null, this::resetSlidingWindow); + } + final TimeDuration sleepTime = client.getEffectiveSleepTime(e, action.getSleepTime()); + LOG.debug("schedule* retry with sleep {} for attempt #{} of {}, {}", + sleepTime, event.getAttemptCount(), request, retryPolicy); + final SlidingWindow.Client<PendingOrderedRequest, RaftClientReply> slidingWindow = getSlidingWindow(request); + client.getScheduler().onTimeout(sleepTime, + () -> slidingWindow.retry(pending, this::sendRequestWithRetry), + LOG, () -> "Failed* to retry " + pending); + } + void assertRequestSemaphore(int expectedAvailablePermits, int expectedQueueLength) { Preconditions.assertSame(expectedAvailablePermits, requestSemaphore.availablePermits(), "availablePermits"); Preconditions.assertSame(expectedQueueLength, requestSemaphore.getQueueLength(), "queueLength"); diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index f42391947..1b82709da 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -44,11 +44,13 @@ import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.thirdparty.com.google.common.cache.Cache; import org.apache.ratis.thirdparty.com.google.common.cache.CacheBuilder; import org.apache.ratis.util.CollectionUtils; +import org.apache.ratis.util.IOUtils; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.TimeDuration; import org.apache.ratis.util.TimeoutExecutor; +import org.apache.ratis.util.Timestamp; import java.io.IOException; import java.util.ArrayList; @@ -65,6 +67,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -79,10 +82,10 @@ public final class RaftClientImpl implements RaftClient { .build(); public abstract static class PendingClientRequest { - private final long creationTimeInMs = System.currentTimeMillis(); + private final Timestamp creationTime = Timestamp.currentTime(); private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>(); private final AtomicInteger attemptCount = new AtomicInteger(); - private final Map<Class<?>, Integer> exceptionCount = new ConcurrentHashMap<>(); + private final Map<Class<?>, Integer> exceptionCounts = new ConcurrentHashMap<>(); public abstract RaftClientRequest newRequestImpl(); @@ -101,19 +104,10 @@ public final class RaftClientImpl implements RaftClient { return attemptCount.get(); } - int incrementExceptionCount(Throwable t) { - return t != null ? exceptionCount.compute(t.getClass(), (k, v) -> v != null ? v + 1 : 1) : 0; - } - - public int getExceptionCount(Throwable t) { - return t != null ? Optional.ofNullable(exceptionCount.get(t.getClass())).orElse(0) : 0; - } - - public boolean isRequestTimeout(TimeDuration timeout) { - if (timeout == null) { - return false; - } - return System.currentTimeMillis() - creationTimeInMs > timeout.toLong(TimeUnit.MILLISECONDS); + public ClientRetryEvent newClientRetryEvent(RaftClientRequest request, Throwable throwable) { + final int exceptionCount = throwable == null? 0 + : exceptionCounts.compute(throwable.getClass(), (k, v) -> v == null? 1: v+1); + return new ClientRetryEvent(getAttemptCount(), request, exceptionCount, throwable, creationTime); } } @@ -196,6 +190,8 @@ public final class RaftClientImpl implements RaftClient { private final ConcurrentMap<RaftPeerId, LeaderElectionManagementApi> leaderElectionManagement = new ConcurrentHashMap<>(); + private final AtomicBoolean closed = new AtomicBoolean(); + @SuppressWarnings("checkstyle:ParameterNumber") RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, RaftPeer primaryDataStreamServer, RaftClientRpc clientRpc, RetryPolicy retryPolicy, RaftProperties properties, Parameters parameters) { @@ -346,11 +342,11 @@ public final class RaftClientImpl implements RaftClient { return dataStreamApi.get(); } - Throwable noMoreRetries(ClientRetryEvent event) { + IOException noMoreRetries(ClientRetryEvent event) { final int attemptCount = event.getAttemptCount(); final Throwable throwable = event.getCause(); if (attemptCount == 1 && throwable != null) { - return throwable; + return IOUtils.asIOException(throwable); } return new RaftRetryFailureException(event.getRequest(), attemptCount, retryPolicy, throwable); } @@ -418,8 +414,7 @@ public final class RaftClientImpl implements RaftClient { void handleIOException(RaftClientRequest request, IOException ioe, RaftPeerId newLeader, Consumer<RaftClientRequest> handler) { - LOG.debug("{}: suggested new leader: {}. Failed {} with {}", - clientId, newLeader, request, ioe); + LOG.debug("{}: suggested new leader: {}. Failed {}", clientId, newLeader, request, ioe); if (LOG.isTraceEnabled()) { LOG.trace("Stack trace", new Throwable("TRACE")); } @@ -456,8 +451,17 @@ public final class RaftClientImpl implements RaftClient { return clientRpc; } + boolean isClosed() { + return closed.get(); + } + @Override public void close() throws IOException { + if (!closed.compareAndSet(false, true)) { + return; + } + + LOG.debug("close {}", getId()); clientRpc.close(); if (dataStreamApi.isInitialized()) { dataStreamApi.get().close(); diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java index 84b817b58..eccda4dbd 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java @@ -22,6 +22,7 @@ import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.exceptions.AlreadyClosedException; import org.apache.ratis.protocol.exceptions.GroupMismatchException; import org.apache.ratis.protocol.exceptions.NotLeaderException; import org.apache.ratis.protocol.RaftClientReply; @@ -89,11 +90,14 @@ public interface UnorderedAsync { } final Throwable cause = replyException != null ? replyException : e; - pending.incrementExceptionCount(cause); - final ClientRetryEvent event = new ClientRetryEvent(request, cause, pending); + if (client.isClosed()) { + f.completeExceptionally(new AlreadyClosedException(client + " is closed")); + return; + } + + final ClientRetryEvent event = pending.newClientRetryEvent(request, cause); RetryPolicy retryPolicy = client.getRetryPolicy(); final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event); - TimeDuration sleepTime = client.getEffectiveSleepTime(cause, action.getSleepTime()); if (!action.shouldRetry()) { f.completeExceptionally(client.noMoreRetries(event)); return; @@ -124,7 +128,9 @@ public interface UnorderedAsync { } } - LOG.debug("schedule retry for attempt #{}, policy={}, request={}", attemptCount, retryPolicy, request); + final TimeDuration sleepTime = client.getEffectiveSleepTime(cause, action.getSleepTime()); + LOG.debug("schedule~ attempt #{} with sleep {} and policy {} for {}", + attemptCount, sleepTime, retryPolicy, request); client.getScheduler().onTimeout(sleepTime, () -> sendRequestWithRetry(pending, client), LOG, () -> clientId + ": Failed~ to retry " + request); } catch (Exception ex) { diff --git a/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java b/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java index f0c38efb9..c6a8beb06 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java @@ -17,12 +17,11 @@ */ package org.apache.ratis.client.retry; -import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.retry.RetryPolicy; -import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.util.Timestamp; /** An {@link RetryPolicy.Event} specific to client request failure. */ public class ClientRetryEvent implements RetryPolicy.Event { @@ -30,23 +29,15 @@ public class ClientRetryEvent implements RetryPolicy.Event { private final int causeCount; private final RaftClientRequest request; private final Throwable cause; - private PendingClientRequest pending; + private final Timestamp pendingRequestCreationTime; - @VisibleForTesting - public ClientRetryEvent(int attemptCount, RaftClientRequest request, Throwable cause) { - this(attemptCount, request, attemptCount, cause); - } - - public ClientRetryEvent(RaftClientRequest request, Throwable t, PendingClientRequest pending) { - this(pending.getAttemptCount(), request, pending.getExceptionCount(t), t); - this.pending = pending; - } - - private ClientRetryEvent(int attemptCount, RaftClientRequest request, int causeCount, Throwable cause) { + public ClientRetryEvent(int attemptCount, RaftClientRequest request, int causeCount, Throwable cause, + Timestamp pendingRequestCreationTime) { this.attemptCount = attemptCount; this.causeCount = causeCount; this.request = request; this.cause = cause; + this.pendingRequestCreationTime = pendingRequestCreationTime; } @Override @@ -69,7 +60,7 @@ public class ClientRetryEvent implements RetryPolicy.Event { } boolean isRequestTimeout(TimeDuration timeout) { - return pending != null && pending.isRequestTimeout(timeout); + return timeout != null && pendingRequestCreationTime.elapsedTime().compareTo(timeout) >= 0; } @Override @@ -77,6 +68,7 @@ public class ClientRetryEvent implements RetryPolicy.Event { return JavaUtils.getClassSimpleName(getClass()) + ":attempt=" + attemptCount + ",request=" + request - + ",cause=" + cause; + + ",cause=" + cause + + ",causeCount=" + causeCount; } } diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java index 105ecbfb4..0ce0595fa 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; /** A map from peer id to peer and its proxy. */ public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, Closeable { @@ -65,7 +66,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, Clos throw new AlreadyClosedException(name + " is already " + current); } lifeCycle.startAndTransition( - () -> proxy = createProxy.apply(peer), IOException.class); + () -> proxy = createProxyImpl(peer), IOException.class); } } } @@ -92,6 +93,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, Clos private final Object resetLock = new Object(); private final CheckedFunction<RaftPeer, PROXY, IOException> createProxy; + private final AtomicBoolean closed = new AtomicBoolean(); public PeerProxyMap(String name, CheckedFunction<RaftPeer, PROXY, IOException> createProxy) { this.name = name; @@ -102,6 +104,13 @@ public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, Clos return name; } + private PROXY createProxyImpl(RaftPeer peer) throws IOException { + if (closed.get()) { + throw new AlreadyClosedException(name + ": Failed to create proxy for " + peer); + } + return createProxy.apply(peer); + } + public PROXY getProxy(RaftPeerId id) throws IOException { Objects.requireNonNull(id, "id == null"); PeerAndProxy p = peers.get(id); @@ -161,6 +170,10 @@ public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, Clos @Override public void close() { + if (!closed.compareAndSet(false, true)) { + return; + } + final List<IOException> exceptions = Collections.synchronizedList(new ArrayList<>()); ConcurrentUtils.parallelForEachAsync(peers.values(), pp -> pp.setNullProxyAndClose().map(proxy -> closeProxy(proxy, pp)).ifPresent(exceptions::add), @@ -180,7 +193,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, Clos private IOException closeProxy(PROXY proxy, PeerAndProxy pp) { try { - LOG.debug("{}: Closing proxy for peer {}", name, pp); + LOG.debug("{}: Closing proxy {} {} for peer {}", name, proxy.getClass().getSimpleName(), proxy, pp); proxy.close(); return null; } catch (IOException e) { diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java index 260f6013e..71c5c5ef0 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java @@ -372,7 +372,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba @Test public void testStateMachineMetrics() throws Exception { runWithNewCluster(NUM_SERVERS, cluster -> - RaftBasicTests.testStateMachineMetrics(true, cluster, LOG)); + RaftBasicTests.runTestStateMachineMetrics(true, cluster)); } @Test diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java index 47c9b0e08..4ff9681f0 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java @@ -168,12 +168,10 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> final List<RaftServer.Division> divisions = cluster.getServerAliveStream().collect(Collectors.toList()); for(RaftServer.Division impl: divisions) { - JavaUtils.attempt(() -> RaftTestUtil.assertLogEntries(impl, term, messages), - 50, TimeDuration.valueOf(1, TimeUnit.SECONDS), impl.getId() + " assertLogEntries", LOG); + RaftTestUtil.assertLogEntries(impl, term, messages, 50, LOG); } } - @Test public void testOldLeaderCommit() throws Exception { runWithNewCluster(NUM_SERVERS, this::runTestOldLeaderCommit); @@ -218,7 +216,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> cluster.getServerAliveStream() .map(RaftServer.Division::getRaftLog) - .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages)); + .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages, System.out::println)); } @Test @@ -453,8 +451,12 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster> } } - public static void testStateMachineMetrics(boolean async, - MiniRaftCluster cluster, Logger LOG) throws Exception { + @Test + public void testStateMachineMetrics() throws Exception { + runWithNewCluster(NUM_SERVERS, cluster -> runTestStateMachineMetrics(false, cluster)); + } + + static void runTestStateMachineMetrics(boolean async, MiniRaftCluster cluster) throws Exception { RaftServer.Division leader = waitForLeader(cluster); try (final RaftClient client = cluster.createClient()) { Gauge appliedIndexGauge = getStatemachineGaugeWithName(leader, diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index 1d45c6821..21af91776 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -248,19 +248,16 @@ public interface RaftTestUtil { } } - static void assertLogEntries(RaftServer.Division server, long expectedTerm, SimpleMessage... expectedMessages) { - LOG.info("checking raft log for {}", server.getMemberId()); - final RaftLog log = server.getRaftLog(); - try { - RaftTestUtil.assertLogEntries(log, expectedTerm, expectedMessages); - } catch (AssertionError e) { - LOG.error("Unexpected raft log in {}", server.getMemberId(), e); - throw e; - } + static void assertLogEntries(RaftServer.Division server, long expectedTerm, SimpleMessage[] expectedMessages, + int numAttempts, Logger log) throws Exception { + final String name = server.getId() + " assertLogEntries"; + final Function<Integer, Consumer<String>> print = i -> i < numAttempts? s -> {}: System.out::println; + JavaUtils.attempt(i -> assertLogEntries(server.getRaftLog(), expectedTerm, expectedMessages, print.apply(i)), + numAttempts, TimeDuration.ONE_SECOND, () -> name, log); } static Iterable<LogEntryProto> getLogEntryProtos(RaftLog log) { - return CollectionUtils.as(log.getEntries(0, Long.MAX_VALUE), ti -> { + return CollectionUtils.as(log.getEntries(0, log.getLastEntryTermIndex().getIndex() + 1), ti -> { try { return log.get(ti.getIndex()); } catch (IOException exception) { @@ -269,17 +266,17 @@ public interface RaftTestUtil { }); } - static List<LogEntryProto> getStateMachineLogEntries(RaftLog log) { + static List<LogEntryProto> getStateMachineLogEntries(RaftLog log, Consumer<String> print) { final List<LogEntryProto> entries = new ArrayList<>(); for (LogEntryProto e : getLogEntryProtos(log)) { final String s = LogProtoUtils.toLogEntryString(e); if (e.hasStateMachineLogEntry()) { - LOG.info(s + ", " + e.getStateMachineLogEntry().toString().trim().replace("\n", ", ")); + print.accept(entries.size() + ") " + s); entries.add(e); } else if (e.hasConfigurationEntry()) { - LOG.info("Found {}, ignoring it.", s); + print.accept("Ignoring " + s); } else if (e.hasMetadataEntry()) { - LOG.info("Found {}, ignoring it.", s); + print.accept("Ignoring " + s); } else { throw new AssertionError("Unexpected LogEntryBodyCase " + e.getLogEntryBodyCase() + " at " + s); } @@ -287,13 +284,14 @@ public interface RaftTestUtil { return entries; } - static void assertLogEntries(RaftLog log, long expectedTerm, SimpleMessage... expectedMessages) { - final List<LogEntryProto> entries = getStateMachineLogEntries(log); + static Void assertLogEntries(RaftLog log, long expectedTerm, SimpleMessage[] expectedMessages, Consumer<String> print) { + final List<LogEntryProto> entries = getStateMachineLogEntries(log, print); try { assertLogEntries(entries, expectedTerm, expectedMessages); } catch(Exception t) { throw new AssertionError("entries: " + entries, t); } + return null; } static void assertLogEntries(List<LogEntryProto> entries, long expectedTerm, SimpleMessage... expectedMessages) { diff --git a/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java b/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java new file mode 100644 index 000000000..80c57741c --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis; + +import org.apache.ratis.RaftTestUtil.SimpleMessage; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.RaftClientConfigKeys; +import org.apache.ratis.client.impl.OrderedAsync; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.MiniRaftCluster; +import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing; +import org.apache.ratis.util.Slf4jUtils; +import org.apache.ratis.util.TimeDuration; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.event.Level; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.ratis.RaftTestUtil.waitForLeader; + +public abstract class RaftLogTruncateTests<CLUSTER extends MiniRaftCluster> extends BaseTest + implements MiniRaftCluster.Factory.Get<CLUSTER> { + public static final int NUM_SERVERS = 5; + final TimeDuration MIN_TIMEOUT = TimeDuration.valueOf(3, TimeUnit.SECONDS); + + static SimpleMessage[] arraycopy(SimpleMessage[] src1, SimpleMessage[] src2) { + final SimpleMessage[] dst = new SimpleMessage[src1.length + src2.length]; + System.arraycopy(src1, 0, dst, 0, src1.length); + System.arraycopy(src2, 0, dst, src1.length, src2.length); + return dst; + } + + { + Slf4jUtils.setLogLevel(OrderedAsync.LOG, Level.ERROR); + Slf4jUtils.setLogLevel(RaftServerConfigKeys.LOG, Level.ERROR); + Slf4jUtils.setLogLevel(RaftClientConfigKeys.LOG, Level.ERROR); + + final RaftProperties p = getProperties(); + p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class); + + // set a long rpc timeout so, when the leader does not have the majority, it won't step down fast. + RaftServerConfigKeys.Rpc.setTimeoutMin(p, MIN_TIMEOUT); + RaftServerConfigKeys.Rpc.setTimeoutMax(p, MIN_TIMEOUT.multiply(2)); + RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMin(p, TimeDuration.ONE_SECOND); + RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMax(p, TimeDuration.ONE_SECOND.multiply(2)); + } + + @Override + public int getGlobalTimeoutSeconds() { + return 200; + } + + @Test + public void testLogTruncate() throws Exception { + runWithNewCluster(NUM_SERVERS, this::runTestLogTruncate); + } + + void runTestLogTruncate(MiniRaftCluster cluster) throws Exception { + final RaftServer.Division oldLeader = waitForLeader(cluster); + final List<RaftServer.Division> oldFollowers = cluster.getFollowers(); + final List<RaftPeerId> killedPeers = new ArrayList<>(); + final List<RaftPeerId> remainingPeers = new ArrayList<>(); + + final int majorityIndex = NUM_SERVERS / 2 + 1; + Assert.assertEquals(NUM_SERVERS - 1, oldFollowers.size()); + Assert.assertTrue(majorityIndex < oldFollowers.size()); + + for (int i = 0; i < majorityIndex; i++) { + killedPeers.add(oldFollowers.get(i).getId()); + } + remainingPeers.add(oldLeader.getId()); + for (int i = majorityIndex; i < oldFollowers.size(); i++) { + remainingPeers.add(oldFollowers.get(i).getId()); + } + + try { + runTestLogTruncate(cluster, oldLeader, killedPeers, remainingPeers); + } catch (Throwable e) { + LOG.info("killedPeers : {}", killedPeers); + LOG.info("remainingPeers: {}", remainingPeers); + throw e; + } + } + + void runTestLogTruncate(MiniRaftCluster cluster, RaftServer.Division oldLeader, + List<RaftPeerId> killedPeers, List<RaftPeerId> remainingPeers) throws Exception { + final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<>()); + final long oldLeaderTerm = oldLeader.getInfo().getCurrentTerm(); + LOG.info("oldLeader: {}, term={}", oldLeader.getId(), oldLeaderTerm); + + final SimpleMessage[] firstBatch = SimpleMessage.create(5, "first"); + final SimpleMessage[] secondBatch = SimpleMessage.create(4, "second"); + + try (final RaftClient client = cluster.createClient(oldLeader.getId())) { + // send some messages + for (SimpleMessage batch : firstBatch) { + final RaftClientReply reply = client.io().send(batch); + Assert.assertTrue(reply.isSuccess()); + } + for (RaftServer.Division f : cluster.getFollowers()) { + assertLogEntries(f, oldLeaderTerm, firstBatch); + } + + // kill a majority of followers + LOG.info("Before killServer {}: {}", killedPeers, cluster.printServers()); + for (RaftPeerId f : killedPeers) { + cluster.killServer(f); + } + LOG.info("After killServer {}: {}", killedPeers, cluster.printServers()); + + // send more messages, but they won't be committed due to not enough followers + final SimpleMessage[] messagesToBeTruncated = SimpleMessage.create(3, "messagesToBeTruncated"); + final AtomicBoolean done = new AtomicBoolean(); + for (SimpleMessage message : messagesToBeTruncated) { + client.async().send(message).whenComplete((r, e) -> { + if (!done.get()) { + exceptions.add(new IllegalStateException(message + " is completed: reply=" + r, e)); + } + }); + } + + // check log messages + final SimpleMessage[] expectedMessages = arraycopy(firstBatch, messagesToBeTruncated); + for (RaftPeerId f : remainingPeers) { + assertLogEntries(cluster.getDivision(f), oldLeaderTerm, expectedMessages); + } + done.set(true); + LOG.info("done"); + } + + // kill the remaining servers + LOG.info("Before killServer {}: {}", remainingPeers, cluster.printServers()); + for (RaftPeerId f : remainingPeers) { + cluster.killServer(f); + } + LOG.info("After killServer {}: {}", remainingPeers, cluster.printServers()); + + // restart the earlier followers + for (RaftPeerId f : killedPeers) { + cluster.restartServer(f, false); + } + + // The new leader should be one of the earlier followers + final RaftServer.Division newLeader = waitForLeader(cluster); + LOG.info("After restartServer {}: {}", killedPeers, cluster.printServers()); + final long newLeaderTerm = newLeader.getInfo().getCurrentTerm(); + + final SegmentedRaftLog newLeaderLog = (SegmentedRaftLog) newLeader.getRaftLog(); + LOG.info("newLeader: {}, term {}, last={}", newLeader.getId(), newLeaderTerm, + newLeaderLog.getLastEntryTermIndex()); + Assert.assertTrue(killedPeers.contains(newLeader.getId())); + + // restart the remaining servers + for (RaftPeerId f : remainingPeers) { + cluster.restartServer(f, false); + } + + // check RaftLog truncate + for (RaftPeerId f : remainingPeers) { + assertLogEntries(cluster.getDivision(f), oldLeaderTerm, firstBatch); + } + + try (final RaftClient client = cluster.createClient(newLeader.getId())) { + // send more messages + for (SimpleMessage batch : secondBatch) { + final RaftClientReply reply = client.io().send(batch); + Assert.assertTrue(reply.isSuccess()); + } + } + + // check log messages -- it should be truncated and then append the new messages + final SimpleMessage[] expectedMessages = arraycopy(firstBatch, secondBatch); + for (RaftPeerId f : killedPeers) { + assertLogEntries(cluster.getDivision(f), oldLeaderTerm, expectedMessages); + } + + if (!exceptions.isEmpty()) { + LOG.info("{} exceptions", exceptions.size()); + for(int i = 0 ; i < exceptions.size(); i++) { + LOG.info("exception {})", i, exceptions.get(i)); + } + Assert.fail(); + } + } + + private void assertLogEntries(RaftServer.Division server, long term, SimpleMessage[] expectedMessages) + throws Exception { + RaftTestUtil.assertLogEntries(server, term, expectedMessages, 30, LOG); + } +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftLogTruncateWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftLogTruncateWithGrpc.java new file mode 100644 index 000000000..dc2846374 --- /dev/null +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftLogTruncateWithGrpc.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.grpc; + +import org.apache.ratis.RaftLogTruncateTests; + +public class TestRaftLogTruncateWithGrpc extends RaftLogTruncateTests<MiniRaftClusterWithGrpc> + implements MiniRaftClusterWithGrpc.FactoryGet { +} diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java index bc0061f5f..046453d58 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java @@ -73,12 +73,6 @@ public class TestRaftWithGrpc runWithNewCluster(NUM_SERVERS, cluster -> testRequestTimeout(false, cluster, LOG)); } - @Test - public void testStateMachineMetrics() throws Exception { - runWithNewCluster(NUM_SERVERS, cluster -> - testStateMachineMetrics(false, cluster, LOG)); - } - @Test public void testUpdateViaHeartbeat() throws Exception { runWithNewCluster(NUM_SERVERS, this::runTestUpdateViaHeartbeat); diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java index 264db8946..36e6dfbcc 100644 --- a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java +++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java @@ -154,7 +154,7 @@ public class TestExceptionDependentRetry extends BaseTest implements MiniRaftClu long sleepTime) { for (int i = 0; i < retries + 1; i++) { RetryPolicy.Action action = exceptionDependentRetry - .handleAttemptFailure(new ClientRetryEvent(i, null, exception)); + .handleAttemptFailure(TestRetryPolicy.newClientRetryEvent(i, null, exception)); final boolean expected = i < retries && i < maxAttempts; Assert.assertEquals(expected, action.shouldRetry()); diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java index d69cd1a2e..1b9536b4b 100644 --- a/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java +++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java @@ -33,6 +33,7 @@ import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.exceptions.TimeoutIOException; import org.apache.ratis.protocol.exceptions.ResourceUnavailableException; import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.util.Timestamp; import org.junit.Assert; import org.junit.Test; @@ -70,6 +71,10 @@ public class TestRetryPolicy extends BaseTest { } } + static ClientRetryEvent newClientRetryEvent(int attemptCount, RaftClientRequest request, Throwable cause) { + return new ClientRetryEvent(attemptCount, request, attemptCount, cause, Timestamp.currentTime()); + } + @Test public void testRequestTypeDependentRetry() { final RequestTypeDependentRetryPolicy.Builder b = RequestTypeDependentRetryPolicy.newBuilder(); @@ -88,7 +93,7 @@ public class TestRetryPolicy extends BaseTest { RaftClientRequest.watchRequestType(1, ReplicationLevel.MAJORITY)); for(int i = 1; i < 2*n; i++) { { //write - final ClientRetryEvent event = new ClientRetryEvent(i, writeRequest, null); + final ClientRetryEvent event = newClientRetryEvent(i, writeRequest, null); final RetryPolicy.Action action = policy.handleAttemptFailure(event); final boolean expected = i < n; @@ -101,21 +106,21 @@ public class TestRetryPolicy extends BaseTest { } { //read and stale read are using default - final ClientRetryEvent event = new ClientRetryEvent(i, readRequest, null); + final ClientRetryEvent event = newClientRetryEvent(i, readRequest, null); final RetryPolicy.Action action = policy.handleAttemptFailure(event); Assert.assertTrue(action.shouldRetry()); Assert.assertEquals(0L, action.getSleepTime().getDuration()); } { - final ClientRetryEvent event = new ClientRetryEvent(i, staleReadRequest, null); + final ClientRetryEvent event = newClientRetryEvent(i, staleReadRequest, null); final RetryPolicy.Action action = policy.handleAttemptFailure(event); Assert.assertTrue(action.shouldRetry()); Assert.assertEquals(0L, action.getSleepTime().getDuration()); } { //watch has no retry - final ClientRetryEvent event = new ClientRetryEvent(i, watchRequest, null); + final ClientRetryEvent event = newClientRetryEvent(i, watchRequest, null); final RetryPolicy.Action action = policy.handleAttemptFailure(event); Assert.assertFalse(action.shouldRetry()); Assert.assertEquals(0L, action.getSleepTime().getDuration()); @@ -148,7 +153,7 @@ public class TestRetryPolicy extends BaseTest { }; for (RaftClientRequest request : requests) { - final ClientRetryEvent event = new ClientRetryEvent(request, new Exception(), pending); + final ClientRetryEvent event = pending.newClientRetryEvent(request, new Exception()); final RetryPolicy.Action action = policy.handleAttemptFailure(event); Assert.assertTrue(action.shouldRetry()); Assert.assertEquals(0L, action.getSleepTime().getDuration()); @@ -156,7 +161,7 @@ public class TestRetryPolicy extends BaseTest { timeout.sleep(); for (RaftClientRequest request : requests) { - final ClientRetryEvent event = new ClientRetryEvent(request, new Exception(), pending); + final ClientRetryEvent event = pending.newClientRetryEvent(request, new Exception()); final RetryPolicy.Action action = policy.handleAttemptFailure(event); Assert.assertFalse(action.shouldRetry()); } @@ -218,8 +223,7 @@ public class TestRetryPolicy extends BaseTest { */ private void checkEvent(int exceptionAttemptCount, RetryPolicy retryPolicy, RaftClientRequest raftClientRequest, Throwable exception, Pair exceptionPolicyPair) { - final ClientRetryEvent event = - new ClientRetryEvent(exceptionAttemptCount, raftClientRequest, exception); + final ClientRetryEvent event = newClientRetryEvent(exceptionAttemptCount, raftClientRequest, exception); final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event); final boolean expected = exceptionAttemptCount < exceptionPolicyPair.retries; diff --git a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java index 73ff1eb53..2f3edf781 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java @@ -261,7 +261,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster> final RaftPeerId leaderId = leader.getId(); ids.add(leaderId); - RaftTestUtil.getStateMachineLogEntries(leaderLog); + RaftTestUtil.getStateMachineLogEntries(leaderLog, LOG::info); // check that the last metadata entry is written to the log JavaUtils.attempt(() -> assertLastLogEntry(leader), 20, HUNDRED_MILLIS, "leader last metadata entry", LOG);
