Author: fhanik Date: Thu Dec 16 21:09:49 2010 New Revision: 1050161 URL: http://svn.apache.org/viewvc?rev=1050161&view=rev Log: Starting to work on the maxConnections attribute for BIO/NIO connectors to allow administrators to throttle how incoming connections are accepted. Implement a CounterLatch to keep track of the connection count while also allowing the acceptor thread to block while the max has been reached.
Added: tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java (with props) tomcat/trunk/test/org/apache/tomcat/util/threads/ tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java (with props) Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocolHandler.java Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocolHandler.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocolHandler.java?rev=1050161&r1=1050160&r2=1050161&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/coyote/AbstractProtocolHandler.java (original) +++ tomcat/trunk/java/org/apache/coyote/AbstractProtocolHandler.java Thu Dec 16 21:09:49 2010 @@ -122,6 +122,11 @@ public abstract class AbstractProtocolHa public void setMaxThreads(int maxThreads) { endpoint.setMaxThreads(maxThreads); } + + public int getMaxConnections() { return endpoint.getMaxConnections(); } + public void setMaxConnections(int maxConnections) { + endpoint.setMaxConnections(maxConnections); + } public int getMinSpareThreads() { return endpoint.getMinSpareThreads(); } Added: tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java?rev=1050161&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java (added) +++ tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java Thu Dec 16 21:09:49 2010 @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.threads; + +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; +/** + * Simple counter latch that allows code to keep an up and down counter, + * and makes threads using the latch wait while the counter holds a certain wait value. + * The counter latch can be used to keep track of an atomic counter, since the operations {@link #countDown()} + * and {@link #countUp()} are atomic. + * When the latch reaches the wait value, threads will block. The counter latch can hence act like a + * count down latch or a count up latch, while letting you keep track of the counter as well. + * This counter latch works opposite to the java.util.concurrent.CountDownLatch, since the CounterLatch only blocks on a single value and releases the threads on all other values. + * @author fhanik + * @see <a href="http://download.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html">CountDownLatch</a> + * + */ +public class CounterLatch { + + private class Sync extends AbstractQueuedSynchronizer { + public Sync() { + } + + protected int tryAcquireShared(int arg) { + return ((!released) && count.get() == signal) ? 
-1 : 1; + } + + protected boolean tryReleaseShared(int arg) { + return true; + } + } + + private final Sync sync; + private final AtomicLong count; + private long signal; + private volatile boolean released = false; + + /** + * Instantiates a CounterLatch object with an initial value and a wait value. + * @param initial - initial value of the counter + * @param waitValue - when the counter holds this value, + * threads calling {@link #await()} or {@link #await(long, TimeUnit)} + * will wait until the counter changes value or until they are interrupted. + */ + public CounterLatch(long initial, long waitValue) { + this.signal = waitValue; + this.count = new AtomicLong(initial); + this.sync = new Sync(); + } + + /** + * Causes the calling thread to wait if the counter holds the waitValue. + * If the counter holds any other value, the thread will return. + * If the thread is interrupted or becomes interrupted, an InterruptedException is thrown. + * @throws InterruptedException + */ + public void await() throws InterruptedException { + sync.acquireSharedInterruptibly(1); + } + + /** + * Causes the calling thread to wait if the counter holds the waitValue. 
+ * If the counter holds any other value, the thread will return. + * If the thread is interrupted or becomes interrupted, an InterruptedException is thrown. + * @return true if the value changed, false if the timeout has elapsed + * @throws InterruptedException + */ + public boolean await(long timeout, TimeUnit unit) throws InterruptedException { + return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); + } + + /** + * Increments the counter + * @return the previous counter value + */ + public long countUp() { + long previous = count.getAndIncrement(); + if (previous == signal) { + sync.releaseShared(0); + } + return previous; + } + + /** + * Decrements the counter + * @return the previous counter value + */ + public long countDown() { + long previous = count.getAndDecrement(); + if (previous == signal) { + sync.releaseShared(0); + } + return previous; + } + + /** + * Returns the current counter value + * @return the current counter value + */ + public long getCount() { + return count.get(); + } + + /** + * Performs an atomic update of the counter. + * If the operation is successful and {@code expect==waitValue && expect!=update}, waiting threads will be released. + * @param expect - the expected counter value + * @param update - the new counter value + * @return true if the counter held the expected value and was updated + */ + public boolean compareAndSet(long expect, long update) { + boolean result = count.compareAndSet(expect, update); + if (result && expect==signal && expect != update) { + sync.releaseShared(0); + } + return result; + } + + /** + * returns true if there are threads blocked by this latch + * @return true if there are threads blocked by this latch + */ + public boolean hasQueuedThreads() { + return sync.hasQueuedThreads(); + } + + /** + * Returns a collection of the blocked threads + * @return a collection of the blocked threads + */ + public Collection<Thread> getQueuedThreads() { + return sync.getQueuedThreads(); + } + + /** + * releases all waiting threads. 
This operation is permanent, and no threads will block, + * even if the counter hits the {@code waitValue} until {@link #reset(long)} has been called. + * @return + */ + public boolean releaseAll() { + released = true; + return sync.releaseShared(0); + } + + /** + * Resets the latch and initializes the counter with the new value. + * @param value the new counter value + * @see {@link #releaseAll()} + */ + public void reset(long value) { + this.count.set(value); + released = false; + } + +} Propchange: tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java ------------------------------------------------------------------------------ svn:eol-style = native Added: tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java?rev=1050161&view=auto ============================================================================== --- tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java (added) +++ tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java Thu Dec 16 21:09:49 2010 @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tomcat.util.threads; + +import junit.framework.TestCase; + +public class TestCounterLatch extends TestCase { + + private volatile CounterLatch latch = null; + + public void setUp() { + + } + + public void tearDown() { + CounterLatch temp = latch; + if (temp!=null) temp.releaseAll(); + latch = null; + } + + public void testNoThreads() throws Exception { + latch = new CounterLatch(0,0); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + } + + public void testOneThreadNoWait() throws Exception { + latch = new CounterLatch(0,1); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + Thread testThread = new Thread() { + public void run() { + try { + latch.await(); + } catch (InterruptedException x) { + x.printStackTrace(); + } + } + }; + testThread.start(); + Thread.sleep(50); + assertEquals("0 threads should be waiting", 0, latch.getQueuedThreads().size()); + latch.countUp(); + Thread.sleep(50); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + } + + public void testOneThreadWaitCountUp() throws Exception { + latch = new CounterLatch(0,1); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + Thread testThread = new Thread() { + public void run() { + try { + latch.await(); + } catch (InterruptedException x) { + x.printStackTrace(); + } + } + }; + latch.countUp(); + testThread.start(); + Thread.sleep(50); + assertEquals("1 threads should be waiting", 1, latch.getQueuedThreads().size()); + latch.countUp(); + Thread.sleep(50); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + } + + public void testOneThreadWaitCountDown() throws Exception { + latch = new CounterLatch(1,0); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + Thread testThread = new Thread() { + public void run() { + try { + //System.out.println("Entering ["+Thread.currentThread().getName()+"]"); + 
latch.await(); + } catch (InterruptedException x) { + x.printStackTrace(); + } + //System.out.println("Exiting ["+Thread.currentThread().getName()+"]"); + } + }; + latch.countDown(); + testThread.start(); + Thread.sleep(50); + assertEquals("1 threads should be waiting", 1, latch.getQueuedThreads().size()); + latch.countDown(); + Thread.sleep(50); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + } + + public void testOneRelease() throws Exception { + latch = new CounterLatch(1,0); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + Thread testThread = new Thread() { + public void run() { + try { + latch.await(); + } catch (InterruptedException x) { + x.printStackTrace(); + } + } + }; + latch.countDown(); + testThread.start(); + Thread.sleep(50); + assertEquals("1 threads should be waiting", 1, latch.getQueuedThreads().size()); + latch.releaseAll(); + Thread.sleep(50); + assertEquals("No threads should be waiting", false, latch.hasQueuedThreads()); + } +} Propchange: tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java ------------------------------------------------------------------------------ svn:eol-style = native --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org