Author: doogie
Date: Tue Jun 1 21:46:32 2010
New Revision: 950256
URL: http://svn.apache.org/viewvc?rev=950256&view=rev
Log:
Start making use of DependencyPool.
Modified:
ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java
ofbiz/trunk/framework/entity/src/org/ofbiz/entity/jdbc/DatabaseUtil.java
Modified:
ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java
URL:
http://svn.apache.org/viewvc/ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java?rev=950256&r1=950255&r2=950256&view=diff
==============================================================================
---
ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java
(original)
+++
ofbiz/trunk/framework/base/src/org/ofbiz/base/concurrent/DependencyPool.java
Tue Jun 1 21:46:32 2010
@@ -21,9 +21,12 @@ import java.util.concurrent.locks.Reentr
import org.ofbiz.base.lang.LockedBy;
import org.ofbiz.base.lang.SourceMonitored;
+import org.ofbiz.base.util.Debug;
@SourceMonitored
public class DependencyPool<K, I extends DependencyPool.Item<I, K, V>, V> {
+ public static final String module = DependencyPool.class.getName();
+
private final Executor executor;
private final ConcurrentMap<K, I> allItems = new ConcurrentHashMap<K, I>();
private final ConcurrentMap<K, Future<V>> results = new
ConcurrentHashMap<K, Future<V>>();
@@ -84,10 +87,14 @@ public class DependencyPool<K, I extends
public boolean await() throws InterruptedException {
submitLock.lock();
try {
+ Debug.logInfo("a outstanding.size=" + outstanding.size(), module);
+ Debug.logInfo("a pending.size=" + pending.size(), module);
submitWork();
while (!outstanding.isEmpty()) {
submitCondition.await();
}
+ Debug.logInfo("b outstanding.size=" + outstanding.size(), module);
+ Debug.logInfo("b pending.size=" + pending.size(), module);
return pending.isEmpty();
} finally {
submitLock.unlock();
Modified:
ofbiz/trunk/framework/entity/src/org/ofbiz/entity/jdbc/DatabaseUtil.java
URL:
http://svn.apache.org/viewvc/ofbiz/trunk/framework/entity/src/org/ofbiz/entity/jdbc/DatabaseUtil.java?rev=950256&r1=950255&r2=950256&view=diff
==============================================================================
--- ofbiz/trunk/framework/entity/src/org/ofbiz/entity/jdbc/DatabaseUtil.java
(original)
+++ ofbiz/trunk/framework/entity/src/org/ofbiz/entity/jdbc/DatabaseUtil.java
Tue Jun 1 21:46:32 2010
@@ -47,6 +47,7 @@ import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.ofbiz.base.util.Debug;
+import org.ofbiz.base.util.UtilMisc;
import org.ofbiz.base.util.UtilTimer;
import org.ofbiz.base.util.UtilValidate;
import org.ofbiz.base.util.UtilXml;
@@ -63,6 +64,7 @@ import org.ofbiz.entity.model.ModelKeyMa
import org.ofbiz.entity.model.ModelRelation;
import org.ofbiz.entity.model.ModelViewEntity;
import org.ofbiz.entity.transaction.TransactionUtil;
+import org.ofbiz.base.concurrent.DependencyPool;
import org.ofbiz.base.concurrent.ExecutionPool;
/**
@@ -184,7 +186,7 @@ public class DatabaseUtil {
return result;
}
- private class TableFuture extends DBFuture<TableFuture> {
+ private class TableFuture extends DBFuture<TableFuture> implements
DependencyPool.Item<TableFuture, String, TableFuture> {
private final int curEnt;
private final int totalEnt;
private final ModelEntity entity;
@@ -202,6 +204,18 @@ public class DatabaseUtil {
this.addMissing = addMissing;
}
+ public String getKey() {
+ return "table#" + entity.getTableName(datasourceInfo);
+ }
+
+ public Collection<String> getDependencies() {
+ return Collections.emptyList();
+ }
+
+ public Collection<String> getProvides() {
+ return Collections.emptyList();
+ }
+
public TableFuture call() {
String entMessage = "(" + timer.timeSinceLast() + "ms) Checking #"
+ curEnt + "/" + totalEnt +
" Entity " + entity.getEntityName() + " with table " +
entity.getTableName(datasourceInfo);
@@ -402,7 +416,7 @@ public class DatabaseUtil {
}
}
- private class ForeignKeyIndexFuture extends
CountingFuture<ForeignKeyIndexFuture> {
+ private class ForeignKeyIndexFuture extends
CountingFuture<ForeignKeyIndexFuture> implements
DependencyPool.Item<ForeignKeyIndexFuture, String, ForeignKeyIndexFuture> {
private final ModelEntity entity;
private final int constraintNameClipLength;
@@ -412,13 +426,25 @@ public class DatabaseUtil {
this.constraintNameClipLength = constraintNameClipLength;
}
+ public String getKey() {
+ return "fk-index#" + entity.getTableName(datasourceInfo);
+ }
+
+ public Collection<String> getDependencies() {
+ return UtilMisc.toList("table#" +
entity.getTableName(datasourceInfo));
+ }
+
+ public Collection<String> getProvides() {
+ return Collections.emptyList();
+ }
+
public ForeignKeyIndexFuture call() {
count.addAndGet(createForeignKeyIndices(entity,
constraintNameClipLength, messages));
return this;
}
}
- private class DeclaredIndexFuture extends
CountingFuture<DeclaredIndexFuture> {
+ private class DeclaredIndexFuture extends
CountingFuture<DeclaredIndexFuture> implements
DependencyPool.Item<DeclaredIndexFuture, String, DeclaredIndexFuture> {
private final ModelEntity entity;
protected DeclaredIndexFuture(AtomicInteger count, ModelEntity entity)
{
@@ -426,6 +452,19 @@ public class DatabaseUtil {
this.entity = entity;
}
+ public String getKey() {
+ return "declared-index#" + entity.getTableName(datasourceInfo);
+ }
+
+ public Collection<String> getDependencies() {
+ String tableName = entity.getTableName(datasourceInfo);
+ return UtilMisc.toList("table#" + tableName, "fk-index#" +
tableName);
+ }
+
+ public Collection<String> getProvides() {
+ return Collections.emptyList();
+ }
+
public DeclaredIndexFuture call() {
count.addAndGet(createDeclaredIndices(entity, messages));
return this;
@@ -477,9 +516,11 @@ public class DatabaseUtil {
int curEnt = 0;
int totalEnt = modelEntityList.size();
List<ModelEntity> entitiesAdded = FastList.newInstance();
- List<Future<TableFuture>> tableFutureFutures = FastList.newInstance();
+ List<TableFuture> tableFutures = FastList.newInstance();
ScheduledExecutorService threadPool =
ExecutionPool.getNewOptimalExecutor("Databaseutil");
+ DependencyPool depPool = new DependencyPool(threadPool, 3);
+ Debug.logInfo("start of table creation", module);
for (ModelEntity entity: modelEntityList) {
curEnt++;
@@ -491,13 +532,44 @@ public class DatabaseUtil {
continue;
}
- tableFutureFutures.add(threadPool.submit(new
TableFuture(modelEntities, timer, tableNames, colInfo, curEnt, totalEnt,
entity, checkPks, addMissing)));
+ TableFuture tf = new TableFuture(modelEntities, timer, tableNames,
colInfo, curEnt, totalEnt, entity, checkPks, addMissing);
+ tableFutures.add(tf);
+ depPool.add(tf);
}
- for (TableFuture tableFuture: getAllFutures(tableFutureFutures)) {
- tableFuture.updateData(messages, entitiesAdded);
+ // for each newly added table, add fk indices
+ Debug.logInfo("start of fk indices", module);
+ List<ForeignKeyIndexFuture> fkIndicesFutures = FastList.newInstance();
+ AtomicInteger totalFkIndices = new AtomicInteger();
+ if (datasourceInfo.useFkIndices) {
+ for (ModelEntity curEntity: entitiesAdded) {
+ if (curEntity.getRelationsOneSize() > 0) {
+ ForeignKeyIndexFuture fkif = new
ForeignKeyIndexFuture(totalFkIndices, curEntity,
datasourceInfo.constraintNameClipLength);
+ fkIndicesFutures.add(fkif);
+ depPool.add(fkif);
+ }
+ }
+ }
+ // for each newly added table, add declared indexes
+ List<DeclaredIndexFuture> disFutures = FastList.newInstance();
+ AtomicInteger totalDis = new AtomicInteger();
+ if (datasourceInfo.useIndices) {
+ for (ModelEntity curEntity: entitiesAdded) {
+ if (curEntity.getIndexesSize() > 0) {
+ DeclaredIndexFuture dif = new
DeclaredIndexFuture(totalDis, curEntity);
+ disFutures.add(dif);
+ depPool.add(dif);
+ }
+ }
+ }
+
+ depPool.start();
+ try {
+ depPool.await();
+ } catch (InterruptedException e) {
+ Debug.logError(e, "Couldn't wait for dependency pool to finish
while creating the database", module);
+ return;
}
- timer.timerString("After Individual Table/Column Check");
// -list all tables that do not have a corresponding entity
for (String tableName: tableNames) {
@@ -505,17 +577,13 @@ public class DatabaseUtil {
Debug.logWarning(message, module);
if (messages != null) messages.add(message);
}
+ for (TableFuture tableFuture: tableFutures) {
+ tableFuture.updateData(messages, entitiesAdded);
+ }
+ timer.timerString("After Individual Table/Column Check");
- // for each newly added table, add fk indices
if (datasourceInfo.useFkIndices) {
- List<Future<ForeignKeyIndexFuture>> fkIndicesFutureFutures =
FastList.newInstance();
- AtomicInteger totalFkIndices = new AtomicInteger();
- for (ModelEntity curEntity: entitiesAdded) {
- if (curEntity.getRelationsOneSize() > 0) {
- fkIndicesFutureFutures.add(threadPool.submit(new
ForeignKeyIndexFuture(totalFkIndices, curEntity,
datasourceInfo.constraintNameClipLength)));
- }
- }
- for (ForeignKeyIndexFuture fkIndicesFuture:
getAllFutures(fkIndicesFutureFutures)) {
+ for (ForeignKeyIndexFuture fkIndicesFuture: fkIndicesFutures) {
fkIndicesFuture.updateData(messages);
}
if (totalFkIndices.get() > 0) Debug.logImportant("==== TOTAL
Foreign Key Indices Created: " + totalFkIndices.get(), module);
@@ -532,14 +600,7 @@ public class DatabaseUtil {
// for each newly added table, add declared indexes
if (datasourceInfo.useIndices) {
- List<Future<DeclaredIndexFuture>> disFutureFutures =
FastList.newInstance();
- AtomicInteger totalDis = new AtomicInteger();
- for (ModelEntity curEntity: entitiesAdded) {
- if (curEntity.getIndexesSize() > 0) {
- disFutureFutures.add(threadPool.submit(new
DeclaredIndexFuture(totalDis, curEntity)));
- }
- }
- for (DeclaredIndexFuture disFuture:
getAllFutures(disFutureFutures)) {
+ for (DeclaredIndexFuture disFuture: disFutures) {
disFuture.updateData(messages);
}
if (totalDis.get() > 0) Debug.logImportant("==== TOTAL Declared
Indices Created: " + totalDis.get(), module);