Author: ppoddar
Date: Thu Dec 18 16:08:23 2008
New Revision: 727864

URL: http://svn.apache.org/viewvc?rev=727864&view=rev
Log:
OPENJPA-825: Introduced internal locking for shared contexts 
(BrokerImpl/QueryImpl).

Modified:
    
openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java
    
openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java
    
openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/QueryImpl.java
    
openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java
    
openjpa/trunk/openjpa-persistence/src/main/resources/org/apache/openjpa/persistence/localizer.properties
    
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
    
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java
    
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
    
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
    
openjpa/trunk/openjpa-slice/src/main/resources/org/apache/openjpa/slice/localizer.properties
    
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
    
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java

Modified: 
openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- 
openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java
 (original)
+++ 
openjpa/trunk/openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/JDBCStoreQuery.java
 Thu Dec 18 16:08:23 2008
@@ -84,7 +84,6 @@
 public class JDBCStoreQuery 
     extends ExpressionStoreQuery {
 
-       private boolean _isUnique = false;
     private static final Table INVALID = new Table();
 
     // add all standard filter and aggregate listeners to these maps
@@ -112,11 +111,6 @@
         _store = store;
     }
 
-    @Override
-    public void setContext(QueryContext ctx) {
-       super.setContext(ctx);
-       _isUnique = ctx.isUnique();
-    }
     /**
      * Return the store.
      */
@@ -348,7 +342,7 @@
                 evaluate(ctx, null, null, exps[i], states[i]);
             if (optHint != null)
                sel.setExpectedResultCount(optHint.intValue(), true);
-            else if (_isUnique)
+            else if (this.ctx.isUnique())
                 sel.setExpectedResultCount(1, false);
             for (int j = 0; j < verts.length; j++) {
                 selMappings.add(verts[j]);
@@ -430,7 +424,7 @@
         long end) {
         if (exps.projections.length > 0 || start >= end)
             return EagerFetchModes.EAGER_NONE;
-        if (end - start == 1 || _isUnique)
+        if (end - start == 1 || ctx.isUnique())
             return EagerFetchModes.EAGER_JOIN;
         return EagerFetchModes.EAGER_PARALLEL;
     }

Modified: 
openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- 
openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java
 (original)
+++ 
openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/BrokerImpl.java
 Thu Dec 18 16:08:23 2008
@@ -37,6 +37,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.LockSupport;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.transaction.Status;
@@ -4161,7 +4162,7 @@
     ///////////////////
 
     public void lock() {
-        if (_lock != null)
+        if (_lock != null) 
             _lock.lock();
     }
 
@@ -4169,6 +4170,24 @@
         if (_lock != null)
             _lock.unlock();
     }
+    
+    /**
+     * Creates a lock irrespective of multithreaded support. Used only by 
+     * the internal implementation to guard access when it spawns its own 
+     * threads and the user configured the broker for single-threaded access. 
+     */
+    public synchronized void startLocking() {
+       if (_lock == null)
+               _lock = new ReentrantLock();
+    }
+    
+    /**
+     * Destroys the lock unless the broker is configured for multithreaded support. 
+     */
+    public synchronized void stopLocking() {
+       if (_lock != null && !getMultithreaded())
+               _lock = null;
+    }
 
     ////////////////////
     // State management

Modified: 
openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/QueryImpl.java
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/QueryImpl.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- 
openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/QueryImpl.java
 (original)
+++ 
openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/kernel/QueryImpl.java
 Thu Dec 18 16:08:23 2008
@@ -84,7 +84,7 @@
     private transient ClassLoader _loader = null;
 
     // query has its own internal lock
-    private final ReentrantLock _lock;
+    private ReentrantLock _lock;
 
     // unparsed state
     private Class _class = null;
@@ -138,8 +138,6 @@
 
         if (_broker != null && _broker.getMultithreaded())
             _lock = new ReentrantLock();
