Author: fpj
Date: Mon Jun  9 22:19:02 2014
New Revision: 1601516

URL: http://svn.apache.org/r1601516
Log:
ZOOKEEPER-1928. add configurable throttling to the number of snapshots 
concurrently sent by a leader (Edward Carter via fpj)


Added:
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshot.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottler.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SnapshotThrottleException.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java

Modified: zookeeper/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1601516&r1=1601515&r2=1601516&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Mon Jun  9 22:19:02 2014
@@ -29,6 +29,9 @@ NEW FEATURES:
   ZOOKEEPER-1887. C implementation of removeWatches (Raul Gutierrez Segales via
   michim)
 
+  ZOOKEEPER-1928. add configurable throttling to the number of snapshots
+  concurrently sent by a leader (Edward Carter via fpj)
+
 BUGFIXES:
 
   ZOOKEEPER-786. Exception in ZooKeeper.toString
@@ -729,7 +732,7 @@ IMPROVEMENTS:
   ZOOKEEPER-756. some cleanup and improvements for zooinspector
   (Colin Goodheart-Smithe & Thomas Koch via phunt)
 
-  ZOOKEEPER-1292. FLETest is flaky (flp via breed)
+  ZOOKEEPER-1292. FLETest is flaky (fpj via breed)
 
   ZOOKEEPER-1326. The CLI commands "delete" and "rmr" are confusing.
   Can we have "rm" + "rmr" instead? (Harsh J via phunt)
@@ -923,6 +926,9 @@ IMPROVEMENTS:
 
   ZOOKEEPER-1659. Add JMX support for dynamic reconfiguration (Rakesh R via
   michim)
+  
+  ZOOKEEPER-1928. add configurable throttling to the number of snapshots
+  concurrently sent by a leader (Edward Carter via fpj)
 
 headers
 

Modified: 
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: 
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1601516&r1=1601515&r2=1601516&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Mon Jun  9 22:19:02 2014
@@ -72,6 +72,21 @@ public class Leader {
         }
     }
 
+    // Throttle when there are too many concurrent snapshots being sent to observers
+    private static final String MAX_CONCURRENT_SNAPSHOTS = 
"zookeeper.leader.maxConcurrentSnapshots";
+    private static final int maxConcurrentSnapshots;
+    private static final String MAX_CONCURRENT_SNAPSHOT_TIMEOUT = 
"zookeeper.leader.maxConcurrentSnapshotTimeout";
+    private static final long maxConcurrentSnapshotTimeout;
+    static {
+        maxConcurrentSnapshots = Integer.getInteger(MAX_CONCURRENT_SNAPSHOTS, 
10);
+        LOG.info(MAX_CONCURRENT_SNAPSHOTS + " = " + maxConcurrentSnapshots);
+        maxConcurrentSnapshotTimeout = 
Long.getLong(MAX_CONCURRENT_SNAPSHOT_TIMEOUT, 5);
+        LOG.info(MAX_CONCURRENT_SNAPSHOT_TIMEOUT + " = " + 
maxConcurrentSnapshotTimeout);
+    }
+
+    private final LearnerSnapshotThrottler learnerSnapshotThrottler = 
+        new LearnerSnapshotThrottler(maxConcurrentSnapshots, 
maxConcurrentSnapshotTimeout);
+
     final LeaderZooKeeperServer zk;
 
     final QuorumPeer self;
@@ -1051,6 +1066,10 @@ public class Leader {
         }
         return p;
     }
