Author: dblevins
Date: Thu Feb 18 07:43:32 2010
New Revision: 911271

URL: http://svn.apache.org/viewvc?rev=911271&view=rev
Log:
Potential replacement for Stateless pooling code.  Seems to have an issue when 
plugged in.

Added:
    
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
   (with props)
    
openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java
   (with props)

Added: 
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java?rev=911271&view=auto
==============================================================================
--- 
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
 (added)
+++ 
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
 Thu Feb 18 07:43:32 2010
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.util;
+
+import java.lang.ref.SoftReference;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Any successful pop() call requires a corresponding push() or discard() call.
+ * <p/>
+ * A pop() call that returns null is considered successful.  A null indicates
+ * that the calling code has a permit to create a poolable object and call
+ * {...@link Pool#push(Object)}.  This is the only situation in which that 
method
+ * may be called.
+ * <p/>
+ * To simply fill the pool without a corresponding pop(), the add() method
+ * must be used.  This method will attempt to aquire a permit to add to the 
pool.
+ *
+ * @version $Rev$ $Date$
+ */
+public class Pool<T> {
+
+    private final LinkedList<Entry> pool = new LinkedList<Entry>();
+    private final Semaphore maxPolicy;
+    private final Semaphore minPolicy;
+    private final int max;
+
+    public Pool(int max, int min, boolean strict) {
+        this.max = max;
+        this.minPolicy = new Semaphore(min);
+        if (strict) {
+            this.maxPolicy = new Semaphore(max);
+        } else {
+            this.maxPolicy = null;
+        }
+    }
+
+    /**
+     * Any successful pop() call requires a corresponding push() or discard() 
call
+     * <p/>
+     * A pop() call that returns null is considered successful.
+     *
+     * @param timeout
+     * @param unit
+     * @return
+     * @throws InterruptedException
+     * @throws IllegalStateException if a permit could not be acquired
+     */
+    public Entry<T> pop(long timeout, TimeUnit unit) throws 
InterruptedException, TimeoutException {
+        if (maxPolicy != null) {
+            if (!maxPolicy.tryAcquire(timeout, unit)) throw new 
TimeoutException("Waited " + timeout + " " + unit);
+        }
+
+        Entry<T> entry = null;
+        while (entry == null) {
+
+            synchronized (pool) {
+                try {
+                    entry = pool.removeFirst();
+                } catch (NoSuchElementException e) {
+                    return null;
+                }
+            }
+
+            final T obj = entry.soft.get();
+
+            if (obj != null) {
+
+                final boolean notBusy = entry.active.compareAndSet(null, obj);
+
+                if (notBusy) return entry;
+            }
+
+            entry = null;
+        }
+
+        return null;
+    }
+
+    /**
+     * Attempt to aquire a permit to add the object to the pool.
+     *
+     * @param obj
+     * @return true of the item as added
+     */
+    public boolean add(T obj) {
+        return (maxPolicy == null || maxPolicy.tryAcquire()) && push(obj);
+    }
+
+    /**
+     * Never call this method without having successfully called
+     * {...@link #pop(long, java.util.concurrent.TimeUnit)} beforehand.
+     * <p/>
+     * Failure to do so will increase the max pool size by one.
+     *
+     * @param obj
+     * @return
+     */
+    public boolean push(T obj) {
+        return push(new Entry<T>(obj));
+    }
+
+    /**
+     * Never call this method without having successfully called
+     * {...@link #pop(long, java.util.concurrent.TimeUnit)} beforehand.
+     * <p/>
+     * Failure to do so will increase the max pool size by one.
+     *
+     * @param entry
+     * @return true of the item as added
+     */
+    public boolean push(Entry<T> entry) {
+
+        boolean added = false;
+
+        if (entry != null) {
+            final T obj = entry.active.getAndSet(null);
+
+            if (entry.hard.get() == null && minPolicy.tryAcquire()) {
+                entry.hard.set(obj);
+                synchronized (pool) {
+                    if (pool.size() < max) {
+                        pool.addFirst(entry);
+                        added = true;
+                    }
+                }
+                if (!added) {
+                    minPolicy.release();
+                }
+            } else {
+                synchronized (pool) {
+                    if (pool.size() < max) {
+                        pool.addLast(entry);
+                        added = true;
+                    }
+                }
+            }
+        }
+
+        if (maxPolicy != null) maxPolicy.release();
+
+        return added;
+    }
+
+    /**
+     * Used when a call to pop() was made that returned null
+     * indicating that the caller has a permit to create an
+     * object for this pool, but the caller will not be exercising
+     * that permit and wishes intstead to return "null" to the pool.
+     */
+    public void discard() {
+        discard(null);
+    }
+
+    public void discard(Entry<T> entry) {
+        if (entry != null) {
+            final T obj = entry.active.getAndSet(null);
+
+            if (entry.hard.compareAndSet(obj, null)) {
+                minPolicy.release();
+            }
+        }
+
+        if (maxPolicy != null) maxPolicy.release();
+    }
+
+    public static class Entry<T> {
+        private final SoftReference<T> soft;
+        private final AtomicReference<T> hard = new AtomicReference<T>();
+
+        // Added this so the soft reference isn't collected
+        // after the Entry instance is returned from a "pop" method
+        // Also acts as an "inUse" boolean
+        private final AtomicReference<T> active = new AtomicReference<T>();
+
+        /**
+         * Constructor is private so that it is impossible for an Entry object
+         * to exist without there being a corresponding permit issued for the
+         * object wrapped by this Entry.
+         * <p/>
+         * This helps ensure that when an Entry is returned to the pool it is
+         * always safe to call {...@link Semaphore#release()} which increases 
the
+         * permit size by one.
+         *
+         * @param obj
+         */
+        private Entry(T obj) {
+            this.soft = new SoftReference<T>(obj);
+            this.active.set(obj);
+        }
+
+        public T get() {
+            return active.get();
+        }
+
+        /**
+         * Largely for testing purposes
+         *
+         * @return
+         */
+        public boolean hasHardReference() {
+            return hard.get() != null;
+        }
+    }
+}

Propchange: 
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java
URL: 
http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java?rev=911271&view=auto
==============================================================================
--- 
openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java
 (added)
+++ 
openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java
 Thu Feb 18 07:43:32 2010
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.util;
+
+import junit.framework.TestCase;
+
+import javax.ejb.Remote;
+import javax.ejb.Stateless;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class PoolTest extends TestCase {
+
+    public void testStrictBasics() throws Exception {
+        exerciseStrictPool(1, 0);
+        exerciseStrictPool(3, 0);
+        exerciseStrictPool(4, 2);
+        exerciseStrictPool(5, 5);
+    }
+
+    public void testEmptyPool() throws Exception {
+        final int max = 4;
+        final int min = 2;
+        final Pool<String> pool = new Pool<String>(max, min, true);
+
+        final List<Pool.Entry<String>> entries = drain(pool);
+
+        // Should have received "max" number of nulls
+        checkMax(max, entries);
+
+        // All entries should be null
+        for (Pool.Entry<String> entry : entries) {
+            assertNull(entry);
+        }
+
+        // Pool is drained and no permits are available
+        // Test that an add does not work
+        assertFalse(pool.add("extra"));
+
+        // Fill the pool via push
+        for (int i = 0; i < max; i++) {
+            pool.push("a" + System.currentTimeMillis());
+        }
+
+        // Drain, check, then discard all
+        drainCheckPush(max, min, pool);
+        drainCheckPush(max, min, pool);
+        drainCheckPush(max, min, pool);
+
+        discard(pool, drain(pool));
+
+
+        // Fill the pool one item at a time, check it's integrity
+        for (int i = 1; i <= max; i++) {
+            assertTrue("i=" + i + ", max=" + max, pool.add("a" + i));
+
+            List<Pool.Entry<String>> list = drain(pool);
+            checkMax(max, list);
+            checkMin(Math.min(i, min), list);
+            checkEntries(i, list);
+            push(pool, list);
+        }
+
+    }
+
+    private void drainCheckPush(int max, int min, Pool<String> pool) throws 
InterruptedException {
+        final List<Pool.Entry<String>> list = drain(pool);
+        checkMax(max, list);
+        checkMin(min, list);
+        push(pool, list);
+    }
+
+    private void discard(Pool<String> pool, List<Pool.Entry<String>> list) {
+        for (Pool.Entry<String> entry : list) {
+            pool.discard(entry);
+        }
+    }
+
+    private void push(Pool<String> pool, List<Pool.Entry<String>> list) {
+        for (Pool.Entry<String> entry : list) {
+            pool.push(entry);
+        }
+    }
+
+    private void exerciseStrictPool(int max, int min) throws 
InterruptedException {
+        Pool<String> pool = new Pool<String>(max, min, true);
+
+        // Fill the pool
+        for (int i = 0; i < max; i++) {
+            assertTrue(pool.add("a" + i));
+        }
+
+        // Add one past the max
+        assertFalse(pool.add("extra"));
+
+        // Check the contents of the pool
+        final List<Pool.Entry<String>> entries = drain(pool);
+
+        checkMax(max, entries);
+        checkMin(min, entries);
+
+        // Push one back and check pool
+        pool.push(entries.remove(0));
+        final List<Pool.Entry<String>> entries2 = drain(pool);
+        assertEquals(max, entries2.size() + entries.size());
+
+
+        // discard all instances and add new ones
+        entries.addAll(entries2);
+        entries2.clear();
+        final Iterator<Pool.Entry<String>> iterator = entries.iterator();
+        while (iterator.hasNext()) {
+
+            // Attempt two discards, followed by two adds
+
+            pool.discard(iterator.next());
+            if (iterator.hasNext()) {
+                pool.discard(iterator.next());
+                pool.add("s" + System.currentTimeMillis());
+            }
+            pool.add("s" + System.currentTimeMillis());
+        }
+
+        // Once again check min and max
+        final List<Pool.Entry<String>> list = drain(pool);
+        checkMax(max, list);
+        checkMin(min, list);
+    }
+
+    public void testStrictMultiThreaded() throws Exception {
+
+        final int threadCount = 200;
+
+        final Pool pool = new Pool(10, 5, true);
+        final CountDownLatch startPistol = new CountDownLatch(1);
+        final CountDownLatch startingLine = new CountDownLatch(10);
+        final CountDownLatch finishingLine = new CountDownLatch(threadCount);
+
+        // Do a business method...
+        Runnable r = new Runnable(){
+               public void run(){
+                startingLine.countDown();
+                try {
+                    startPistol.await();
+
+                    Pool.Entry entry = pool.pop(1000, MILLISECONDS);
+                    Thread.sleep(50);
+                    if (entry == null) {
+                        pool.push(new CounterBean());
+                    } else {
+                        pool.push(entry);
+                    }
+                } catch (TimeoutException e) {
+                    // Simple timeout while waiting on pop()
+                } catch (InterruptedException e) {
+                    Thread.interrupted();
+                }
+                finishingLine.countDown();
+            }
+        };
+
+        //  -- READY --
+
+        // How much ever the no of client invocations the count should be 10 
as only 10 instances will be created.
+        for (int i = 0; i < threadCount; i++) {
+            Thread t = new Thread(r);
+            t.start();
+        }
+
+        // Wait for the beans to reach the finish line
+        startingLine.await(1000, TimeUnit.MILLISECONDS);
+
+        //  -- SET --
+
+        assertEquals(0, CounterBean.instances.get());
+
+        //  -- GO --
+
+        startPistol.countDown(); // go
+
+        assertTrue(finishingLine.await(5000, TimeUnit.MILLISECONDS));
+
+        //  -- DONE --
+
+        assertEquals(10, CounterBean.instances.get());
+
+
+    }
+
+    private void checkMax(int max, List<Pool.Entry<String>> entries) {
+        assertEquals(max, entries.size());
+    }
+
+    private void checkMin(int min, List<Pool.Entry<String>> entries) {
+        int actualMin = 0;
+        for (Pool.Entry<String> entry : entries) {
+            if (entry != null && entry.hasHardReference()) actualMin++;
+        }
+
+        assertEquals(min, actualMin);
+    }
+
+    private void checkEntries(int expected, List<Pool.Entry<String>> entries) {
+        int found = 0;
+        for (Pool.Entry<String> entry : entries) {
+            if (entry == null) continue;
+            found++;
+            assertNotNull(entry.get());
+        }
+
+        assertEquals(expected, found);
+    }
+
+    private <T> List<Pool.Entry<T>> drain(Pool<T> pool) throws 
InterruptedException {
+        List<Pool.Entry<T>> entries = new ArrayList<Pool.Entry<T>>();
+        try {
+            while (true) {
+                entries.add(pool.pop(0, MILLISECONDS));
+            }
+        } catch (TimeoutException e) {
+            // pool drained
+        }
+        return entries;
+    }
+
+    public static class CounterBean {
+
+        public static AtomicInteger instances = new AtomicInteger();
+
+        private int count;
+
+        public CounterBean() {
+            count = instances.incrementAndGet();
+        }
+
+        public int count() {
+            return instances.get();
+        }
+
+    }
+
+}

Propchange: 
openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to