-        else
-            _lock = null;
     }
 
     /**
@@ -458,8 +456,8 @@
             // no explicit setting; default
             StoreQuery.Executor ex = compileForExecutor();
             if (!ex.isAggregate(_storeQuery))
-                return _unique = false;
-            return _unique = !ex.hasGrouping(_storeQuery);
+                return false;
+            return !ex.hasGrouping(_storeQuery);
         } finally {
             unlock();
         }
@@ -1553,9 +1551,22 @@
     }
 
     public void unlock() {
-        if (_lock != null && _lock.isLocked())
+        if (_lock != null)
             _lock.unlock();
     }
+    
+    public synchronized void startLocking() {
+       if (_lock == null) {
+               _lock = new ReentrantLock();
+       }
+    }
+    
+    public synchronized void stopLocking() {
+       if (_lock != null && !_broker.getMultithreaded())
+               _lock = null;
+    }
+    
+    
 
     /////////
     // Utils

Modified: 
openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- 
openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java
 (original)
+++ 
openjpa/trunk/openjpa-persistence/src/main/java/org/apache/openjpa/persistence/QueryImpl.java
 Thu Dec 18 16:08:23 2008
@@ -37,6 +37,8 @@
 
 import javax.persistence.FlushModeType;
 import javax.persistence.LockModeType;
+import javax.persistence.NoResultException;
+import javax.persistence.NonUniqueResultException;
 import javax.persistence.Query;
 import javax.persistence.TemporalType;
 
@@ -444,15 +446,14 @@
         */
        public Object getSingleResult() {
                _em.assertNotCloseInvoked();
-               // temporarily set query to unique so that a single result is 
validated
-               // and returned; unset again in case the user executes query 
again
-               // via getResultList
-               _query.setUnique(true);
-               try {
-                       return execute();
-               } finally {
-                       _query.setUnique(false);
-               }
+               List result = getResultList();
+               if (result == null || result.isEmpty())
+                       throw new NoResultException(_loc.get("no-result", 
getQueryString())
+                               .getMessage());
+               if (result.size() > 1)
+                       throw new 
NonUniqueResultException(_loc.get("non-unique-result",
+                               getQueryString(), result.size()).getMessage());
+               return result.get(0);
        }
 
        public int executeUpdate() {

Modified: 
openjpa/trunk/openjpa-persistence/src/main/resources/org/apache/openjpa/persistence/localizer.properties
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-persistence/src/main/resources/org/apache/openjpa/persistence/localizer.properties?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- 
openjpa/trunk/openjpa-persistence/src/main/resources/org/apache/openjpa/persistence/localizer.properties
 (original)
+++ 
openjpa/trunk/openjpa-persistence/src/main/resources/org/apache/openjpa/persistence/localizer.properties
 Thu Dec 18 16:08:23 2008
@@ -163,4 +163,5 @@
        but this parameter is bound to a field of primitive type "{2}".
 version-check-error: An error occurred while attempting to determine the \
     version of "{0}".
-       
\ No newline at end of file
+no-result: Query "{0}" selected no result, but expected unique result.
+non-unique-result: Query "{0}" selected {1} results, but expected unique 
result.
\ No newline at end of file

Modified: 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
 (original)
+++ 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
 Thu Dec 18 16:08:23 2008
@@ -91,4 +91,11 @@
            }
            return true;
        }
+       
+       /**
+        * A virtual datastore need not be opened.
+        */
+       @Override
+       public void beginStore() {
+       }
 }

Modified: 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java
 (original)
+++ 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ProductDerivation.java
 Thu Dec 18 16:08:23 2008
@@ -23,6 +23,10 @@
 import org.apache.openjpa.conf.OpenJPAProductDerivation;
 import org.apache.openjpa.lib.conf.AbstractProductDerivation;
 import org.apache.openjpa.lib.conf.Configuration;
+import org.apache.openjpa.lib.conf.PluginValue;
+import org.apache.openjpa.lib.conf.Value;
+import org.apache.openjpa.lib.log.Log;
+import org.apache.openjpa.lib.util.Localizer;
 import org.apache.openjpa.slice.jdbc.DistributedJDBCBrokerFactory;
 import org.apache.openjpa.slice.jdbc.DistributedJDBCConfigurationImpl;
 
