This is an automated email from the ASF dual-hosted git repository. dragonyliu pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 6cd8ef3d945eb99e6f553729973447336bb344e0 Author: Tsz-Wo Nicholas Sze <[email protected]> AuthorDate: Fri Aug 5 00:55:48 2022 -0700 RATIS-1657. Intermittent failure in TestLogAppenderWithGrpc#testPendingLimits due to ConcurrentModificationException at SlidingWindow$Client.trySendDelayed. (#703) (cherry picked from commit d9aa480c53a69141aeb5bb7f3279ed843c12f322) --- .../org/apache/ratis/util/CollectionUtils.java | 14 +++++- .../java/org/apache/ratis/util/SlidingWindow.java | 58 ++++++++++++++++------ 2 files changed, 55 insertions(+), 17 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java index cdfd9635b..db0c6fd93 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java @@ -20,6 +20,7 @@ package org.apache.ratis.util; import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -103,8 +104,17 @@ public interface CollectionUtils { } static <K, V> V putNew(K key, V value, Map<K, V> map, Supplier<Object> name) { - final V returned = map.put(key, value); - Preconditions.assertTrue(returned == null, + return putNew(key, value, map::put, name); + } + + /** For the case that key and value are the same object. */ + static <K> void putNew(K key, Function<K, K> putMethod, Supplier<Object> name) { + putNew(key, key, (k, v) -> putMethod.apply(k), name); + } + + static <K, V> V putNew(K key, V value, BiFunction<K, V, V> putMethod, Supplier<Object> name) { + final V returned = putMethod.apply(key, value); + Preconditions.assertNull(returned, () -> "Entry already exists for key " + key + " in map " + name.get()); return value; } diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java index 43b1efcdb..316604db0 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java @@ -17,13 +17,14 @@ */ package org.apache.ratis.util; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.ratis.protocol.exceptions.AlreadyClosedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -63,14 +64,13 @@ public interface SlidingWindow { /** A seqNum-to-request map, sorted by seqNum. */ class RequestMap<REQUEST extends Request<REPLY>, REPLY> implements Iterable<REQUEST> { - private static boolean logRepeatedly = false; private final Object name; /** Request map: seqNum -> request */ private final SortedMap<Long, REQUEST> requests = new ConcurrentSkipListMap<>(); RequestMap(Object name) { this.name = name; - if (logRepeatedly && LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { JavaUtils.runRepeatedly(this::log, 5, 10, TimeUnit.SECONDS); } } @@ -185,6 +185,33 @@ public interface SlidingWindow { } } + class DelayedRequests { + private final SortedMap<Long, Long> sorted = new TreeMap<>(); + + synchronized Long put(Long seqNum) { + return sorted.put(seqNum, seqNum); + } + + synchronized boolean containsKey(long seqNum) { + return sorted.containsKey(seqNum); + } + + synchronized List<Long> getAllAndClear() { + final List<Long> keys = new ArrayList<>(sorted.keySet()); + sorted.clear(); + return keys; + } + + synchronized Long remove(long seqNum) { + return sorted.remove(seqNum); + } + + @Override + public synchronized String toString() { + return "" + sorted.keySet(); + } + } + /** * Client side sliding window. * A client may @@ -200,7 +227,7 @@ public interface SlidingWindow { /** The requests in the sliding window. */ private final RequestMap<REQUEST, REPLY> requests; /** Delayed requests. */ - private final SortedMap<Long, Long> delayedRequests = new TreeMap<>(); + private final DelayedRequests delayedRequests = new DelayedRequests(); /** The seqNum for the next new request. */ private long nextSeqNum = 1; @@ -214,9 +241,14 @@ public interface SlidingWindow { public Client(Object name) { this.requests = new RequestMap<REQUEST, REPLY>(getName(getClass(), name)) { @Override - @SuppressFBWarnings("IA_AMBIGUOUS_INVOCATION_OF_INHERITED_OR_OUTER_METHOD") - synchronized void log() { - LOG.debug(toString()); + void log() { + if (LOG.isDebugEnabled()) { + logDebug(); + } + } + + synchronized void logDebug() { + LOG.debug(super.toString()); for (REQUEST r : requests) { LOG.debug(" {}: {}", r.getSeqNum(), r.hasReply() ? "replied" : delayedRequests.containsKey(r.getSeqNum()) ? "delayed" : "submitted"); @@ -229,7 +261,7 @@ public interface SlidingWindow { public synchronized String toString() { return requests + ", nextSeqNum=" + nextSeqNum + ", firstSubmitted=" + firstSeqNum + ", replied? " + firstReplied - + ", delayed=" + delayedRequests.keySet(); + + ", delayed=" + delayedRequests; } /** @@ -282,7 +314,7 @@ public interface SlidingWindow { } // delay other requests - CollectionUtils.putNew(seqNum, seqNum, delayedRequests, () -> requests.getName() + ":delayedRequests"); + CollectionUtils.putNew(seqNum, delayedRequests::put, () -> requests.getName() + ":delayedRequests"); return false; } @@ -326,12 +358,8 @@ public interface SlidingWindow { private void trySendDelayed(Consumer<REQUEST> sendMethod) { if (firstReplied) { // after first received, all other requests can be submitted (out-of-order) - if (!delayedRequests.isEmpty()) { - for (Long seqNum : delayedRequests.keySet()) { - sendMethod.accept(requests.getNonRepliedRequest(seqNum, "trySendDelayed")); - } - delayedRequests.clear(); - } + delayedRequests.getAllAndClear().forEach( + seqNum -> sendMethod.accept(requests.getNonRepliedRequest(seqNum, "trySendDelayed"))); } else { // Otherwise, submit the first only if it is a delayed request final Iterator<REQUEST> i = requests.iterator();
