This is an automated email from the ASF dual-hosted git repository.
maoling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new bc1b231c9 ZOOKEEPER-4327: Fix flaky RequestThrottlerTest
bc1b231c9 is described below
commit bc1b231c9e32667b2978c86a6a64833470973dbd
Author: Kezhu Wang <[email protected]>
AuthorDate: Sun Sep 25 16:21:45 2022 +0800
ZOOKEEPER-4327: Fix flaky RequestThrottlerTest
This PR tries to fix several test failures in `RequestThrottlerTest`.
First, `RequestThrottlerTest#testDropStaleRequests`.
Placing `Thread.sleep(200)` after `submittedRequests.take()` in
`RequestThrottler#run` causes two assertions to fail:
1. `assertEquals(2L, (long) metrics.get("prep_processor_request_queued"))`
2. `assertEquals(1L, (long) metrics.get("request_throttle_wait_count"))`
This happens because `setStale` could run before throttle handling.
This commit solves this by introducing an interception point
`RequestThrottler.throttleSleep` to build happen-before relations:
1. `throttling.countDown` happens before `setStale`; this ensures that
unthrottled requests are processed as usual.
2. `setStale` happens before `throttled.await`; this defends
`RequestThrottler.throttleSleep` against spurious wakeups.
Second, `RequestThrottlerTest#testRequestThrottler`.
* `RequestThrottlerTest.testRequestThrottler:197 expected: <2> but was: <1>`
`ZooKeeperServer#submitRequest` and `PrepRequestProcessor#processRequest`
run in different threads, so there is no guarantee about the value of the metric
`prep_processor_request_queued` after `submitted.await(5, TimeUnit.SECONDS)`.
Placing `Thread.sleep(200)` before `zks.submitRequestNow(request)` in
`RequestThrottler#run` triggers this failure.
* `RequestThrottlerTest.testRequestThrottler:206 expected: <5> but was: <4>`
`entered.await(STALL_TIME, TimeUnit.MILLISECONDS)` could return `false`
because its timeout is almost the same as the one in `RequestThrottler#throttleSleep`.
Placing `Thread.sleep(500)` around `throttleSleep` increases the likelihood of failure.
Third,
`RequestThrottlerTest#testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled`.
* `RequestThrottlerTest.testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled:340 expected: <3> but was: <4>`
`ZooKeeperServer#shouldThrottle` depends on a consistent sum of
`getInflight` and `getInProcess`, but that sum is not guaranteed to be consistent.
Placing `Thread.sleep(200)` before `zks.submitRequestNow(request)` in
`RequestThrottler#run` can reproduce this.
See also https://github.com/apache/zookeeper/pull/1739 and
https://github.com/apache/zookeeper/pull/1821.
Author: Kezhu Wang <[email protected]>
Reviewers: Mate Szalay-Beko <[email protected]>, maoling <[email protected]>
Closes #1887 from
kezhuw/ZOOKEEPER-4327-flaky-RequestThrottlerTest.testDropStaleRequests
---
.../apache/zookeeper/server/RequestThrottler.java | 12 ++--
.../apache/zookeeper/server/ZooKeeperServer.java | 5 +-
.../zookeeper/server/RequestThrottlerTest.java | 66 ++++++++++++++++++----
3 files changed, 64 insertions(+), 19 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
index d60efa087..4a401e5b9 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java
@@ -195,13 +195,11 @@ public class RequestThrottler extends
ZooKeeperCriticalThread {
LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped);
}
- private synchronized void throttleSleep(int stallTime) {
- try {
- ServerMetrics.getMetrics().REQUEST_THROTTLE_WAIT_COUNT.add(1);
- this.wait(stallTime);
- } catch (InterruptedException ie) {
- return;
- }
+
+ // @VisibleForTesting
+ synchronized void throttleSleep(int stallTime) throws InterruptedException
{
+ ServerMetrics.getMetrics().REQUEST_THROTTLE_WAIT_COUNT.add(1);
+ this.wait(stallTime);
}
@SuppressFBWarnings(value = "NN_NAKED_NOTIFY", justification = "state
change is in ZooKeeperServer.decInProgress() ")
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 0303ca645..817e84b3e 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -749,9 +749,12 @@ public class ZooKeeperServer implements SessionExpirer,
ServerStats.Provider {
}
protected void startRequestThrottler() {
- requestThrottler = new RequestThrottler(this);
+ requestThrottler = createRequestThrottler();
requestThrottler.start();
+ }
+ protected RequestThrottler createRequestThrottler() {
+ return new RequestThrottler(this);
}
protected void setupRequestProcessors() {
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java
index ed2239990..152592075 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java
@@ -67,11 +67,17 @@ public class RequestThrottlerTest extends ZKTestCase {
CountDownLatch disconnected = null;
+ CountDownLatch throttled = null;
+ CountDownLatch throttling = null;
+
ZooKeeperServer zks = null;
ServerCnxnFactory f = null;
ZooKeeper zk = null;
int connectionLossCount = 0;
+ private long getCounterMetric(String name) {
+ return (long) MetricsUtils.currentServerMetrics().get(name);
+ }
@BeforeEach
public void setup() throws Exception {
@@ -115,6 +121,11 @@ public class RequestThrottlerTest extends ZKTestCase {
super(snapDir, logDir, tickTime);
}
+ @Override
+ protected RequestThrottler createRequestThrottler() {
+ return new TestRequestThrottler(this);
+ }
+
@Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
@@ -141,6 +152,24 @@ public class RequestThrottlerTest extends ZKTestCase {
}
}
+ class TestRequestThrottler extends RequestThrottler {
+ public TestRequestThrottler(ZooKeeperServer zks) {
+ super(zks);
+ }
+
+ @Override
+ synchronized void throttleSleep(int stallTime) throws
InterruptedException {
+ if (throttling != null) {
+ throttling.countDown();
+ }
+ super.throttleSleep(stallTime);
+ // Defend against unstable timing and potential spurious wakeup.
+ if (throttled != null) {
+ assertTrue(throttled.await(20, TimeUnit.SECONDS));
+ }
+ }
+ }
+
class TestPrepRequestProcessor extends PrepRequestProcessor {
public TestPrepRequestProcessor(ZooKeeperServer zks, RequestProcessor
syncProcessor) {
@@ -191,18 +220,22 @@ public class RequestThrottlerTest extends ZKTestCase {
// make sure the server received all 5 requests
submitted.await(5, TimeUnit.SECONDS);
- Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
// but only two requests can get into the pipeline because of the
throttler
- assertEquals(2L, (long) metrics.get("prep_processor_request_queued"));
- assertEquals(1L, (long) metrics.get("request_throttle_wait_count"));
+ WaitForCondition requestQueued = () ->
getCounterMetric("prep_processor_request_queued") == 2;
+ waitFor("request not queued", requestQueued, 5);
+
+ WaitForCondition throttleWait = () ->
getCounterMetric("request_throttle_wait_count") >= 1;
+ waitFor("no throttle wait", throttleWait, 5);
// let the requests go through the pipeline and the throttler will be
waken up to allow more requests
// to enter the pipeline
resumeProcess.countDown();
- entered.await(STALL_TIME, TimeUnit.MILLISECONDS);
- metrics = MetricsUtils.currentServerMetrics();
+ // wait for more than one STALL_TIME to reduce timeout before wakeup
+ assertTrue(entered.await(STALL_TIME + 5000, TimeUnit.MILLISECONDS));
+
+ Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
assertEquals(TOTAL_REQUESTS, (long)
metrics.get("prep_processor_request_queued"));
}
@@ -221,6 +254,9 @@ public class RequestThrottlerTest extends ZKTestCase {
resumeProcess = new CountDownLatch(1);
submitted = new CountDownLatch(TOTAL_REQUESTS);
+ throttled = new CountDownLatch(1);
+ throttling = new CountDownLatch(1);
+
// send 5 requests asynchronously
for (int i = 0; i < TOTAL_REQUESTS; i++) {
zk.create("/request_throttle_test- " + i,
("/request_throttle_test- "
@@ -231,11 +267,18 @@ public class RequestThrottlerTest extends ZKTestCase {
// make sure the server received all 5 requests
assertTrue(submitted.await(5, TimeUnit.SECONDS));
+ // stale throttled requests
+ assertTrue(throttling.await(5, TimeUnit.SECONDS));
for (ServerCnxn cnxn : f.cnxns) {
cnxn.setStale();
}
+ throttled.countDown();
zk = null;
+ // only first three requests are counted as finished
+ finished = new CountDownLatch(3);
+
+ // let the requests go through the pipeline
resumeProcess.countDown();
LOG.info("raise the latch");
@@ -243,6 +286,8 @@ public class RequestThrottlerTest extends ZKTestCase {
Thread.sleep(50);
}
+ assertTrue(finished.await(5, TimeUnit.SECONDS));
+
// assert after all requests processed to avoid concurrent issues as
metrics are
// counted in different threads.
Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
@@ -327,7 +372,6 @@ public class RequestThrottlerTest extends ZKTestCase {
RequestThrottler.setMaxRequests(0);
resumeProcess = new CountDownLatch(1);
int totalRequests = 10;
- submitted = new CountDownLatch(totalRequests);
for (int i = 0; i < totalRequests; i++) {
zk.create("/request_throttle_test- " + i,
("/request_throttle_test- "
@@ -335,16 +379,16 @@ public class RequestThrottlerTest extends ZKTestCase {
}, null);
}
- submitted.await(5, TimeUnit.SECONDS);
-
// We should start throttling instead of queuing more requests.
//
// We always allow up to GLOBAL_OUTSTANDING_LIMIT + 1 number of
requests coming in request processing pipeline
// before throttling. For the next request, we will throttle by
disabling receiving future requests but we still
- // allow this single request coming in. So the total number of
queued requests in processing pipeline would
+ // allow this single request coming in. Ideally, the total number
of queued requests in processing pipeline would
// be GLOBAL_OUTSTANDING_LIMIT + 2.
- assertEquals(Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2,
- (long)
MetricsUtils.currentServerMetrics().get("prep_processor_request_queued"));
+ //
+ // But due to leak of consistent view of number of outstanding
requests, the number could be larger.
+ WaitForCondition requestQueued = () ->
getCounterMetric("prep_processor_request_queued") >=
Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2;
+ waitFor("no enough requests queued", requestQueued, 5);
resumeProcess.countDown();
} catch (Exception e) {