+    
+    public LearnerSnapshotThrottler getLearnerSnapshotThrottler() {
+        return learnerSnapshotThrottler;
+    }
 
     /**
      * Process sync requests

Modified: 
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: 
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1601516&r1=1601515&r2=1601516&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Mon Jun  9 22:19:02 2014
@@ -423,20 +423,29 @@ public class LearnerHandler extends ZooK
 
             /* if we are not truncating or sending a diff just send a snapshot */
             if (needSnap) {
-                long zxidToSend = 
leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
-                oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, 
null), "packet");
-                bufferedOutput.flush();
-
-                LOG.info("Sending snapshot last zxid of peer is 0x"
-                        + Long.toHexString(peerLastZxid) + " "
-                        + "zxid of leader is 0x"
-                        + Long.toHexString(leaderLastZxid) + " "
-                        + "sent zxid of db as 0x"
-                        + Long.toHexString(zxidToSend));
-                // Dump data to peer
-                leader.zk.getZKDatabase().serializeSnapshot(oa);
-                oa.writeString("BenWasHere", "signature");
-                bufferedOutput.flush();
+                boolean exemptFromThrottle = getLearnerType() != 
LearnerType.OBSERVER;
+                LearnerSnapshot snapshot = 
+                        
leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
+                try {
+                    long zxidToSend = 
leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
+                    oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, 
null, null), "packet");
+                    bufferedOutput.flush();
+
+                    LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid 
of leader is 0x{}, "
+                            + "send zxid of db as 0x{}, {} concurrent 
snapshots, " 
+                            + "snapshot was {} from throttle",
+                            Long.toHexString(peerLastZxid), 
+                            Long.toHexString(leaderLastZxid),
+                            Long.toHexString(zxidToSend), 
+                            snapshot.getConcurrentSnapshotNumber(),
+                            snapshot.isEssential() ? "exempt" : "not exempt");
+                    // Dump data to peer
+                    leader.zk.getZKDatabase().serializeSnapshot(oa);
+                    oa.writeString("BenWasHere", "signature");
+                    bufferedOutput.flush();
+                } finally {
+                    snapshot.close();
+                }
             }
 
             // Start thread that blast packets in the queue to learner