@@ -37,10 +41,12 @@
  */
 public class ProductDerivation extends AbstractProductDerivation implements
                OpenJPAProductDerivation {
+       private static final Localizer _loc = 
+               Localizer.forPackage(ProductDerivation.class);
     /**
      * Prefix for all Slice-specific configuration properties. 
      */
-    public static final String PREFIX_SLICE = "openjpa.slice";
+    public static final String PREFIX_SLICE   = "openjpa.slice";
     
     /**
      * Hint key <code>openjpa.hint.slice.Target </code> to specify a subset of 
@@ -74,14 +80,22 @@
         DistributedJDBCConfigurationImpl conf = 
                (DistributedJDBCConfigurationImpl)c;
         boolean modified = false;
+        Log log = conf.getConfigurationLog();
         if (conf.getDistributionPolicyInstance() == null) {
-               conf.distributionPolicyPlugin.setString("random");
+               forceSet(PREFIX_SLICE, conf.distributionPolicyPlugin,"random", 
log);
                modified = true;
         }
         if (conf.getReplicationPolicyInstance() == null) {
-               conf.replicationPolicyPlugin.setString("all");
+               forceSet(PREFIX_SLICE, conf.replicationPolicyPlugin, "all", 
log);
                modified = true;
         }
         return modified;
     }
+    
+    void forceSet(String prefix, Value v, String forced, Log log) {
+       v.setString(forced);
+       if (log.isWarnEnabled())
+               log.warn(_loc.get("forced-set-config", 
+                       prefix+"."+v.getProperty(), forced));
+    }
 }

Modified: 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
 (original)
+++ 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
 Thu Dec 18 16:08:23 2008
@@ -39,6 +39,7 @@
 import org.apache.openjpa.jdbc.kernel.JDBCStoreManager;
 import org.apache.openjpa.jdbc.sql.Result;
 import org.apache.openjpa.jdbc.sql.ResultSetResult;
+import org.apache.openjpa.kernel.BrokerImpl;
 import org.apache.openjpa.kernel.FetchConfiguration;
 import org.apache.openjpa.kernel.OpenJPAStateManager;
 import org.apache.openjpa.kernel.PCState;
@@ -242,7 +243,7 @@
         List<Future<Collection>> futures = new ArrayList<Future<Collection>>();
         Map<String, List<OpenJPAStateManager>> subsets = bin(sms, null);
         
-        boolean serialMode = getConfiguration().getMultithreaded();
+        boolean parallel = !getConfiguration().getMultithreaded();
         for (SliceStoreManager slice : _slices) {
             List<OpenJPAStateManager> subset = subsets.get(slice.getName());
             if (subset.isEmpty())
@@ -250,14 +251,14 @@
             if (containsReplicated(subset)) {
                collectException(slice.flush(subset), exceptions);
             } else {
-               if (serialMode) {
-                       collectException(slice.flush(subset), exceptions);
-               } else {
+               if (parallel) {
                        futures.add(threadPool.submit(new Flusher(slice, 
subset)));
+               } else {
+                       collectException(slice.flush(subset), exceptions);
                }
             }
         }
-        if (!serialMode) {
+        if (parallel) {
                for (Future<Collection> future : futures) {
                    try {
                        collectException(future.get(), exceptions);
@@ -459,7 +460,12 @@
         }
 
         public Collection call() throws Exception {
-            return store.flush(toFlush);
+               ((BrokerImpl)store.getContext()).startLocking();
+               try {
+                       return store.flush(toFlush);
+               } finally {
+               ((BrokerImpl)store.getContext()).stopLocking();
+               }
         }
     }
 

Modified: 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
URL: 
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
 (original)
+++ 
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
 Thu Dec 18 16:08:23 2008
@@ -28,10 +28,12 @@
 
 import org.apache.openjpa.jdbc.kernel.JDBCStore;
 import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery;
+import org.apache.openjpa.kernel.BrokerImpl;
 import org.apache.openjpa.kernel.ExpressionStoreQuery;
 import org.apache.openjpa.kernel.FetchConfiguration;
 import org.apache.openjpa.kernel.OrderingMergedResultObjectProvider;
 import org.apache.openjpa.kernel.QueryContext;
+import org.apache.openjpa.kernel.QueryImpl;
 import org.apache.openjpa.kernel.StoreManager;
 import org.apache.openjpa.kernel.StoreQuery;
 import org.apache.openjpa.kernel.exps.ExpressionParser;
@@ -44,272 +46,292 @@
 /**
  * A query for distributed databases.
  * 
- * @author Pinaki Poddar 
- *
+ * @author Pinaki Poddar
+ * 
  */
 @SuppressWarnings("serial")
 class DistributedStoreQuery extends JDBCStoreQuery {
        private List<StoreQuery> _queries = new ArrayList<StoreQuery>();
        private ExpressionParser _parser;
-       private boolean _serialMode;
-       
+
        public DistributedStoreQuery(JDBCStore store, ExpressionParser parser) {
                super(store, parser);
                _parser = parser;
-               _serialMode = 
store.getContext().getConfiguration().getMultithreaded();
-               
        }
-       
+
        void add(StoreQuery q) {
                _queries.add(q);
        }
-       
+
        public DistributedStoreManager getDistributedStore() {
-               return (DistributedStoreManager)getStore();
+               return (DistributedStoreManager) getStore();
+       }
+
+       public Executor newDataStoreExecutor(ClassMetaData meta, boolean subs) {
+               boolean parallel = !getContext().getStoreContext().getBroker()
+                       .getMultithreaded();
+               ParallelExecutor ex = new ParallelExecutor(this, meta, subs, 
_parser, 
+                       ctx.getCompilation(), parallel);
+               for (StoreQuery q : _queries) {
+                       ex.addExecutor(q.newDataStoreExecutor(meta, subs));
+               }
+               return ex;
+       }
+
+       public void setContext(QueryContext ctx) {
+               super.setContext(ctx);
+               for (StoreQuery q : _queries)
+                       q.setContext(ctx);
+       }
+
+       public ExecutorService getExecutorServiceInstance() {
+               DistributedJDBCConfiguration conf = 
((DistributedJDBCConfiguration) 
+                       getStore().getConfiguration());
+               return conf.getExecutorServiceInstance();
        }
-       
-    public Executor newDataStoreExecutor(ClassMetaData meta, boolean subs) {
-       ParallelExecutor ex = new ParallelExecutor(this, meta, subs, _parser, 
-                       ctx.getCompilation(), _serialMode);
-        for (StoreQuery q : _queries) {
-            ex.addExecutor(q.newDataStoreExecutor(meta, subs));
-        }
-        return ex;
-    }
-    
-    public void setContext(QueryContext ctx) {
-       super.setContext(ctx);
-       for (StoreQuery q : _queries) 
-               q.setContext(ctx); 
-    }
-    
-    public ExecutorService getExecutorServiceInstance() {
-        DistributedJDBCConfiguration conf = 
-            ((DistributedJDBCConfiguration)getStore().getConfiguration());
-        return conf.getExecutorServiceInstance();
-    }
-    
+
        /**
         * Executes queries on multiple databases.
         * 
-        * @author Pinaki Poddar 
-        *
+        * @author Pinaki Poddar
+        * 
         */
-       public static class ParallelExecutor extends 
-               ExpressionStoreQuery.DataStoreExecutor {
+       public static class ParallelExecutor extends
+                       ExpressionStoreQuery.DataStoreExecutor {
                private List<Executor> executors = new ArrayList<Executor>();
                private DistributedStoreQuery owner = null;
                private ExecutorService threadPool = null;
-               private final boolean serialMode;
-               
-        public ParallelExecutor(DistributedStoreQuery dsq, ClassMetaData meta, 
-               boolean subclasses, ExpressionParser parser, Object parsed, 
-               boolean serial) {
-               super(dsq, meta, subclasses, parser, parsed);
-               owner = dsq;
-               threadPool = dsq.getExecutorServiceInstance();
-               serialMode = false;//serial;
-        }
-        
+               private final boolean parallel;
+
+               public ParallelExecutor(DistributedStoreQuery dsq, 
ClassMetaData meta,
+                               boolean subclasses, ExpressionParser parser, 
Object parsed, 
+                               boolean parallel) {
+                       super(dsq, meta, subclasses, parser, parsed);
+                       owner = dsq;
+                       threadPool = dsq.getExecutorServiceInstance();
+                       this.parallel = parallel;
+               }
+
                public void addExecutor(Executor ex) {
                        executors.add(ex);
                }
-               
-        /**
-         * Each child query must be executed with slice context and not the 
-         * given query context.
-         */
-        public ResultObjectProvider executeQuery(StoreQuery q,
-                final Object[] params, final Range range) {
-               List<Future<ResultObjectProvider>> futures = null;
-               final List<Executor> usedExecutors = new ArrayList<Executor>();
-               final List<ResultObjectProvider> rops = 
-                       new ArrayList<ResultObjectProvider>();
-               List<SliceStoreManager> targets = findTargets();
-               QueryContext ctx = q.getContext();
-               boolean isReplicated = containsReplicated(ctx);
-               for (int i = 0; i < owner._queries.size(); i++) {
-                       // if replicated, then execute only on single slice
-                       if (i > 0 && isReplicated) {
-                               continue;
-                       }
-                       StoreManager sm  = 
owner.getDistributedStore().getSlice(i);
-                       if (!targets.contains(sm))
-                               continue;
-                       StoreQuery query = owner._queries.get(i);
-                       Executor executor = executors.get(i);
-                       if (!targets.contains(sm))
-                               continue;
-                       usedExecutors.add(executor);
-                       if (serialMode) {
-                               rops.add(executor.executeQuery(query, params, 
range));
-                       } else {
-                               if (futures == null)
-                                       futures = new 
ArrayList<Future<ResultObjectProvider>>();
-                               QueryExecutor call = new QueryExecutor();
-                               call.executor = executor;
-                               call.query    = query;
-                               call.params   = params;
-                               call.range    = range;
-                               futures.add(threadPool.submit(call)); 
-                       }
-               }
-               if (!serialMode) {
-                       for (Future<ResultObjectProvider> future:futures) {
-                               try {
+
+               /**
+                * Each child query must be executed with slice context and not 
the
+                * given query context.
+                */
+               public ResultObjectProvider executeQuery(StoreQuery q,
+                               final Object[] params, final Range range) {
+                       List<Future<ResultObjectProvider>> futures = 
+                               new ArrayList<Future<ResultObjectProvider>>();
+                       final List<Executor> usedExecutors = new 
ArrayList<Executor>();
+                       final List<ResultObjectProvider> rops = 
+                               new ArrayList<ResultObjectProvider>();
+                       List<SliceStoreManager> targets = findTargets();
+                       QueryContext ctx = q.getContext();
+                       boolean isReplicated = containsReplicated(ctx);
+                       for (int i = 0; i < owner._queries.size(); i++) {
+                               // if replicated, then execute only on single 
slice
+                               if (i > 0 && isReplicated) {
+                                       continue;
+                               }
+                               StoreManager sm = 
owner.getDistributedStore().getSlice(i);
+                               if (!targets.contains(sm))
+                                       continue;
+                               StoreQuery query = owner._queries.get(i);
+                               Executor executor = executors.get(i);
+                               if (!targets.contains(sm))
+                                       continue;
+                               usedExecutors.add(executor);
+                               if (!parallel) {
+                                       rops.add(executor.executeQuery(query, 
params, range));
+                               } else {
+                                       QueryExecutor call = new 
QueryExecutor();
+                                       call.executor = executor;
+                                       call.query = query;
+                                       call.params = params;
+                                       call.range = range;
+                                       futures.add(threadPool.submit(call));
+                               }
+
+                       }
+                       if (parallel) {
+                               for (Future<ResultObjectProvider> future : 
futures) {
+                                       try {
                                                rops.add(future.get());
                                        } catch (InterruptedException e) {
                                                throw new RuntimeException(e);
                                        } catch (ExecutionException e) {
                                                throw new 
StoreException(e.getCause());
                                        }
-                       }
-               }
-               ResultObjectProvider[] tmp = rops.toArray
-                       (new ResultObjectProvider[rops.size()]);
-               ResultObjectProvider result = null;
-               boolean[] ascending = getAscending(q);
-               boolean isAscending = ascending.length > 0;
-               boolean isAggregate = ctx.isAggregate();
-               boolean hasRange    = ctx.getEndRange() != Long.MAX_VALUE;
-               if (isAggregate) {
-                   result = new UniqueResultObjectProvider(tmp, q, 
-                           getQueryExpressions());
-               } else if (isAscending) {
-                   result = new OrderingMergedResultObjectProvider(tmp, 
ascending, 
-                  usedExecutors.toArray(new Executor[usedExecutors.size()]),
-                  q, params);
-               } else {
-                   result = new MergedResultObjectProvider(tmp);
-               }
-               if (hasRange) {
-                   result = new RangeResultObjectProvider(result, 
-                           ctx.getStartRange(), ctx.getEndRange());
-               }
-               return result;
-        }
-        
-        /**
+                               }
+                       }
+                       ResultObjectProvider[] tmp = rops
+                                       .toArray(new 
ResultObjectProvider[rops.size()]);
+                       ResultObjectProvider result = null;
+                       boolean[] ascending = getAscending(q);
+                       boolean isAscending = ascending.length > 0;
+                       boolean isAggregate = ctx.isAggregate();
+                       boolean hasRange = ctx.getEndRange() != Long.MAX_VALUE;
+                       if (isAggregate) {
+                               result = new UniqueResultObjectProvider(tmp, q,
+                                               getQueryExpressions());
+                       } else if (isAscending) {
+                               result = new 
OrderingMergedResultObjectProvider(tmp, ascending,
+                                       usedExecutors.toArray(new 
Executor[usedExecutors.size()]),
+                                       q, params);
+                       } else {
+                               result = new MergedResultObjectProvider(tmp);
+                       }
+                       if (hasRange) {
+                               result = new RangeResultObjectProvider(result, 
ctx
+                                               .getStartRange(), 
ctx.getEndRange());
+                       }
+                       return result;
+               }
+
+               /**
                 * Scans metadata to find out if a replicated class is the 
candidate.
-        **/
-        boolean containsReplicated(QueryContext query) {
-               Class candidate = query.getCandidateType();
-               if (candidate != null) {
-                       ClassMetaData meta = 
query.getStoreContext().getConfiguration()
-                               .getMetaDataRepositoryInstance()
-                               .getMetaData(candidate, null, true);
-                       if (meta != null && meta.isReplicated())
-                               return true;
-               }
-               ClassMetaData[] metas = query.getAccessPathMetaDatas();
-               if (metas == null || metas.length < 1)
-                       return false;
-               for (ClassMetaData type : metas)
-                       if (type.isReplicated())
-                               return true;
-               return false;
-        }
-        
-        public Number executeDelete(StoreQuery q, Object[] params) {
-               Iterator<StoreQuery> qs = owner._queries.iterator();
-               List<Future<Number>> futures = null;
-               int result = 0;
-               for (Executor ex:executors) {
-                       if (serialMode) {
-                               Number n = ex.executeDelete(qs.next(), params); 
   
-                               if (n != null)
-                                       result += n.intValue();
-                       } else {
-                               if (futures == null)
-                                       futures = new 
ArrayList<Future<Number>>();
-                               DeleteExecutor call = new DeleteExecutor();
-                               call.executor = ex;
-                               call.query    = qs.next();
-                               call.params   = params;
-                               futures.add(threadPool.submit(call)); 
-                       }
-               }
-               if (!serialMode) {
-                       for (Future<Number> future:futures) {
-                               try {
-                               Number n = future.get();
-                               if (n != null) 
-                                       result += n.intValue();
-                                       } catch (InterruptedException e) {
-                                               throw new RuntimeException(e);
-                                       } catch (ExecutionException e) {
-                                               throw new 
StoreException(e.getCause());
-                                       }
-                       }
-               }
-               return result;
-        }
-        
-        public Number executeUpdate(StoreQuery q, Object[] params) {
-               Iterator<StoreQuery> qs = owner._queries.iterator();
-               List<Future<Number>> futures = null;
-               int result = 0;
-               for (Executor ex:executors) {
-                       if (serialMode) {
-                               Number n = ex.executeUpdate(qs.next(), params);
-                               result += (n == null) ? 0 : n.intValue();
-                       } else {
-                               if (futures == null)
-                                       futures = new 
ArrayList<Future<Number>>();
-                       UpdateExecutor call = new UpdateExecutor();
-                       call.executor = ex;
-                       call.query    = qs.next();
-                       call.params   = params;
-                       futures.add(threadPool.submit(call)); 
-                       }
-               }
-               if (serialMode) {
-                       for (Future<Number> future:futures) {
-                               try {
-                               Number n = future.get();
-                                       result += (n == null) ? 0 : 
n.intValue();
-                                       } catch (InterruptedException e) {
-                                               throw new RuntimeException(e);
-                                       } catch (ExecutionException e) {
-                                               throw new 
StoreException(e.getCause());
-                                       }
-                       }
-               }
-               return result;
-        }
-        
-        List<SliceStoreManager> findTargets() {
-               FetchConfiguration fetch = 
owner.getContext().getFetchConfiguration();
-               return owner.getDistributedStore().getTargets(fetch);
-        }
+                */
+               boolean containsReplicated(QueryContext query) {
+                       Class candidate = query.getCandidateType();
+                       if (candidate != null) {
+                               ClassMetaData meta = 
query.getStoreContext().getConfiguration()
+                                               
.getMetaDataRepositoryInstance().getMetaData(candidate,
+                                                               null, true);
+                               if (meta != null && meta.isReplicated())
+                                       return true;
+                       }
+                       ClassMetaData[] metas = query.getAccessPathMetaDatas();
+                       if (metas == null || metas.length < 1)
+                               return false;
+                       for (ClassMetaData type : metas)
+                               if (type.isReplicated())
+                                       return true;
+                       return false;
+               }
+
+               public Number executeDelete(StoreQuery q, Object[] params) {
+                       Iterator<StoreQuery> qs = owner._queries.iterator();
+                       List<Future<Number>> futures = null;
+                       int result = 0;
+                       for (Executor ex : executors) {
+                               if (futures == null)
+                                       futures = new 
ArrayList<Future<Number>>();
+                               DeleteExecutor call = new DeleteExecutor();
+                               call.executor = ex;
+                               call.query = qs.next();
+                               call.params = params;
+                               futures.add(threadPool.submit(call));
+                       }
+                       for (Future<Number> future : futures) {
+                               try {
+                                       Number n = future.get();
+                                       if (n != null)
+                                               result += n.intValue();
+                               } catch (InterruptedException e) {
+                                       throw new RuntimeException(e);
+                               } catch (ExecutionException e) {
+                                       throw new StoreException(e.getCause());
+                               }
+                       }
+                       return result;
+               }
+
+               public Number executeUpdate(StoreQuery q, Object[] params) {
+                       Iterator<StoreQuery> qs = owner._queries.iterator();
+                       List<Future<Number>> futures = null;
+                       int result = 0;
+                       for (Executor ex : executors) {
+                               if (futures == null)
+                                       futures = new 
ArrayList<Future<Number>>();
+                               UpdateExecutor call = new UpdateExecutor();
+                               call.executor = ex;
+                               call.query = qs.next();
+                               call.params = params;
+                               futures.add(threadPool.submit(call));
+                       }
+                       for (Future<Number> future : futures) {
+                               try {
+                                       Number n = future.get();
+                                       result += (n == null) ? 0 : 
n.intValue();
+                               } catch (InterruptedException e) {
+                                       throw new RuntimeException(e);
+                               } catch (ExecutionException e) {
+                                       throw new StoreException(e.getCause());
+                               }
+                       }
+                       return result;
+               }
+
+               List<SliceStoreManager> findTargets() {
+                       FetchConfiguration fetch = owner.getContext()
+                                       .getFetchConfiguration();
+                       return owner.getDistributedStore().getTargets(fetch);
+               }
        }
-       
-       static  class QueryExecutor implements Callable<ResultObjectProvider> {
+
+       static class QueryExecutor implements Callable<ResultObjectProvider> {
                StoreQuery query;
                Executor executor;
                Object[] params;
                Range range;
+
                public ResultObjectProvider call() throws Exception {
-                       return executor.executeQuery(query, params, range);
+                       ((QueryImpl)query.getContext()).startLocking();
+                       
((BrokerImpl)query.getContext().getStoreContext()).startLocking();
+                       ((QueryImpl)query.getContext()).lock();
+                       
((BrokerImpl)query.getContext().getStoreContext()).lock();
+                       try { 
+                               return executor.executeQuery(query, params, 
range);
+                       } finally {
+                               ((QueryImpl)query.getContext()).unlock();
+                               
((BrokerImpl)query.getContext().getStoreContext()).unlock();
+                               ((QueryImpl)query.getContext()).stopLocking();
+                               
((BrokerImpl)query.getContext().getStoreContext()).stopLocking();
+                       }
                }
        }
-       
-       static  class DeleteExecutor implements Callable<Number> {
+
+       static class DeleteExecutor implements Callable<Number> {
                StoreQuery query;
                Executor executor;
                Object[] params;
+
                public Number call() throws Exception {
-                       return executor.executeDelete(query, params);
+                       ((QueryImpl)query.getContext()).startLocking();
+                       
((BrokerImpl)query.getContext().getStoreContext()).startLocking();
+                       ((QueryImpl)query.getContext()).lock();
+                       
((BrokerImpl)query.getContext().getStoreContext()).lock();
+                       try { 
+                               return executor.executeDelete(query, params);
+                       } finally {
+                               ((QueryImpl)query.getContext()).unlock();
+                               
((BrokerImpl)query.getContext().getStoreContext()).unlock();
+                               ((QueryImpl)query.getContext()).stopLocking();
+                               
((BrokerImpl)query.getContext().getStoreContext()).stopLocking();
+                       }
                }
        }
-       
-       static  class UpdateExecutor implements Callable<Number> {
+
+       static class UpdateExecutor implements Callable<Number> {
                StoreQuery query;
                Executor executor;
                Object[] params;
+
                public Number call() throws Exception {
-                       return executor.executeUpdate(query, params);
+                       ((QueryImpl)query.getContext()).startLocking();
+                       
((BrokerImpl)query.getContext().getStoreContext()).startLocking();
+                       ((QueryImpl)query.getContext()).lock();
+                       
((BrokerImpl)query.getContext().getStoreContext()).lock();
+                       try { 
+                               return executor.executeUpdate(query, params);
+                       } finally {
+                               ((QueryImpl)query.getContext()).unlock();
+                               
((BrokerImpl)query.getContext().getStoreContext()).unlock();
+                               ((QueryImpl)query.getContext()).stopLocking();
+                               
((BrokerImpl)query.getContext().getStoreContext()).stopLocking();
+                       }
                }
        }
 }
-

Modified: openjpa/trunk/openjpa-slice/src/main/resources/org/apache/openjpa/slice/localizer.properties
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/resources/org/apache/openjpa/slice/localizer.properties?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/main/resources/org/apache/openjpa/slice/localizer.properties (original)
+++ openjpa/trunk/openjpa-slice/src/main/resources/org/apache/openjpa/slice/localizer.properties Thu Dec 18 16:08:23 2008
@@ -17,4 +17,10 @@
 bad-policy-slice:Distribution policy "{0}" has returned invalid slice \
        "{1}" for "{2}". The valid slices are {3}. This error may happen \
        when one or more of the originally configured slices are unavailable \
-       and Lenient property is set to true.
\ No newline at end of file
+       and Lenient property is set to true.
+forced-set-config: Configuration property "{0}" is not set explicitly. Setting \
+       this value to "{1}".
+multithreaded-false: Configuration property "{0}" is set to "false". \
+       It is recommended to set "{0}" to "true", because Slice executes database \
+       operations per slice in parallel in different threads, setting "{0}" to \
+       "false" may cause unpredictable behavior. 
\ No newline at end of file

Modified: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java (original)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java Thu Dec 18 16:08:23 2008
@@ -218,7 +218,7 @@
      * Disable this test temporarily as we undergo changes in internal slice 
      * information structure.
      */
-    public void xtestUpdateReplicatedObjects() {
+    public void testUpdateReplicatedObjects() {
         EntityManager em = emf.createEntityManager();
         em.getTransaction().begin();
         String[] names = {"USA", "India", "China"};

Modified: openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java?rev=727864&r1=727863&r2=727864&view=diff
==============================================================================
--- openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java (original)
+++ openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestQueryMultiThreaded.java Thu Dec 18 16:08:23 2008
@@ -18,6 +18,9 @@
  */
 package org.apache.openjpa.slice;
 
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -25,6 +28,9 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import javax.persistence.EntityManager;
@@ -42,7 +48,7 @@
        private int POBJECT_COUNT = 25;
        private int VALUE_MIN = 100;
        private int VALUE_MAX = VALUE_MIN + POBJECT_COUNT - 1;
-       private static int THREADS = 3;
+       private static int THREADS = 5;
        private ExecutorService group; 
        private Future[] futures;
 
@@ -57,7 +63,14 @@
                if (count == 0) {
                        create(POBJECT_COUNT);
                }
-               group = Executors.newCachedThreadPool();
+               group = new ThreadPoolExecutor(THREADS, THREADS,
+                60, TimeUnit.SECONDS,
+                new SynchronousQueue<Runnable>(), new ThreadFactory() {
+                                       public Thread newThread(Runnable r) {
+                                               return new Thread(r);
+                                       }
+                               
+                               });
                futures = new Future[THREADS];
        }
        
@@ -293,8 +306,9 @@
                                        f.get();
                                } catch (ExecutionException e) {
                                        Throwable t = e.getCause();
-                                       t.getCause().printStackTrace();
-                                       fail("Failed " + t.getCause());
+                                       StringWriter writer = new 
StringWriter();
+                                       t.printStackTrace(new 
PrintWriter(writer));
+                                       fail("Failed " + writer.toString());
                                }
                } catch (InterruptedException e) {
 


Reply via email to