Author: fpj
Date: Mon Jun 9 22:19:02 2014
New Revision: 1601516
URL: http://svn.apache.org/r1601516
Log:
ZOOKEEPER-1928. add configurable throttling to the number of snapshots
concurrently sent by a leader (Edward Carter via fpj)
Added:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshot.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottler.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SnapshotThrottleException.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
Modified: zookeeper/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1601516&r1=1601515&r2=1601516&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Mon Jun 9 22:19:02 2014
@@ -29,6 +29,9 @@ NEW FEATURES:
ZOOKEEPER-1887. C implementation of removeWatches (Raul Gutierrez Segales via
michim)
+ ZOOKEEPER-1928. add configurable throttling to the number of snapshots
+ concurrently sent by a leader (Edward Carter via fpj)
+
BUGFIXES:
ZOOKEEPER-786. Exception in ZooKeeper.toString
@@ -729,7 +732,7 @@ IMPROVEMENTS:
ZOOKEEPER-756. some cleanup and improvements for zooinspector
(Colin Goodheart-Smithe & Thomas Koch via phunt)
- ZOOKEEPER-1292. FLETest is flaky (flp via breed)
+ ZOOKEEPER-1292. FLETest is flaky (fpj via breed)
ZOOKEEPER-1326. The CLI commands "delete" and "rmr" are confusing.
Can we have "rm" + "rmr" instead? (Harsh J via phunt)
@@ -923,6 +926,9 @@ IMPROVEMENTS:
ZOOKEEPER-1659. Add JMX support for dynamic reconfiguration (Rakesh R via
michim)
+
+ ZOOKEEPER-1928. add configurable throttling to the number of snapshots
+ concurrently sent by a leader (Edward Carter via fpj)
headers
Modified:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1601516&r1=1601515&r2=1601516&view=diff
==============================================================================
---
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
(original)
+++
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
Mon Jun 9 22:19:02 2014
@@ -72,6 +72,21 @@ public class Leader {
}
}
+    /*
+     * Throttling of snapshots that this leader sends to observers
+     * concurrently. Both settings are read once, from system properties,
+     * when the class is initialized:
+     *   zookeeper.leader.maxConcurrentSnapshots       - limit on concurrent
+     *       snapshots (default 10)
+     *   zookeeper.leader.maxConcurrentSnapshotTimeout - passed to the
+     *       throttler as its timeoutMillis argument (default 5)
+     */
+    private static final String MAX_CONCURRENT_SNAPSHOTS =
+        "zookeeper.leader.maxConcurrentSnapshots";
+    private static final String MAX_CONCURRENT_SNAPSHOT_TIMEOUT =
+        "zookeeper.leader.maxConcurrentSnapshotTimeout";
+
+    private static final int maxConcurrentSnapshots;
+    private static final long maxConcurrentSnapshotTimeout;
+
+    static {
+        // Resolve and report each setting in turn so the effective values
+        // show up in the log.
+        final int snapshotLimit = Integer.getInteger(MAX_CONCURRENT_SNAPSHOTS, 10);
+        maxConcurrentSnapshots = snapshotLimit;
+        LOG.info(MAX_CONCURRENT_SNAPSHOTS + " = " + snapshotLimit);
+
+        final long snapshotTimeout = Long.getLong(MAX_CONCURRENT_SNAPSHOT_TIMEOUT, 5);
+        maxConcurrentSnapshotTimeout = snapshotTimeout;
+        LOG.info(MAX_CONCURRENT_SNAPSHOT_TIMEOUT + " = " + snapshotTimeout);
+    }
+
+    /** Per-leader throttler built from the two settings above. */
+    private final LearnerSnapshotThrottler learnerSnapshotThrottler =
+        new LearnerSnapshotThrottler(maxConcurrentSnapshots,
+                                     maxConcurrentSnapshotTimeout);
+
final LeaderZooKeeperServer zk;
final QuorumPeer self;
@@ -1051,6 +1066,10 @@ public class Leader {
}
return p;
}
+
+    /**
+     * Returns the throttler that bounds how many snapshots this leader
+     * sends concurrently.
+     *
+     * @return the throttler instance owned by this leader
+     */
+    public LearnerSnapshotThrottler getLearnerSnapshotThrottler() {
+        return this.learnerSnapshotThrottler;
+    }
/**
* Process sync requests
Modified:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1601516&r1=1601515&r2=1601516&view=diff
==============================================================================
---
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
(original)
+++
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
Mon Jun 9 22:19:02 2014
@@ -423,20 +423,29 @@ public class LearnerHandler extends ZooK
/* if we are not truncating or sending a diff just send a snapshot
*/
if (needSnap) {
- long zxidToSend =
leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
- oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null,
null), "packet");
- bufferedOutput.flush();
-
- LOG.info("Sending snapshot last zxid of peer is 0x"
- + Long.toHexString(peerLastZxid) + " "
- + "zxid of leader is 0x"
- + Long.toHexString(leaderLastZxid) + " "
- + "sent zxid of db as 0x"
- + Long.toHexString(zxidToSend));
- // Dump data to peer
- leader.zk.getZKDatabase().serializeSnapshot(oa);
- oa.writeString("BenWasHere", "signature");
- bufferedOutput.flush();
+ boolean exemptFromThrottle = getLearnerType() !=
LearnerType.OBSERVER;
+ LearnerSnapshot snapshot =
+
leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
+ try {
+ long zxidToSend =
leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
+ oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend,
null, null), "packet");
+ bufferedOutput.flush();
+
+ LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid
of leader is 0x{}, "
+ + "send zxid of db as 0x{}, {} concurrent
snapshots, "
+ + "snapshot was {} from throttle",
+ Long.toHexString(peerLastZxid),
+ Long.toHexString(leaderLastZxid),
+ Long.toHexString(zxidToSend),
+ snapshot.getConcurrentSnapshotNumber(),
+ snapshot.isEssential() ? "exempt" : "not exempt");
+ // Dump data to peer
+ leader.zk.getZKDatabase().serializeSnapshot(oa);
+ oa.writeString("BenWasHere", "signature");
+ bufferedOutput.flush();
+ } finally {
+ snapshot.close();
+ }
}
// Start thread that blast packets in the queue to learner
@@ -580,6 +589,8 @@ public class LearnerHandler extends ZooK
}
} catch (InterruptedException e) {
LOG.error("Unexpected exception causing shutdown", e);
+ } catch (SnapshotThrottleException e) {
+ LOG.error("too many concurrent snapshots: " + e);
} finally {
LOG.warn("******* GOODBYE "
+ (sock != null ? sock.getRemoteSocketAddress() : "<null>")
Added:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshot.java
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshot.java?rev=1601516&view=auto
==============================================================================
---
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshot.java
(added)
+++
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshot.java
Mon Jun 9 22:19:02 2014
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+/**
+ * Handle for one snapshot transfer that was admitted by a
+ * {@link LearnerSnapshotThrottler}. The holder must call {@link #close()}
+ * exactly once when the transfer ends so the throttler's count is released.
+ */
+public class LearnerSnapshot {
+    /** Throttler that admitted this snapshot and is told when it ends. */
+    private final LearnerSnapshotThrottler throttler;
+    /** Number of snapshots in progress, this one included, at admission. */
+    private final int concurrentSnapshotNumber;
+    /** Whether this snapshot was admitted as essential. */
+    private final boolean essential;
+
+    /**
+     * @param throttler throttler to notify from {@link #close()}
+     * @param concurrentSnapshotNumber in-progress count recorded at admission
+     * @param essential whether the snapshot was requested as essential
+     */
+    LearnerSnapshot(LearnerSnapshotThrottler throttler,
+                    int concurrentSnapshotNumber, boolean essential) {
+        this.throttler = throttler;
+        this.concurrentSnapshotNumber = concurrentSnapshotNumber;
+        this.essential = essential;
+    }
+
+    /**
+     * @return the in-progress count recorded when this snapshot was admitted
+     */
+    public int getConcurrentSnapshotNumber() {
+        return this.concurrentSnapshotNumber;
+    }
+
+    /**
+     * @return <code>true</code> if this snapshot was requested as essential
+     */
+    public boolean isEssential() {
+        return this.essential;
+    }
+
+    /**
+     * Reports the end of this snapshot to the owning throttler.
+     */
+    public void close() {
+        this.throttler.endSnapshot();
+    }
+}
Added:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottler.java
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottler.java?rev=1601516&view=auto
==============================================================================
---
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottler.java
(added)
+++
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottler.java
Mon Jun 9 22:19:02 2014
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to limit the number of concurrent snapshots from a leader to
+ * observers and followers. {@link LearnerHandler} objects should call
+ * {@link #beginSnapshot(boolean)} before sending a snapshot and
+ * {@link #endSnapshot()} after finishing, successfully or not.
+ *
+ * All access to the in-progress counter is guarded by a private monitor, so
+ * instances may be shared between handler threads.
+ */
+public class LearnerSnapshotThrottler {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(LearnerSnapshotThrottler.class);
+
+    private static final long NANOS_PER_MILLI = 1000000L;
+
+    /** Monitor guarding {@link #snapsInProgress}; also used for wait/notify. */
+    private final Object snapCountSyncObject = new Object();
+    private int snapsInProgress;
+
+    private final int maxConcurrentSnapshots;
+    private final long timeoutMillis;
+
+    /**
+     * Constructs a new instance limiting the concurrent number of snapshots to
+     * <code>maxConcurrentSnapshots</code>.
+     * @param maxConcurrentSnapshots maximum concurrent number of snapshots
+     * @param timeoutMillis milliseconds to attempt to wait when attempting to
+     *                      begin a snapshot that would otherwise be throttled;
+     *                      a value of zero means no waiting will be attempted
+     * @throws java.lang.IllegalArgumentException when <code>timeoutMillis</code>
+     *                                            is negative or
+     *                                            <code>maxConcurrentSnapshots</code>
+     *                                            is less than 1
+     */
+    public LearnerSnapshotThrottler(int maxConcurrentSnapshots,
+                                    long timeoutMillis) {
+        if (timeoutMillis < 0) {
+            String errorMsg = "timeout cannot be negative, was " + timeoutMillis;
+            throw new IllegalArgumentException(errorMsg);
+        }
+        if (maxConcurrentSnapshots <= 0) {
+            String errorMsg = "maxConcurrentSnapshots must be positive, was " +
+                    maxConcurrentSnapshots;
+            throw new IllegalArgumentException(errorMsg);
+        }
+
+        this.maxConcurrentSnapshots = maxConcurrentSnapshots;
+        this.timeoutMillis = timeoutMillis;
+
+        synchronized (snapCountSyncObject) {
+            snapsInProgress = 0;
+        }
+    }
+
+    /**
+     * Constructs a new instance that never waits: a non-essential snapshot
+     * requested while the limit is reached is rejected immediately.
+     * @param maxConcurrentSnapshots maximum concurrent number of snapshots
+     */
+    public LearnerSnapshotThrottler(int maxConcurrentSnapshots) {
+        this(maxConcurrentSnapshots, 0);
+    }
+
+    /**
+     * Indicates that a new snapshot is about to be sent.
+     *
+     * @param essential if <code>true</code>, do not throw an exception even
+     *                  if throttling limit is reached
+     * @return a handle whose {@link LearnerSnapshot#close()} must be called
+     *         when the snapshot has finished
+     * @throws SnapshotThrottleException if throttling limit has been exceeded
+     *                                   and <code>essential == false</code>,
+     *                                   even after waiting for the timeout
+     *                                   period, if any
+     * @throws InterruptedException if thread is interrupted while trying
+     *                              to start a snapshot; cannot happen if
+     *                              timeout is zero
+     */
+    public LearnerSnapshot beginSnapshot(boolean essential)
+            throws SnapshotThrottleException, InterruptedException {
+        int snapshotNumber;
+
+        synchronized (snapCountSyncObject) {
+            if (!essential && timeoutMillis > 0) {
+                // Keep waiting while the limit is reached AND the deadline has
+                // not passed. A wake-up (notified, spurious, or because another
+                // thread took the freed slot first) must not end the wait
+                // early, and a timed-out wait must not be restarted.
+                // nanoTime is used because it is unaffected by wall-clock
+                // adjustments.
+                final long startNanos = System.nanoTime();
+                long remainingMillis = timeoutMillis;
+                while (snapsInProgress >= maxConcurrentSnapshots
+                        && remainingMillis > 0) {
+                    // remainingMillis is strictly positive here; wait(0)
+                    // would block without a timeout.
+                    snapCountSyncObject.wait(remainingMillis);
+                    long elapsedMillis =
+                        (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
+                    remainingMillis = timeoutMillis - elapsedMillis;
+                }
+            }
+
+            if (essential || snapsInProgress < maxConcurrentSnapshots) {
+                snapsInProgress++;
+                snapshotNumber = snapsInProgress;
+            } else {
+                throw new SnapshotThrottleException(snapsInProgress + 1,
+                                                    maxConcurrentSnapshots);
+            }
+        }
+
+        return new LearnerSnapshot(this, snapshotNumber, essential);
+    }
+
+    /**
+     * Indicates that a snapshot has been completed. Wakes one thread, if any,
+     * that is waiting in {@link #beginSnapshot(boolean)}.
+     */
+    public void endSnapshot() {
+        int newCount;
+        synchronized (snapCountSyncObject) {
+            snapsInProgress--;
+            newCount = snapsInProgress;
+            snapCountSyncObject.notify();
+        }
+
+        // A negative count means endSnapshot() was called more often than
+        // beginSnapshot() succeeded; report it outside the monitor.
+        if (newCount < 0) {
+            String errorMsg =
+                "endSnapshot() called incorrectly; current snapshot count is "
+                    + newCount;
+            LOG.error(errorMsg);
+        }
+    }
+}
Added:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SnapshotThrottleException.java
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SnapshotThrottleException.java?rev=1601516&view=auto
==============================================================================
---
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SnapshotThrottleException.java
(added)
+++
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SnapshotThrottleException.java
Mon Jun 9 22:19:02 2014
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+/**
+ * Signals that a {@link Leader} declined to start another snapshot because
+ * too many are already being sent to observers.
+ *
+ * @see LearnerSnapshotThrottler
+ */
+public class SnapshotThrottleException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param concurrentSnapshotNumber number of snapshots that would be in
+     *                                 progress had the new one been admitted
+     * @param throttleThreshold maximum number of concurrent snapshots allowed
+     */
+    public SnapshotThrottleException(int concurrentSnapshotNumber,
+                                     int throttleThreshold) {
+        super(String.format(
+                "new snapshot would make %d concurrently in progress; "
+                    + "maximum is %d",
+                concurrentSnapshotNumber, throttleThreshold));
+    }
+}
Added:
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java
URL:
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java?rev=1601516&view=auto
==============================================================================
---
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java
(added)
+++
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java
Mon Jun 9 22:19:02 2014
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link LearnerSnapshotThrottler}: limit enforcement, the
+ * essential-snapshot exemption, handle-based release, and behaviour under
+ * concurrent callers with and without a wait timeout.
+ */
+public class LearnerSnapshotThrottlerTest extends ZKTestCase {
+    /** The sixth non-essential snapshot against a limit of five is rejected. */
+    @Test(expected = SnapshotThrottleException.class)
+    public void testTooManySnapshotsNonessential() throws Exception {
+        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
+        for (int i = 0; i < 6; i++) {
+            throttler.beginSnapshot(false);
+        }
+    }
+
+    /**
+     * Essential snapshots may exceed the limit, but they still count: after
+     * six essential snapshots and one release, a non-essential one is rejected.
+     */
+    @Test(expected = SnapshotThrottleException.class)
+    public void testTooManySnapshotsEssential() throws Exception {
+        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
+        try {
+            for (int i = 0; i < 6; i++) {
+                throttler.beginSnapshot(true);
+            }
+        }
+        catch (SnapshotThrottleException ex) {
+            Assert.fail("essential snapshots should not be throttled");
+        }
+        throttler.endSnapshot();
+        throttler.beginSnapshot(false);
+    }
+
+    /** Once the count drops below the limit, non-essential snapshots are admitted. */
+    @Test
+    public void testNoThrottle() throws Exception {
+        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
+        try {
+            for (int i = 0; i < 6; i++) {
+                throttler.beginSnapshot(true);
+            }
+        }
+        catch (SnapshotThrottleException ex) {
+            Assert.fail("essential snapshots should not be throttled");
+        }
+        throttler.endSnapshot();
+        for (int i = 0; i < 5; i++) {
+            throttler.endSnapshot();
+            throttler.beginSnapshot(false);
+        }
+    }
+
+    /** Closing the handle releases the slot, so sequential snapshots all succeed. */
+    @Test
+    public void testTryWithResourceNoThrottle() throws Exception {
+        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1);
+        for (int i = 0; i < 3; i++) {
+            LearnerSnapshot snapshot = throttler.beginSnapshot(false);
+            try {
+                Assert.assertFalse(snapshot.isEssential());
+                Assert.assertEquals(1, snapshot.getConcurrentSnapshotNumber());
+            } finally {
+                snapshot.close();
+            }
+        }
+    }
+
+    /** A non-essential snapshot is rejected while an open handle holds the only slot. */
+    @Test(expected = SnapshotThrottleException.class)
+    public void testTryWithResourceThrottle() throws Exception {
+        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1);
+        LearnerSnapshot outer = throttler.beginSnapshot(true);
+        try {
+            LearnerSnapshot inner = throttler.beginSnapshot(false);
+            try {
+                Assert.fail("shouldn't be able to have both snapshots open");
+            } finally {
+                inner.close();
+            }
+        } finally {
+            outer.close();
+        }
+    }
+
+    /** With a limit equal to the thread count, no concurrent caller is throttled. */
+    @Test
+    public void testParallelNoThrottle() throws Exception {
+        final int numThreads = 50;
+
+        final LearnerSnapshotThrottler throttler =
+            new LearnerSnapshotThrottler(numThreads);
+        ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+        try {
+            final CountDownLatch threadStartLatch = new CountDownLatch(numThreads);
+            final CountDownLatch snapshotProgressLatch =
+                new CountDownLatch(numThreads);
+
+            List<Future<Boolean>> results =
+                new ArrayList<Future<Boolean>>(numThreads);
+            for (int i = 0; i < numThreads; i++) {
+                results.add(threadPool.submit(new Callable<Boolean>() {
+
+                    @Override
+                    public Boolean call() {
+                        threadStartLatch.countDown();
+                        try {
+                            threadStartLatch.await();
+
+                            throttler.beginSnapshot(false);
+
+                            // Hold the snapshot until every thread has one, so
+                            // all of them are in progress at the same time.
+                            snapshotProgressLatch.countDown();
+                            snapshotProgressLatch.await();
+
+                            throttler.endSnapshot();
+                        }
+                        catch (Exception e) {
+                            return false;
+                        }
+
+                        return true;
+                    }
+                }));
+            }
+
+            for (Future<Boolean> result : results) {
+                Assert.assertTrue(result.get());
+            }
+        } finally {
+            // Release the worker threads even when an assertion fails.
+            threadPool.shutdownNow();
+        }
+    }
+
+    /** A throttled caller with a timeout is admitted once the slot is released. */
+    @Test
+    public void testPositiveTimeout() throws Exception {
+        final LearnerSnapshotThrottler throttler =
+            new LearnerSnapshotThrottler(1, 200);
+        ExecutorService threadPool = Executors.newFixedThreadPool(1);
+        try {
+            LearnerSnapshot first = throttler.beginSnapshot(false);
+            final CountDownLatch snapshotProgressLatch = new CountDownLatch(1);
+
+            Future<Boolean> result = threadPool.submit(new Callable<Boolean>() {
+                @Override
+                public Boolean call() {
+                    try {
+                        snapshotProgressLatch.countDown();
+                        LearnerSnapshot second = throttler.beginSnapshot(false);
+                        second.close();
+                    }
+                    catch (Exception e) {
+                        return false;
+                    }
+
+                    return true;
+                }
+            });
+
+            snapshotProgressLatch.await();
+
+            first.close();
+
+            Assert.assertTrue(result.get());
+        } finally {
+            // Release the worker thread even when an assertion fails.
+            threadPool.shutdownNow();
+        }
+    }
+
+    /** Under contention with a timeout, no caller ever observes more than the limit. */
+    @Test
+    public void testHighContentionWithTimeout() throws Exception {
+        int numThreads = 20;
+
+        final LearnerSnapshotThrottler throttler =
+            new LearnerSnapshotThrottler(2, 200);
+        ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+        try {
+            final CountDownLatch threadStartLatch = new CountDownLatch(numThreads);
+            final CountDownLatch snapshotProgressLatch =
+                new CountDownLatch(numThreads);
+
+            List<Future<Boolean>> results =
+                new ArrayList<Future<Boolean>>(numThreads);
+            for (int i = 0; i < numThreads; i++) {
+                results.add(threadPool.submit(new Callable<Boolean>() {
+
+                    @Override
+                    public Boolean call() {
+                        threadStartLatch.countDown();
+                        try {
+                            threadStartLatch.await();
+
+                            LearnerSnapshot snap = throttler.beginSnapshot(false);
+
+                            int snapshotNumber =
+                                snap.getConcurrentSnapshotNumber();
+
+                            throttler.endSnapshot();
+
+                            return snapshotNumber <= 2;
+                        }
+                        catch (Exception e) {
+                            return false;
+                        }
+                    }
+                }));
+            }
+
+            for (Future<Boolean> result : results) {
+                Assert.assertTrue(result.get());
+            }
+        } finally {
+            // Release the worker threads even when an assertion fails.
+            threadPool.shutdownNow();
+        }
+    }
+}