On 16 December 2010 21:09, <[email protected]> wrote:
> Author: fhanik
> Date: Thu Dec 16 21:09:49 2010
> New Revision: 1050161
>
> URL: http://svn.apache.org/viewvc?rev=1050161&view=rev
> Log:
> Starting to work on maxConnections attribute for BIO/NIO connectors to allow
> administrators to throttle how accepting
> connections is handled. Implement a CounterLatch to keep track of the
> connection count while also allowing the acceptor
> thread block while the max has been reached
>
> Added:
> tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java (with
> props)
> tomcat/trunk/test/org/apache/tomcat/util/threads/
> tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java
> (with props)
> Modified:
> tomcat/trunk/java/org/apache/coyote/AbstractProtocolHandler.java
>
> Modified: tomcat/trunk/java/org/apache/coyote/AbstractProtocolHandler.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/AbstractProtocolHandler.java?rev=1050161&r1=1050160&r2=1050161&view=diff
> ==============================================================================
> --- tomcat/trunk/java/org/apache/coyote/AbstractProtocolHandler.java
> (original)
> +++ tomcat/trunk/java/org/apache/coyote/AbstractProtocolHandler.java Thu Dec
> 16 21:09:49 2010
> @@ -122,6 +122,11 @@ public abstract class AbstractProtocolHa
> public void setMaxThreads(int maxThreads) {
> endpoint.setMaxThreads(maxThreads);
> }
> +
> + public int getMaxConnections() { return endpoint.getMaxConnections(); }
> + public void setMaxConnections(int maxConnections) {
> + endpoint.setMaxConnections(maxConnections);
> + }
>
>
> public int getMinSpareThreads() { return endpoint.getMinSpareThreads(); }
>
> Added: tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java?rev=1050161&view=auto
> ==============================================================================
> --- tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java (added)
> +++ tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java Thu
> Dec 16 21:09:49 2010
> @@ -0,0 +1,172 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.tomcat.util.threads;
> +
> +import java.util.Collection;
> +import java.util.concurrent.TimeUnit;
> +import java.util.concurrent.atomic.AtomicLong;
> +import java.util.concurrent.locks.AbstractQueuedSynchronizer;
> +/**
> + * Simple counter latch that allows code to keep an up and down counter, and
> waits while the latch holds a certain wait value.
> + * and threads using the latch to wait if the count has reached a certain
> value.
> + * The counter latch can be used to keep track of an atomic counter, since
> the operations {...@link #countDown()}
> + * and {...@link #countUp()} are atomic.
> + * When the latch reaches the wait value, threads will block. The counter
> latch can hence act like a
> + * count down latch or a count up latch, while letting you keep track of the
> counter as well.
> + * This counter latch works opposite as the
> java.util.concurrent.CountDownLatch, since the CounterLatch only blocks on a
> single value and releases the threads on all other values.
> + * @author fhanik
> + * @see <a
> href="http://download.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html">CountDownLatch</a>
> + *
> + */
> +public class CounterLatch {
> +
> + private class Sync extends AbstractQueuedSynchronizer {
> + public Sync() {
> + }
> +
> + protected int tryAcquireShared(int arg) {
> + return ((!released) && count.get() == signal) ? -1 : 1;
> + }
> +
> + protected boolean tryReleaseShared(int arg) {
> + return true;
> + }
> + }
> +
> + private final Sync sync;
> + private final AtomicLong count;
> + private long signal;
This should be final to ensure safe publication across multiple threads.
> + private volatile boolean released = false;
> +
> + /**
> + * Instantiates a CounterLatch object with an initial value and a wait
> value.
> + * @param initial - initial value of the counter
> + * @param waitValue - when the counter holds this value,
> + * threads calling {...@link #await()} or {...@link #await(long,
> TimeUnit)}
> + * will wait until the counter changes value or until they are
> interrupted.
> + */
> + public CounterLatch(long initial, long waitValue) {
> + this.signal = waitValue;
> + this.count = new AtomicLong(initial);
> + this.sync = new Sync();
> + }
> +
> + /**
> + * Causes the calling thread to wait if the counter holds the waitValue.
> + * If the counter holds any other value, the thread will return
> + * If the thread is interrupted or becomes interrupted an
> InterruptedException is thrown
> + * @throws InterruptedException
> + */
> + public void await() throws InterruptedException {
> + sync.acquireSharedInterruptibly(1);
> + }
> +
> + /**
> + * Causes the calling thread to wait if the counter holds the waitValue.
> + * If the counter holds any other value, the thread will return
> + * If the thread is interrupted or becomes interrupted an
> InterruptedException is thrown
> + * @return true if the value changed, false if the timeout has elapsed
> + * @throws InterruptedException
> + */
> + public boolean await(long timeout, TimeUnit unit) throws
> InterruptedException {
> + return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
> + }
> +
> + /**
> + * Increments the counter
> + * @return the previous counter value
> + */
> + public long countUp() {
> + long previous = count.getAndIncrement();
> + if (previous == signal) {
> + sync.releaseShared(0);
> + }
> + return previous;
> + }
> +
> + /**
> + * Decrements the counter
> + * @return the previous counter value
> + */
> + public long countDown() {
> + long previous = count.getAndDecrement();
> + if (previous == signal) {
> + sync.releaseShared(0);
> + }
> + return previous;
> + }
> +
> + /**
> + * Returns the current counter value
> + * @return the current counter value
> + */
> + public long getCount() {
> + return count.get();
> + }
> +
> + /**
> + * Performs an atomic update of the counter
> + * If the operation is successful and {...@code expect==waitValue &&
> expect!=update} waiting threads will be released.
> + * @param expect - the expected counter value
> + * @param update - the new counter value
> + * @return
> + */
> + public boolean compareAndSet(long expect, long update) {
> + boolean result = count.compareAndSet(expect, update);
> + if (result && expect==signal && expect != update) {
> + sync.releaseShared(0);
> + }
> + return result;
> + }
> +
> + /**
> + * returns true if there are threads blocked by this latch
> + * @return true if there are threads blocked by this latch
> + */
> + public boolean hasQueuedThreads() {
> + return sync.hasQueuedThreads();
> + }
> +
> + /**
> + * Returns a collection of the blocked threads
> + * @return a collection of the blocked threads
> + */
> + public Collection<Thread> getQueuedThreads() {
> + return sync.getQueuedThreads();
> + }
> +
> + /**
> + * releases all waiting threads. This operation is permanent, and no
> threads will block,
> + * even if the counter hits the {...@code waitValue} until {...@link
> #reset(long)} has been called.
> + * @return
> + */
> + public boolean releaseAll() {
> + released = true;
> + return sync.releaseShared(0);
> + }
> +
> + /**
> + * Resets the latch and initializes the counter with the new value.
> + * @param value the new counter value
> + * @see {...@link #releaseAll()}
> + */
> + public void reset(long value) {
> + this.count.set(value);
> + released = false;
> + }
> +
> +}
>
> Propchange: tomcat/trunk/java/org/apache/tomcat/util/threads/CounterLatch.java
> ------------------------------------------------------------------------------
> svn:eol-style = native
>
> Added: tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java
> URL:
> http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java?rev=1050161&view=auto
> ==============================================================================
> --- tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java
> (added)
> +++ tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java
> Thu Dec 16 21:09:49 2010
> @@ -0,0 +1,124 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements. See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + * http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.tomcat.util.threads;
> +
> +import junit.framework.TestCase;
> +
> +public class TestCounterLatch extends TestCase {
> +
> + private volatile CounterLatch latch = null;
> +
> + public void setUp() {
> +
> + }
> +
> + public void tearDown() {
> + CounterLatch temp = latch;
> + if (temp!=null) temp.releaseAll();
> + latch = null;
> + }
> +
> + public void testNoThreads() throws Exception {
> + latch = new CounterLatch(0,0);
> + assertEquals("No threads should be waiting", false,
> latch.hasQueuedThreads());
> + }
> +
> + public void testOneThreadNoWait() throws Exception {
> + latch = new CounterLatch(0,1);
> + assertEquals("No threads should be waiting", false,
> latch.hasQueuedThreads());
> + Thread testThread = new Thread() {
> + public void run() {
> + try {
> + latch.await();
> + } catch (InterruptedException x) {
> + x.printStackTrace();
> + }
> + }
> + };
> + testThread.start();
> + Thread.sleep(50);
> + assertEquals("0 threads should be waiting", 0,
> latch.getQueuedThreads().size());
> + latch.countUp();
> + Thread.sleep(50);
> + assertEquals("No threads should be waiting", false,
> latch.hasQueuedThreads());
> + }
> +
> + public void testOneThreadWaitCountUp() throws Exception {
> + latch = new CounterLatch(0,1);
> + assertEquals("No threads should be waiting", false,
> latch.hasQueuedThreads());
> + Thread testThread = new Thread() {
> + public void run() {
> + try {
> + latch.await();
> + } catch (InterruptedException x) {
> + x.printStackTrace();
> + }
> + }
> + };
> + latch.countUp();
> + testThread.start();
> + Thread.sleep(50);
> + assertEquals("1 threads should be waiting", 1,
> latch.getQueuedThreads().size());
> + latch.countUp();
> + Thread.sleep(50);
> + assertEquals("No threads should be waiting", false,
> latch.hasQueuedThreads());
> + }
> +
> + public void testOneThreadWaitCountDown() throws Exception {
> + latch = new CounterLatch(1,0);
> + assertEquals("No threads should be waiting", false,
> latch.hasQueuedThreads());
> + Thread testThread = new Thread() {
> + public void run() {
> + try {
> + //System.out.println("Entering
> ["+Thread.currentThread().getName()+"]");
> + latch.await();
> + } catch (InterruptedException x) {
> + x.printStackTrace();
> + }
> + //System.out.println("Exiting
> ["+Thread.currentThread().getName()+"]");
> + }
> + };
> + latch.countDown();
> + testThread.start();
> + Thread.sleep(50);
> + assertEquals("1 threads should be waiting", 1,
> latch.getQueuedThreads().size());
> + latch.countDown();
> + Thread.sleep(50);
> + assertEquals("No threads should be waiting", false,
> latch.hasQueuedThreads());
> + }
> +
> + public void testOneRelease() throws Exception {
> + latch = new CounterLatch(1,0);
> + assertEquals("No threads should be waiting", false,
> latch.hasQueuedThreads());
> + Thread testThread = new Thread() {
> + public void run() {
> + try {
> + latch.await();
> + } catch (InterruptedException x) {
> + x.printStackTrace();
> + }
> + }
> + };
> + latch.countDown();
> + testThread.start();
> + Thread.sleep(50);
> + assertEquals("1 threads should be waiting", 1,
> latch.getQueuedThreads().size());
> + latch.releaseAll();
> + Thread.sleep(50);
> + assertEquals("No threads should be waiting", false,
> latch.hasQueuedThreads());
> + }
> +}
>
> Propchange:
> tomcat/trunk/test/org/apache/tomcat/util/threads/TestCounterLatch.java
> ------------------------------------------------------------------------------
> svn:eol-style = native
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
>
>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]