Author: doogie
Date: Sun Apr 4 19:14:04 2010
New Revision: 930739
URL: http://svn.apache.org/viewvc?rev=930739&view=rev
Log:
Thread-pool based dependency resolution framework. This implementation
is also non-blocking.
Added:
ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java
ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/DependencyPoolTests.java
Modified:
ofbiz/trunk/framework/base/build.xml
ofbiz/trunk/framework/base/testdef/basetests.xml
Modified: ofbiz/trunk/framework/base/build.xml
URL:
http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/build.xml?rev=930739&r1=930738&r2=930739&view=diff
==============================================================================
--- ofbiz/trunk/framework/base/build.xml (original)
+++ ofbiz/trunk/framework/base/build.xml Sun Apr 4 19:14:04 2010
@@ -55,6 +55,7 @@ under the License.
<file name="org/ofbiz/base/util/test/UtilIOTests.java"/>
<file name="org/ofbiz/base/test/BaseUnitTests.java"/>
<file name="org/ofbiz/base/util/collections/test/GenericMapTest.java"/>
+ <file name="org/ofbiz/base/concurrent/test/DependencyPoolTests.java"/>
<file name="org/ofbiz/base/concurrent/test/SyncTTLObjectTest.java"/>
<file name="org/ofbiz/base/concurrent/test/AsyncTTLObjectTest.java"/>
<file name="org/ofbiz/base/concurrent/test/TTLCachedObjectTest.java"/>
Added:
ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java
URL:
http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java?rev=930739&view=auto
==============================================================================
---
ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java
(added)
+++
ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java
Sun Apr 4 19:14:04 2010
@@ -0,0 +1,139 @@
+package org.ofbiz.base.concurrent;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.ofbiz.base.lang.LockedBy;
+import org.ofbiz.base.lang.SourceMonitored;
+
+...@sourcemonitored
+public class DependencyPool<K, I extends DependencyPool.Item<I, K, V>, V> {
+ private final Executor executor;
+ private final ConcurrentMap<K, I> allItems = new ConcurrentHashMap<K, I>();
+ private final ConcurrentMap<K, Future<V>> results = new
ConcurrentHashMap<K, Future<V>>();
+ private final ReentrantLock submitLock = new ReentrantLock();
+ private final Condition submitCondition = submitLock.newCondition();
+ @LockedBy("submitLock")
+ private final Set<I> outstanding = new HashSet<I>();
+ @LockedBy("submitLock")
+ private final List<I> pending = new LinkedList<I>();
+
+ public DependencyPool(Executor executor) {
+ this.executor = executor;
+ }
+
+ public void add(I item) {
+ if (allItems.putIfAbsent(item.getKey(), item) == null) {
+ submitLock.lock();
+ try {
+ pending.add(item);
+ } finally {
+ submitLock.unlock();
+ }
+ }
+ }
+
+ public void addAll(Collection<I> items) {
+ for (I item: items) {
+ add(item);
+ }
+ }
+
+ public void start() {
+ submitLock.lock();
+ try {
+ submitWork();
+ } finally {
+ submitLock.unlock();
+ }
+ }
+
+ public V getResult(I item) throws InterruptedException, ExecutionException
{
+ Future<V> future = results.get(item.getKey());
+ if (future == null) {
+ return null;
+ } else {
+ return future.get();
+ }
+ }
+
+ public int getResultCount() {
+ return results.size();
+ }
+
+ public boolean await() throws InterruptedException {
+ submitLock.lock();
+ try {
+ submitWork();
+ while (!outstanding.isEmpty()) {
+ submitCondition.await();
+ }
+ return pending.isEmpty();
+ } finally {
+ submitLock.unlock();
+ }
+ }
+
+ @LockedBy("submitLock")
+ private int submitWork() {
+ Iterator<I> pendingIt = pending.iterator();
+ int submittedCount = 0;
+OUTER:
+ while (pendingIt.hasNext()) {
+ I item = pendingIt.next();
+ for (K dep: item.getDependencies()) {
+ if (!results.containsKey(dep)) {
+ continue OUTER;
+ }
+ }
+ submittedCount++;
+ pendingIt.remove();
+ outstanding.add(item);
+ executor.execute(new ItemTask(item));
+ }
+ return submittedCount;
+ }
+
+ private class ItemTask extends FutureTask<V> {
+ private final I item;
+
+ protected ItemTask(I item) {
+ super(item);
+ this.item = item;
+ }
+
+ protected void done() {
+ super.done();
+ results.put(item.getKey(), this);
+ submitLock.lock();
+ try {
+ outstanding.remove(item);
+ if (submitWork() == 0 && outstanding.isEmpty()) {
+ submitCondition.signal();
+ }
+ } finally {
+ submitLock.unlock();
+ }
+ }
+ }
+
+ public interface Item<I extends Item<I, K, V>, K, V> extends Callable<V> {
+ K getKey();
+ Collection<K> getDependencies();
+ }
+}
Added:
ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/DependencyPoolTests.java
URL:
http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/DependencyPoolTests.java?rev=930739&view=auto
==============================================================================
---
ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/DependencyPoolTests.java
(added)
+++
ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/test/DependencyPoolTests.java
Sun Apr 4 19:14:04 2010
@@ -0,0 +1,108 @@
+package org.ofbiz.base.concurrent.test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.ofbiz.base.concurrent.DependencyPool;
+import org.ofbiz.base.concurrent.ExecutionPool;
+import org.ofbiz.base.lang.SourceMonitored;
+import org.ofbiz.base.test.GenericTestCaseBase;
+import org.ofbiz.base.util.UtilMisc;
+
+/**
+ * Exercises {@link DependencyPool} with a randomly generated set of items
+ * whose dependencies always point at higher-numbered items.
+ */
+@SourceMonitored
+public class DependencyPoolTests extends GenericTestCaseBase {
+    public DependencyPoolTests(String name) {
+        super(name);
+    }
+
+    /**
+     * Builds 100 items with random dependencies and random sub-items, runs
+     * them through the pool, and checks that every item produced its result.
+     */
+    public void testDependencyPool() throws Exception {
+        // always use more threads than cpus, so that the single-cpu case can be tested
+        ScheduledExecutorService executor = ExecutionPool.getNewOptimalExecutor(getName());
+        try {
+            DependencyPool<Integer, TestItem, String> pool = new DependencyPool<Integer, TestItem, String>(executor);
+            int itemSize = 100, depMax = 5, subMax = 3;
+            List<TestItem> items = new ArrayList<TestItem>(itemSize);
+            List<TestItem> previousItems = new ArrayList<TestItem>(itemSize);
+            for (int i = 0; i < itemSize; i++) {
+                // Dependencies are drawn only from indices above i.
+                int depSize = (int) (Math.random() * Math.min(depMax, itemSize - i - 1));
+                List<Integer> deps = new ArrayList<Integer>(depSize);
+                for (int j = i + 1, k = 0; j < itemSize && k < depSize; j++) {
+                    if (Math.random() * (itemSize - j) / (depSize - k + 1) < 1) {
+                        deps.add(j);
+                        k++;
+                    }
+                }
+                // Sub-items are earlier items sharing no dependency key with this
+                // one; the item re-adds them to the pool from its call().
+                int subSize = (int) (Math.random() * Math.min(subMax, i));
+                List<TestItem> subItems = new ArrayList<TestItem>(subSize);
+OUTER:
+                for (int j = 0; j < previousItems.size() && subItems.size() < subSize;) {
+                    if (Math.random() * j < 1) {
+                        TestItem previousItem = previousItems.get(j);
+                        for (int k = 0; k < deps.size(); k++) {
+                            if (previousItem.getDependencies().contains(deps.get(k))) {
+                                j++;
+                                continue OUTER;
+                            }
+                        }
+                        subItems.add(previousItem);
+                        // Removal shifts the next candidate into slot j, so j is not advanced.
+                        previousItems.remove(j);
+                    } else {
+                        j++;
+                    }
+                }
+                TestItem item = new TestItem(pool, Integer.valueOf(i), Integer.toString(i), deps, subItems);
+                items.add(item);
+                previousItems.add(item);
+            }
+            pool.addAll(items);
+            pool.start();
+            assertTrue("no item left pending", pool.await());
+            assertEquals("result count", itemSize, pool.getResultCount());
+            for (int i = 0; i < itemSize; i++) {
+                TestItem item = items.get(i);
+                assertEquals("item(" + i + ") result", Integer.toString(i), pool.getResult(item));
+            }
+        } finally {
+            // Release the worker threads even when an assertion above fails.
+            executor.shutdown();
+        }
+    }
+
+    /**
+     * Pool item that sleeps for a random time below 100 ms, adds its
+     * sub-items to the pool, and returns a fixed result string.
+     */
+    private static class TestItem implements DependencyPool.Item<TestItem, Integer, String> {
+        private final DependencyPool<Integer, TestItem, String> pool;
+        private final Integer key;
+        private final String result;
+        private final Collection<Integer> dependencies;
+        private final Collection<TestItem> subItems;
+
+        protected TestItem(DependencyPool<Integer, TestItem, String> pool, Integer key, String result, Collection<Integer> dependencies, Collection<TestItem> subItems) {
+            this.pool = pool;
+            this.key = key;
+            this.result = result;
+            this.dependencies = dependencies;
+            this.subItems = subItems;
+        }
+
+        public Integer getKey() {
+            return key;
+        }
+
+        public Collection<Integer> getDependencies() {
+            return dependencies;
+        }
+
+        public Collection<TestItem> getSubItems() {
+            return subItems;
+        }
+
+        public String call() throws Exception {
+            int sleepTime = (int) (Math.random() * 100);
+            Thread.sleep(sleepTime);
+            if (!subItems.isEmpty()) {
+                // Adds items from a worker thread while the pool is running.
+                pool.addAll(subItems);
+            }
+            return result;
+        }
+    }
+}
Modified: ofbiz/trunk/framework/base/testdef/basetests.xml
URL:
http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/testdef/basetests.xml?rev=930739&r1=930738&r2=930739&view=diff
==============================================================================
--- ofbiz/trunk/framework/base/testdef/basetests.xml (original)
+++ ofbiz/trunk/framework/base/testdef/basetests.xml Sun Apr 4 19:14:04 2010
@@ -31,6 +31,7 @@
<junit-test-suite
class-name="org.ofbiz.base.util.cache.test.UtilCacheTests"/>
<junit-test-suite
class-name="org.ofbiz.base.conversion.test.DateTimeTests.java"/>
<junit-test-suite
class-name="org.ofbiz.base.conversion.test.MiscTests.java"/>
+ <junit-test-suite
class-name="org.ofbiz.base.concurrent.test.DependencyPoolTests"/>
<junit-test-suite class-name="org.ofbiz.base.util.test.UtilIOTests"/>
<junit-test-suite class-name="org.ofbiz.base.test.BaseUnitTests"/>
</test-case>