@@ -580,6 +589,8 @@ public class LearnerHandler extends ZooK
             }
         } catch (InterruptedException e) {
             LOG.error("Unexpected exception causing shutdown", e);
+        } catch (SnapshotThrottleException e) {
+            LOG.error("too many concurrent snapshots: " + e);
         } finally {
             LOG.warn("******* GOODBYE "
                     + (sock != null ? sock.getRemoteSocketAddress() : "<null>")

Added: 
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshot.java
URL: 
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshot.java?rev=1601516&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshot.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshot.java Mon Jun  9 22:19:02 2014
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+public class LearnerSnapshot {
+    private final LearnerSnapshotThrottler throttler;
+    private final int concurrentSnapshotNumber;
+    private final boolean essential;
+
+    LearnerSnapshot(LearnerSnapshotThrottler throttler, 
+            int concurrentSnapshotNumber, boolean essential) {
+        this.throttler = throttler;
+        this.concurrentSnapshotNumber = concurrentSnapshotNumber;
+        this.essential = essential;
+    }
+
+    public void close() {
+        throttler.endSnapshot();
+    }
+
+    public int getConcurrentSnapshotNumber() {
+        return concurrentSnapshotNumber;
+    }
+    
+    public boolean isEssential() {
+        return essential;
+    }
+}

Added: 
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottler.java
URL: 
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottler.java?rev=1601516&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottler.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottler.java Mon Jun  9 22:19:02 2014
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to limit the number of concurrent snapshots from a leader to
+ * observers and followers.  {@link LearnerHandler} objects should call
+ * {@link #beginSnapshot(boolean)} before sending a snapshot and
+ * {@link #endSnapshot()} after finishing, successfully or not.
+ *
+ */
+public class LearnerSnapshotThrottler {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(LearnerSnapshotThrottler.class);
+
+    private final Object snapCountSyncObject = new Object();
+    private int snapsInProgress;
+
+    private final int maxConcurrentSnapshots;
+    private final long timeoutMillis;
+
+    /**
+     * Constructs a new instance limiting the concurrent number of snapshots to
+     * <code>maxConcurrentSnapshots</code>.
+     * @param maxConcurrentSnapshots maximum concurrent number of snapshots
+     * @param timeoutMillis milliseconds to attempt to wait when attempting to
+     *                      begin a snapshot that would otherwise be throttled;
+     *                      a value of zero means no waiting will be attempted
+     * @throws java.lang.IllegalArgumentException when 
<code>timeoutMillis</code>
+     *                                            is negative or
+     *                                            
<code>maxConcurrentSnaphots</code>
+     *                                            is less than 1
+     */
+    public LearnerSnapshotThrottler(int maxConcurrentSnapshots,
+                                    long timeoutMillis) {
+        if (timeoutMillis < 0) {
+            String errorMsg = "timeout cannot be negative, was " + 
timeoutMillis;
+            throw new IllegalArgumentException(errorMsg);
+        }
+        if (maxConcurrentSnapshots <= 0) {
+            String errorMsg = "maxConcurrentSnapshots must be positive, was " +
+                    maxConcurrentSnapshots;
+            throw new IllegalArgumentException(errorMsg);
+        }
+
+        this.maxConcurrentSnapshots = maxConcurrentSnapshots;
+        this.timeoutMillis = timeoutMillis;
+
+        synchronized (snapCountSyncObject) {
+            snapsInProgress = 0;
+        }
+    }
+
+    public LearnerSnapshotThrottler(int maxConcurrentSnapshots) {
+        this(maxConcurrentSnapshots, 0);
+    }
+
+    /**
+     * Indicates that a new snapshot is about to be sent.
+     * 
+     * @param essential if <code>true</code>, do not throw an exception even
+     *                  if throttling limit is reached
+     * @throws SnapshotThrottleException if throttling limit has been exceeded
+     *                                   and <code>essential == false</code>,
+     *                                   even after waiting for the timeout
+     *                                   period, if any
+     * @throws InterruptedException if thread is interrupted while trying
+     *                              to start a snapshot; cannot happen if
+     *                              timeout is zero
+     */
+    public LearnerSnapshot beginSnapshot(boolean essential)
+            throws SnapshotThrottleException, InterruptedException {
+        int snapshotNumber;
+
+        synchronized (snapCountSyncObject) {
+            if (!essential
+                && timeoutMillis > 0
+                && snapsInProgress >= maxConcurrentSnapshots) {
+                long timestamp = System.currentTimeMillis();
+                do {
+                    snapCountSyncObject.wait(timeoutMillis);
+                } while (snapsInProgress >= maxConcurrentSnapshots
+                         && timestamp + timeoutMillis < 
System.currentTimeMillis());
+            }
+
+            if (essential || snapsInProgress < maxConcurrentSnapshots) {
+                snapsInProgress++;
+                snapshotNumber = snapsInProgress;
+            } else {
+                throw new SnapshotThrottleException(snapsInProgress + 1,
+                                                    maxConcurrentSnapshots);
+            }
+        }
+
+        return new LearnerSnapshot(this, snapshotNumber, essential);
+    }
+
+    /**
+     * Indicates that a snapshot has been completed.
+     */
+    public void endSnapshot() {
+        int newCount;
+        synchronized (snapCountSyncObject) {
+            snapsInProgress--;
+            newCount = snapsInProgress;
+            snapCountSyncObject.notify();
+        }
+
+        if (newCount < 0) {
+            String errorMsg =
+                    "endSnapshot() called incorrectly; current snapshot count 
is "
+                            + newCount;
+            LOG.error(errorMsg);
+        }
+    }
+}

Added: 
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SnapshotThrottleException.java
URL: 
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SnapshotThrottleException.java?rev=1601516&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SnapshotThrottleException.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/SnapshotThrottleException.java Mon Jun  9 22:19:02 2014
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+/**
+ * Thrown when a {@link Leader} has too many concurrent snapshots being sent
+ * to observers.
+ * 
+ * @see LearnerSnapshotThrottler
+ *
+ */
+public class SnapshotThrottleException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public SnapshotThrottleException(int concurrentSnapshotNumber, int 
throttleThreshold) {
+        super(getMessage(concurrentSnapshotNumber, throttleThreshold));
+    }
+
+    private static String getMessage(int concurrentSnapshotNumber, int 
throttleThreshold) {
+        return String.format("new snapshot would make %d concurrently in 
progress; " +
+                "maximum is %d", concurrentSnapshotNumber, throttleThreshold);
+    }
+}

Added: 
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java
URL: 
http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java?rev=1601516&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerSnapshotThrottlerTest.java Mon Jun  9 22:19:02 2014
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class LearnerSnapshotThrottlerTest extends ZKTestCase {
+    @Test(expected = SnapshotThrottleException.class)
+    public void testTooManySnapshotsNonessential() throws Exception {
+        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
+        for (int i = 0; i < 6; i++) {
+            throttler.beginSnapshot(false);
+        }
+    }
+
+    @Test(expected = SnapshotThrottleException.class)
+    public void testTooManySnapshotsEssential() throws Exception {
+        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
+        try {
+            for (int i = 0; i < 6; i++) {
+                throttler.beginSnapshot(true);
+            }
+        }
+        catch (SnapshotThrottleException ex) {
+            Assert.fail("essential snapshots should not be throttled");
+        }
+        throttler.endSnapshot();
+        throttler.beginSnapshot(false);
+    }
+
+    @Test
+    public void testNoThrottle() throws Exception {
+        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(5);
+        try {
+            for (int i = 0; i < 6; i++) {
+                throttler.beginSnapshot(true);
+            }
+        }
+        catch (SnapshotThrottleException ex) {
+            Assert.fail("essential snapshots should not be throttled");
+        }
+        throttler.endSnapshot();
+        for (int i = 0; i < 5; i++) {
+            throttler.endSnapshot();
+            throttler.beginSnapshot(false);
+        }
+    }
+
+    @Test
+    public void testTryWithResourceNoThrottle() throws Exception {
+        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1);
+        for (int i = 0; i < 3; i++) {
+            LearnerSnapshot snapshot = throttler.beginSnapshot(false);
+            try {
+                Assert.assertFalse(snapshot.isEssential());
+                Assert.assertEquals(1, snapshot.getConcurrentSnapshotNumber());
+            } finally {
+                snapshot.close();
+            }
+        }
+    }
+
+    @Test(expected = SnapshotThrottleException.class)
+    public void testTryWithResourceThrottle() throws Exception {
+        LearnerSnapshotThrottler throttler = new LearnerSnapshotThrottler(1);
+        LearnerSnapshot outer = throttler.beginSnapshot(true);
+        try {
+            LearnerSnapshot inner = throttler.beginSnapshot(false);
+            try {
+                Assert.fail("shouldn't be able to have both snapshots open");
+            } finally {
+                inner.close();
+            }
+        } finally {
+            outer.close();
+        }
+    }
+
+    @Test
+    public void testParallelNoThrottle() throws Exception {
+        final int numThreads = 50;
+
+        final LearnerSnapshotThrottler throttler = new 
LearnerSnapshotThrottler(numThreads);
+        ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+        final CountDownLatch threadStartLatch = new CountDownLatch(numThreads);
+        final CountDownLatch snapshotProgressLatch = new 
CountDownLatch(numThreads);
+
+        List<Future<Boolean>> results = new 
ArrayList<Future<Boolean>>(numThreads);
+        for (int i = 0; i < numThreads; i++) {
+            results.add(threadPool.submit(new Callable<Boolean>() {
+
+                @Override
+                public Boolean call() {
+                    threadStartLatch.countDown();
+                    try {
+                        threadStartLatch.await();
+
+                        throttler.beginSnapshot(false);
+
+                        snapshotProgressLatch.countDown();
+                        snapshotProgressLatch.await();
+
+                        throttler.endSnapshot();
+                    }
+                    catch (Exception e) {
+                        return false;
+                    }
+
+                    return true;
+                }
+            }));
+        }
+
+        for (Future<Boolean> result : results) {
+            Assert.assertTrue(result.get());
+        }
+    }
+
+    @Test
+    public void testPositiveTimeout() throws Exception {
+        final LearnerSnapshotThrottler throttler = new 
LearnerSnapshotThrottler(1, 200);
+        ExecutorService threadPool = Executors.newFixedThreadPool(1);
+
+        LearnerSnapshot first = throttler.beginSnapshot(false);
+        final CountDownLatch snapshotProgressLatch = new CountDownLatch(1);
+
+        Future<Boolean> result = threadPool.submit(new Callable<Boolean>() {
+            @Override
+            public Boolean call() {
+                try {
+                    snapshotProgressLatch.countDown();
+                    LearnerSnapshot second = throttler.beginSnapshot(false);
+                    second.close();
+                }
+                catch (Exception e) {
+                    return false;
+                }
+
+                return true;
+            }
+        });
+
+        snapshotProgressLatch.await();
+
+        first.close();
+
+        Assert.assertTrue(result.get());
+    }
+
+    @Test
+    public void testHighContentionWithTimeout() throws Exception {
+        int numThreads = 20;
+
+        final LearnerSnapshotThrottler throttler = new 
LearnerSnapshotThrottler(2, 200);
+        ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
+        final CountDownLatch threadStartLatch = new CountDownLatch(numThreads);
+        final CountDownLatch snapshotProgressLatch = new 
CountDownLatch(numThreads);
+
+        List<Future<Boolean>> results = new 
ArrayList<Future<Boolean>>(numThreads);
+        for (int i = 0; i < numThreads; i++) {
+            results.add(threadPool.submit(new Callable<Boolean>() {
+
+                @Override
+                public Boolean call() {
+                    threadStartLatch.countDown();
+                    try {
+                        threadStartLatch.await();
+
+                        LearnerSnapshot snap = throttler.beginSnapshot(false);
+
+                        int snapshotNumber = 
snap.getConcurrentSnapshotNumber();
+
+                        throttler.endSnapshot();
+
+                        return snapshotNumber <= 2;
+                    }
+                    catch (Exception e) {
+                        return false;
+                    }
+                }
+            }));
+        }
+
+        for (Future<Boolean> result : results) {
+            Assert.assertTrue(result.get());
+        }
+    }
+}


Reply via email to