This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 1053da1 RATIS-703. Intermittent ambiguous attempt(..) method in
JavaUtils. Contributed by Henrik Hegardt
1053da1 is described below
commit 1053da1013638cee8b44d9ac6b0570c49cac50e7
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Oct 16 06:54:05 2019 +0800
RATIS-703. Intermittent ambiguous attempt(..) method in JavaUtils.
Contributed by Henrik Hegardt
---
.../main/java/org/apache/ratis/util/JavaUtils.java | 28 +++++++---------------
.../apache/ratis/server/impl/ServerImplUtils.java | 2 +-
.../ratis/InstallSnapshotNotificationTests.java | 4 ++--
.../org/apache/ratis/RaftExceptionBaseTest.java | 4 ++--
.../test/java/org/apache/ratis/RaftTestUtil.java | 4 ++--
.../java/org/apache/ratis/RetryCacheTests.java | 2 +-
.../ratis/server/impl/LeaderElectionTests.java | 2 +-
.../server/impl/RaftReconfigurationBaseTest.java | 6 ++---
.../ratis/statemachine/RaftSnapshotBaseTest.java | 4 ++--
.../apache/ratis/server/ServerRestartTests.java | 12 +++++-----
10 files changed, 28 insertions(+), 40 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index d400414..c37b792 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -139,7 +139,7 @@ public interface JavaUtils {
}
/** Attempt to get a return value from the given supplier multiple times. */
- static <RETURN, THROWABLE extends Throwable> RETURN attempt(
+ static <RETURN, THROWABLE extends Throwable> RETURN attemptRepeatedly(
CheckedSupplier<RETURN, THROWABLE> supplier,
int numAttempts, TimeDuration sleepTime, String name, Logger log)
throws THROWABLE, InterruptedException {
@@ -169,34 +169,22 @@ public interface JavaUtils {
static <THROWABLE extends Throwable> void attempt(
CheckedRunnable<THROWABLE> runnable, int numAttempts, TimeDuration
sleepTime, String name, Logger log)
throws THROWABLE, InterruptedException {
- attempt(CheckedRunnable.asCheckedSupplier(runnable), numAttempts,
sleepTime, name, log);
+ attemptRepeatedly(CheckedRunnable.asCheckedSupplier(runnable),
numAttempts, sleepTime, name, log);
}
/** Attempt to wait the given condition to return true multiple times. */
- static void attempt(
+ static void attemptUntilTrue(
BooleanSupplier condition, int numAttempts, TimeDuration sleepTime,
String name, Logger log)
throws InterruptedException {
Objects.requireNonNull(condition, "condition == null");
- Preconditions.assertTrue(numAttempts > 0, () -> "numAttempts = " +
numAttempts + " <= 0");
- Preconditions.assertTrue(!sleepTime.isNegative(), () -> "sleepTime = " +
sleepTime + " < 0");
-
- for(int i = 1; i <= numAttempts; i++) {
- if (condition.getAsBoolean()) {
- return;
- }
- if (log != null && log.isWarnEnabled()) {
- log.warn("FAILED " + name + " attempt #" + i + "/" + numAttempts
- + ": sleep " + sleepTime + " and then retry.");
+ attempt(() -> {
+ if (!condition.getAsBoolean()) {
+ throw new IllegalStateException("Condition " + name + " is false.");
}
-
- sleepTime.sleep();
- }
-
- if (!condition.getAsBoolean()) {
- throw new IllegalStateException("Failed " + name + " for " + numAttempts
+ " attempts.");
- }
+ }, numAttempts, sleepTime, name, log);
}
+
static Timer runRepeatedly(Runnable runnable, long delay, long period,
TimeUnit unit) {
final Timer timer = new Timer(true);
timer.schedule(new TimerTask() {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 65494e1..8f91c23 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -49,7 +49,7 @@ public class ServerImplUtils {
final RaftServerProxy proxy;
try {
// attempt multiple times to avoid temporary bind exception
- proxy = JavaUtils.attempt(
+ proxy = JavaUtils.attemptRepeatedly(
() -> new RaftServerProxy(id, stateMachineRegistry, properties,
parameters),
5, sleepTime, "new RaftServerProxy", RaftServerProxy.LOG);
} catch (InterruptedException e) {
diff --git
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 894bf6a..8ce6cda 100644
---
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -136,7 +136,7 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
LOG.info("nextIndex = {}", nextIndex);
final List<File> snapshotFiles =
RaftSnapshotBaseTest.getSnapshotFiles(cluster,
nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
- JavaUtils.attempt(() ->
snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists),
+ JavaUtils.attemptRepeatedly(() ->
snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists),
10, ONE_SECOND, "snapshotFile.exist", LOG);
logs = storageDirectory.getLogSegmentFiles();
} finally {
@@ -212,7 +212,7 @@ public abstract class
InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
LOG.info("{}: oldLeaderNextIndex = {}", leaderId, oldLeaderNextIndex);
final List<File> snapshotFiles =
RaftSnapshotBaseTest.getSnapshotFiles(cluster,
oldLeaderNextIndex - SNAPSHOT_TRIGGER_THRESHOLD, oldLeaderNextIndex);
- JavaUtils.attempt(() ->
snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists),
+ JavaUtils.attemptRepeatedly(() ->
snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists),
10, ONE_SECOND, "snapshotFile.exist", LOG);
}
diff --git
a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 0cc9265..ea1832c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -80,7 +80,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends
MiniRaftCluster>
}
final RaftClientRpc rpc = client.getClientRpc();
- JavaUtils.attempt(() -> assertNotLeaderException(newLeader, "m2",
oldLeader, rpc, cluster),
+ JavaUtils.attemptRepeatedly(() -> assertNotLeaderException(newLeader,
"m2", oldLeader, rpc, cluster),
10, ONE_SECOND, "assertNotLeaderException", LOG);
sendMessage("m3", client);
@@ -128,7 +128,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends
MiniRaftCluster>
// it is possible that the remote peer's rpc server is not ready. need
retry
final RaftClientRpc rpc = client.getClientRpc();
- final RaftClientReply reply = JavaUtils.attempt(
+ final RaftClientReply reply = JavaUtils.attemptRepeatedly(
() -> assertNotLeaderException(newLeader, "m1", oldLeader, rpc,
cluster),
10, ONE_SECOND, "assertNotLeaderException", LOG);
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index f202207..8339972 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -94,7 +94,7 @@ public interface RaftTestUtil {
exception.set(ise);
};
- final RaftServerImpl leader = JavaUtils.attempt(
+ final RaftServerImpl leader = JavaUtils.attemptRepeatedly(
() -> cluster.getLeader(groupId, handleNoLeaders,
handleMultipleLeaders),
numAttempts, sleepTime, name, LOG);
@@ -359,7 +359,7 @@ public interface RaftTestUtil {
final String name = JavaUtils.getCallerStackTraceElement().getMethodName()
+ "-changeLeader";
cluster.setBlockRequestsFrom(oldLeader.toString(), true);
try {
- return JavaUtils.attempt(() -> {
+ return JavaUtils.attemptRepeatedly(() -> {
final RaftPeerId newLeader = waitForLeader(cluster).getId();
if (newLeader.equals(oldLeader)) {
throw constructor.apply("Failed to change leader: newLeader ==
oldLeader == " + oldLeader);
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index 20656b2..1bc9dc1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -141,7 +141,7 @@ public abstract class RetryCacheTests<CLUSTER extends
MiniRaftCluster>
// trigger setConfiguration
cluster.setConfiguration(allPeers);
- final RaftPeerId newLeaderId = JavaUtils.attempt(() -> {
+ final RaftPeerId newLeaderId = JavaUtils.attemptRepeatedly(() -> {
final RaftPeerId id = RaftTestUtil.waitForLeader(cluster).getId();
Assert.assertNotEquals(leaderId, id);
return id;
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 3818b41..fa09b7d 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -137,7 +137,7 @@ public abstract class LeaderElectionTests<CLUSTER extends
MiniRaftCluster>
// start the last server
final RaftServerProxy lastServer = i.next();
lastServer.start();
- final RaftPeerId lastServerLeaderId = JavaUtils.attempt(
+ final RaftPeerId lastServerLeaderId = JavaUtils.attemptRepeatedly(
() ->
Optional.ofNullable(lastServer.getImpls().iterator().next().getState().getLeaderId())
.orElseThrow(() -> new IllegalStateException("No leader yet")),
10, ONE_SECOND, "getLeaderId", LOG);
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 4f456fa..81f99fb 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -521,7 +521,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER
extends MiniRaftCluste
// find ConfigurationEntry
final TimeDuration sleepTime = TimeDuration.valueOf(500,
TimeUnit.MILLISECONDS);
- final long confIndex = JavaUtils.attempt(() -> {
+ final long confIndex = JavaUtils.attemptRepeatedly(() -> {
final long last = log.getLastEntryTermIndex().getIndex();
for (long i = last; i >= 1; i--) {
if (log.get(i).hasConfigurationEntry()) {
@@ -532,7 +532,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER
extends MiniRaftCluste
}, 10, sleepTime, "confIndex", LOG);
// wait till the old leader persist the new conf
- JavaUtils.attempt(() -> log.getFlushIndex() >= confIndex,
+ JavaUtils.attemptRepeatedly(() -> log.getFlushIndex() >= confIndex,
10, sleepTime, "FLUSH", LOG);
final long committed = log.getLastCommittedIndex();
Assert.assertTrue(committed < confIndex);
@@ -546,7 +546,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER
extends MiniRaftCluste
Assert.assertTrue(gotNotLeader.get());
// the old leader should have truncated the setConf from the log
- JavaUtils.attempt(() -> log.getLastCommittedIndex() >= confIndex,
+ JavaUtils.attemptRepeatedly(() -> log.getLastCommittedIndex() >=
confIndex,
10, ONE_SECOND, "COMMIT", LOG);
Assert.assertTrue(log.get(confIndex).hasConfigurationEntry());
log2 = null;
diff --git
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 55c6ce9..26b2ca1 100644
---
a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++
b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -136,7 +136,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest
{
LOG.info("nextIndex = {}", nextIndex);
// wait for the snapshot to be done
final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex -
SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
- JavaUtils.attempt(() ->
snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists),
+ JavaUtils.attemptRepeatedly(() ->
snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists),
10, ONE_SECOND, "snapshotFile.exist", LOG);
// restart the peer and check if it can correctly load snapshot
@@ -184,7 +184,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest
{
final long nextIndex =
cluster.getLeader().getState().getLog().getNextIndex();
LOG.info("nextIndex = {}", nextIndex);
final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex -
SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
- JavaUtils.attempt(() ->
snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists),
+ JavaUtils.attemptRepeatedly(() ->
snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists),
10, ONE_SECOND, "snapshotFile.exist", LOG);
logs = storageDirectory.getLogSegmentFiles();
} finally {
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 32d1217..d39723d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -112,7 +112,7 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
final long leaderLastIndex =
cluster.getLeader().getState().getLog().getLastEntryTermIndex().getIndex();
// make sure the restarted follower can catchup
final ServerState followerState =
cluster.getRaftServerImpl(followerId).getState();
- JavaUtils.attempt(() -> followerState.getLastAppliedIndex() >=
leaderLastIndex,
+ JavaUtils.attemptRepeatedly(() -> followerState.getLastAppliedIndex() >=
leaderLastIndex,
10, ONE_SECOND, "follower catchup", LOG);
// make sure the restarted peer's log segments is correct
@@ -188,7 +188,7 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
void runTestRestartWithCorruptedLogHeader(MiniRaftCluster cluster) throws
Exception {
RaftTestUtil.waitForLeader(cluster);
for(RaftServerImpl impl : cluster.iterateServerImpls()) {
- JavaUtils.attempt(() -> getOpenLogFile(impl), 10,
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS),
+ JavaUtils.attemptRepeatedly(() -> getOpenLogFile(impl), 10,
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS),
impl.getId() + ": wait for log file creation", LOG);
}
@@ -196,7 +196,7 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
cluster.getServers().forEach(RaftServerProxy::close);
for(RaftServerImpl impl : cluster.iterateServerImpls()) {
- final File openLogFile = JavaUtils.attempt(() -> getOpenLogFile(impl),
+ final File openLogFile = JavaUtils.attemptRepeatedly(() ->
getOpenLogFile(impl),
10, HUNDRED_MILLIS, impl.getId() + "-getOpenLogFile", LOG);
for(int i = 0; i < SegmentedRaftLogFormat.getHeaderLength(); i++) {
assertCorruptedLogHeader(impl.getId(), openLogFile, i, cluster, LOG);
@@ -292,9 +292,9 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
cluster.restartServer(id, false);
final RaftServerImpl server = cluster.getRaftServerImpl(id);
final RaftLog raftLog = server.getState().getLog();
- JavaUtils.attempt(() -> raftLog.getLastCommittedIndex() >=
loggedCommitIndex,
+ JavaUtils.attemptRepeatedly(() -> raftLog.getLastCommittedIndex() >=
loggedCommitIndex,
10, HUNDRED_MILLIS, id + "(commitIndex >= loggedCommitIndex)", LOG);
- JavaUtils.attempt(() -> server.getState().getLastAppliedIndex() >=
loggedCommitIndex,
+ JavaUtils.attemptRepeatedly(() ->
server.getState().getLastAppliedIndex() >= loggedCommitIndex,
10, HUNDRED_MILLIS, id + "(lastAppliedIndex >= loggedCommitIndex)",
LOG);
LOG.info("{}: commitIndex={}, lastAppliedIndex={}",
id, raftLog.getLastCommittedIndex(),
server.getState().getLastAppliedIndex());
@@ -364,7 +364,7 @@ public abstract class ServerRestartTests<CLUSTER extends
MiniRaftCluster>
leader.getProxy().close();
// corrupt the log
- final File openLogFile = JavaUtils.attempt(() -> getOpenLogFile(leader),
+ final File openLogFile = JavaUtils.attemptRepeatedly(() ->
getOpenLogFile(leader),
10, HUNDRED_MILLIS, id + "-getOpenLogFile", LOG);
try(final RandomAccessFile raf = new RandomAccessFile(openLogFile, "rw")) {
final long mid = size / 2;