Author: dblevins
Date: Thu Feb 18 07:43:32 2010
New Revision: 911271
URL: http://svn.apache.org/viewvc?rev=911271&view=rev
Log:
Potential replacement for Stateless pooling code. Seems to have an issue when
plugged in.
Added:
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
(with props)
openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java
(with props)
Added:
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java?rev=911271&view=auto
==============================================================================
---
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
(added)
+++
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
Thu Feb 18 07:43:32 2010
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.util;
+
+import java.lang.ref.SoftReference;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Any successful pop() call requires a corresponding push() or discard() call.
+ * <p/>
+ * A pop() call that returns null is considered successful. A null indicates
+ * that the calling code has a permit to create a poolable object and call
+ * {...@link Pool#push(Object)}. This is the only situation in which that
method
+ * may be called.
+ * <p/>
+ * To simply fill the pool without a corresponding pop(), the add() method
+ * must be used. This method will attempt to aquire a permit to add to the
pool.
+ *
+ * @version $Rev$ $Date$
+ */
+public class Pool<T> {
+
+ private final LinkedList<Entry> pool = new LinkedList<Entry>();
+ private final Semaphore maxPolicy;
+ private final Semaphore minPolicy;
+ private final int max;
+
+ public Pool(int max, int min, boolean strict) {
+ this.max = max;
+ this.minPolicy = new Semaphore(min);
+ if (strict) {
+ this.maxPolicy = new Semaphore(max);
+ } else {
+ this.maxPolicy = null;
+ }
+ }
+
+ /**
+ * Any successful pop() call requires a corresponding push() or discard()
call
+ * <p/>
+ * A pop() call that returns null is considered successful.
+ *
+ * @param timeout
+ * @param unit
+ * @return
+ * @throws InterruptedException
+ * @throws IllegalStateException if a permit could not be acquired
+ */
+ public Entry<T> pop(long timeout, TimeUnit unit) throws
InterruptedException, TimeoutException {
+ if (maxPolicy != null) {
+ if (!maxPolicy.tryAcquire(timeout, unit)) throw new
TimeoutException("Waited " + timeout + " " + unit);
+ }
+
+ Entry<T> entry = null;
+ while (entry == null) {
+
+ synchronized (pool) {
+ try {
+ entry = pool.removeFirst();
+ } catch (NoSuchElementException e) {
+ return null;
+ }
+ }
+
+ final T obj = entry.soft.get();
+
+ if (obj != null) {
+
+ final boolean notBusy = entry.active.compareAndSet(null, obj);
+
+ if (notBusy) return entry;
+ }
+
+ entry = null;
+ }
+
+ return null;
+ }
+
+ /**
+ * Attempt to aquire a permit to add the object to the pool.
+ *
+ * @param obj
+ * @return true of the item as added
+ */
+ public boolean add(T obj) {
+ return (maxPolicy == null || maxPolicy.tryAcquire()) && push(obj);
+ }
+
+ /**
+ * Never call this method without having successfully called
+ * {...@link #pop(long, java.util.concurrent.TimeUnit)} beforehand.
+ * <p/>
+ * Failure to do so will increase the max pool size by one.
+ *
+ * @param obj
+ * @return
+ */
+ public boolean push(T obj) {
+ return push(new Entry<T>(obj));
+ }
+
+ /**
+ * Never call this method without having successfully called
+ * {...@link #pop(long, java.util.concurrent.TimeUnit)} beforehand.
+ * <p/>
+ * Failure to do so will increase the max pool size by one.
+ *
+ * @param entry
+ * @return true of the item as added
+ */
+ public boolean push(Entry<T> entry) {
+
+ boolean added = false;
+
+ if (entry != null) {
+ final T obj = entry.active.getAndSet(null);
+
+ if (entry.hard.get() == null && minPolicy.tryAcquire()) {
+ entry.hard.set(obj);
+ synchronized (pool) {
+ if (pool.size() < max) {
+ pool.addFirst(entry);
+ added = true;
+ }
+ }
+ if (!added) {
+ minPolicy.release();
+ }
+ } else {
+ synchronized (pool) {
+ if (pool.size() < max) {
+ pool.addLast(entry);
+ added = true;
+ }
+ }
+ }
+ }
+
+ if (maxPolicy != null) maxPolicy.release();
+
+ return added;
+ }
+
+ /**
+ * Used when a call to pop() was made that returned null
+ * indicating that the caller has a permit to create an
+ * object for this pool, but the caller will not be exercising
+ * that permit and wishes intstead to return "null" to the pool.
+ */
+ public void discard() {
+ discard(null);
+ }
+
+ public void discard(Entry<T> entry) {
+ if (entry != null) {
+ final T obj = entry.active.getAndSet(null);
+
+ if (entry.hard.compareAndSet(obj, null)) {
+ minPolicy.release();
+ }
+ }
+
+ if (maxPolicy != null) maxPolicy.release();
+ }
+
+ public static class Entry<T> {
+ private final SoftReference<T> soft;
+ private final AtomicReference<T> hard = new AtomicReference<T>();
+
+ // Added this so the soft reference isn't collected
+ // after the Entry instance is returned from a "pop" method
+ // Also acts as an "inUse" boolean
+ private final AtomicReference<T> active = new AtomicReference<T>();
+
+ /**
+ * Constructor is private so that it is impossible for an Entry object
+ * to exist without there being a corresponding permit issued for the
+ * object wrapped by this Entry.
+ * <p/>
+ * This helps ensure that when an Entry is returned to the pool it is
+ * always safe to call {...@link Semaphore#release()} which increases
the
+ * permit size by one.
+ *
+ * @param obj
+ */
+ private Entry(T obj) {
+ this.soft = new SoftReference<T>(obj);
+ this.active.set(obj);
+ }
+
+ public T get() {
+ return active.get();
+ }
+
+ /**
+ * Largely for testing purposes
+ *
+ * @return
+ */
+ public boolean hasHardReference() {
+ return hard.get() != null;
+ }
+ }
+}
Propchange:
openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/Pool.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java
URL:
http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java?rev=911271&view=auto
==============================================================================
---
openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java
(added)
+++
openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java
Thu Feb 18 07:43:32 2010
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.util;
+
+import junit.framework.TestCase;
+
+import javax.ejb.Remote;
+import javax.ejb.Stateless;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class PoolTest extends TestCase {
+
+ public void testStrictBasics() throws Exception {
+ exerciseStrictPool(1, 0);
+ exerciseStrictPool(3, 0);
+ exerciseStrictPool(4, 2);
+ exerciseStrictPool(5, 5);
+ }
+
+ public void testEmptyPool() throws Exception {
+ final int max = 4;
+ final int min = 2;
+ final Pool<String> pool = new Pool<String>(max, min, true);
+
+ final List<Pool.Entry<String>> entries = drain(pool);
+
+ // Should have received "max" number of nulls
+ checkMax(max, entries);
+
+ // All entries should be null
+ for (Pool.Entry<String> entry : entries) {
+ assertNull(entry);
+ }
+
+ // Pool is drained and no permits are available
+ // Test that an add does not work
+ assertFalse(pool.add("extra"));
+
+ // Fill the pool via push
+ for (int i = 0; i < max; i++) {
+ pool.push("a" + System.currentTimeMillis());
+ }
+
+ // Drain, check, then discard all
+ drainCheckPush(max, min, pool);
+ drainCheckPush(max, min, pool);
+ drainCheckPush(max, min, pool);
+
+ discard(pool, drain(pool));
+
+
+ // Fill the pool one item at a time, check it's integrity
+ for (int i = 1; i <= max; i++) {
+ assertTrue("i=" + i + ", max=" + max, pool.add("a" + i));
+
+ List<Pool.Entry<String>> list = drain(pool);
+ checkMax(max, list);
+ checkMin(Math.min(i, min), list);
+ checkEntries(i, list);
+ push(pool, list);
+ }
+
+ }
+
+ private void drainCheckPush(int max, int min, Pool<String> pool) throws
InterruptedException {
+ final List<Pool.Entry<String>> list = drain(pool);
+ checkMax(max, list);
+ checkMin(min, list);
+ push(pool, list);
+ }
+
+ private void discard(Pool<String> pool, List<Pool.Entry<String>> list) {
+ for (Pool.Entry<String> entry : list) {
+ pool.discard(entry);
+ }
+ }
+
+ private void push(Pool<String> pool, List<Pool.Entry<String>> list) {
+ for (Pool.Entry<String> entry : list) {
+ pool.push(entry);
+ }
+ }
+
+ private void exerciseStrictPool(int max, int min) throws
InterruptedException {
+ Pool<String> pool = new Pool<String>(max, min, true);
+
+ // Fill the pool
+ for (int i = 0; i < max; i++) {
+ assertTrue(pool.add("a" + i));
+ }
+
+ // Add one past the max
+ assertFalse(pool.add("extra"));
+
+ // Check the contents of the pool
+ final List<Pool.Entry<String>> entries = drain(pool);
+
+ checkMax(max, entries);
+ checkMin(min, entries);
+
+ // Push one back and check pool
+ pool.push(entries.remove(0));
+ final List<Pool.Entry<String>> entries2 = drain(pool);
+ assertEquals(max, entries2.size() + entries.size());
+
+
+ // discard all instances and add new ones
+ entries.addAll(entries2);
+ entries2.clear();
+ final Iterator<Pool.Entry<String>> iterator = entries.iterator();
+ while (iterator.hasNext()) {
+
+ // Attempt two discards, followed by two adds
+
+ pool.discard(iterator.next());
+ if (iterator.hasNext()) {
+ pool.discard(iterator.next());
+ pool.add("s" + System.currentTimeMillis());
+ }
+ pool.add("s" + System.currentTimeMillis());
+ }
+
+ // Once again check min and max
+ final List<Pool.Entry<String>> list = drain(pool);
+ checkMax(max, list);
+ checkMin(min, list);
+ }
+
+ public void testStrictMultiThreaded() throws Exception {
+
+ final int threadCount = 200;
+
+ final Pool pool = new Pool(10, 5, true);
+ final CountDownLatch startPistol = new CountDownLatch(1);
+ final CountDownLatch startingLine = new CountDownLatch(10);
+ final CountDownLatch finishingLine = new CountDownLatch(threadCount);
+
+ // Do a business method...
+ Runnable r = new Runnable(){
+ public void run(){
+ startingLine.countDown();
+ try {
+ startPistol.await();
+
+ Pool.Entry entry = pool.pop(1000, MILLISECONDS);
+ Thread.sleep(50);
+ if (entry == null) {
+ pool.push(new CounterBean());
+ } else {
+ pool.push(entry);
+ }
+ } catch (TimeoutException e) {
+ // Simple timeout while waiting on pop()
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ }
+ finishingLine.countDown();
+ }
+ };
+
+ // -- READY --
+
+ // How much ever the no of client invocations the count should be 10
as only 10 instances will be created.
+ for (int i = 0; i < threadCount; i++) {
+ Thread t = new Thread(r);
+ t.start();
+ }
+
+ // Wait for the beans to reach the finish line
+ startingLine.await(1000, TimeUnit.MILLISECONDS);
+
+ // -- SET --
+
+ assertEquals(0, CounterBean.instances.get());
+
+ // -- GO --
+
+ startPistol.countDown(); // go
+
+ assertTrue(finishingLine.await(5000, TimeUnit.MILLISECONDS));
+
+ // -- DONE --
+
+ assertEquals(10, CounterBean.instances.get());
+
+
+ }
+
+ private void checkMax(int max, List<Pool.Entry<String>> entries) {
+ assertEquals(max, entries.size());
+ }
+
+ private void checkMin(int min, List<Pool.Entry<String>> entries) {
+ int actualMin = 0;
+ for (Pool.Entry<String> entry : entries) {
+ if (entry != null && entry.hasHardReference()) actualMin++;
+ }
+
+ assertEquals(min, actualMin);
+ }
+
+ private void checkEntries(int expected, List<Pool.Entry<String>> entries) {
+ int found = 0;
+ for (Pool.Entry<String> entry : entries) {
+ if (entry == null) continue;
+ found++;
+ assertNotNull(entry.get());
+ }
+
+ assertEquals(expected, found);
+ }
+
+ private <T> List<Pool.Entry<T>> drain(Pool<T> pool) throws
InterruptedException {
+ List<Pool.Entry<T>> entries = new ArrayList<Pool.Entry<T>>();
+ try {
+ while (true) {
+ entries.add(pool.pop(0, MILLISECONDS));
+ }
+ } catch (TimeoutException e) {
+ // pool drained
+ }
+ return entries;
+ }
+
+ public static class CounterBean {
+
+ public static AtomicInteger instances = new AtomicInteger();
+
+ private int count;
+
+ public CounterBean() {
+ count = instances.incrementAndGet();
+ }
+
+ public int count() {
+ return instances.get();
+ }
+
+ }
+
+}
Propchange:
openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/util/PoolTest.java
------------------------------------------------------------------------------
svn:eol-style = native