Author: ppoddar
Date: Thu Feb 26 20:20:44 2009
New Revision: 748292
URL: http://svn.apache.org/viewvc?rev=748292&view=rev
Log:
OPENJPA-825: A new threading policy for Slice via extension
Added:
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedQueryImpl.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ReentrantSliceLock.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java
Modified:
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ExecutorServiceValue.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
Modified:
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
URL:
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java?rev=748292&r1=748291&r2=748292&view=diff
==============================================================================
---
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
(original)
+++
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedBrokerImpl.java
Thu Feb 26 20:20:44 2009
@@ -21,6 +21,8 @@
import org.apache.openjpa.kernel.FinalizingBrokerImpl;
import org.apache.openjpa.kernel.OpCallbacks;
import org.apache.openjpa.kernel.OpenJPAStateManager;
+import org.apache.openjpa.kernel.QueryImpl;
+import org.apache.openjpa.kernel.StoreQuery;
import org.apache.openjpa.lib.util.Localizer;
/**
@@ -37,9 +39,16 @@
public class DistributedBrokerImpl extends FinalizingBrokerImpl {
private transient String _rootSlice;
private transient DistributedConfiguration _conf;
+ private final ReentrantSliceLock _lock;
+
private static final Localizer _loc =
Localizer.forPackage(DistributedBrokerImpl.class);
+ public DistributedBrokerImpl() {
+ super();
+ _lock = new ReentrantSliceLock();
+ }
+
public DistributedConfiguration getConfiguration() {
if (_conf == null) {
_conf = (DistributedConfiguration)super.getConfiguration();
@@ -89,6 +98,26 @@
return true;
}
+ /**
+ * Create a new query.
+ */
+ protected QueryImpl newQueryImpl(String lang, StoreQuery sq) {
+ return new DistributedQueryImpl(this, lang, sq);
+ }
+
+ /**
+ * Always uses lock irrespective of super's multi-threaded settings.
+ */
+ @Override
+ public void lock() {
+ _lock.lock();
+ }
+
+ @Override
+ public void unlock() {
+ _lock.unlock();
+ }
+
/**
* A virtual datastore need not be opened.
*/
Added:
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedQueryImpl.java
URL:
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedQueryImpl.java?rev=748292&view=auto
==============================================================================
---
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedQueryImpl.java
(added)
+++
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/DistributedQueryImpl.java
Thu Feb 26 20:20:44 2009
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openjpa.slice;
+
+import org.apache.openjpa.kernel.Broker;
+import org.apache.openjpa.kernel.QueryImpl;
+import org.apache.openjpa.kernel.StoreQuery;
+
+/**
+ * Extension with slice locking policy.
+ *
+ * @author Pinaki Poddar
+ *
+ */
+public class DistributedQueryImpl extends QueryImpl {
+ private final ReentrantSliceLock _lock;
+ public DistributedQueryImpl(Broker broker, String language, StoreQuery
storeQuery) {
+ super(broker, language, storeQuery);
+ _lock = new ReentrantSliceLock();
+ }
+
+ /**
+ * Always uses lock irrespective of super's multi-threaded settings.
+ */
+ @Override
+ public void lock() {
+ _lock.lock();
+ }
+
+ @Override
+ public void unlock() {
+ _lock.unlock();
+ }
+}
Modified:
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ExecutorServiceValue.java
URL:
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ExecutorServiceValue.java?rev=748292&r1=748291&r2=748292&view=diff
==============================================================================
---
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ExecutorServiceValue.java
(original)
+++
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ExecutorServiceValue.java
Thu Feb 26 20:20:44 2009
@@ -62,6 +62,8 @@
/**
* Configures a cached or fixed thread pool.
+ * The factory always produces SliceThreads, which bypass slice locks.
+ *
*/
@Override
public Object instantiate(Class type, Configuration conf, boolean fatal) {
@@ -73,21 +75,7 @@
Options opts = Configurations.parseProperties(getProperties());
- ThreadFactory factory = null;
- if (opts.containsKey("ThreadFactory")) {
- String fName = opts.getProperty("ThreadFactory");
- try {
- factory = (ThreadFactory) Class.forName(fName).newInstance();
- Configurations.configureInstance(factory, conf, opts,
- getProperty());
- } catch (Throwable t) {
- throw new UserException(_loc.get("bad-thread-factory", fName),
t);
- } finally {
- opts.removeProperty("ThreadFactory");
- }
- } else {
- factory = Executors.defaultThreadFactory();
- }
+ ThreadFactory factory = new SliceThreadFactory();
if ("cached".equals(cls)) {
obj = Executors.newCachedThreadPool(factory);
} else if ("fixed".equals(cls)) {
@@ -105,4 +93,10 @@
set(obj, true);
return obj;
}
+
+ private static class SliceThreadFactory implements ThreadFactory {
+ public Thread newThread(Runnable r) {
+ return new SliceThread(Thread.currentThread(), r);
+ }
+ }
}
Added:
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ReentrantSliceLock.java
URL:
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ReentrantSliceLock.java?rev=748292&view=auto
==============================================================================
---
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ReentrantSliceLock.java
(added)
+++
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/ReentrantSliceLock.java
Thu Feb 26 20:20:44 2009
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openjpa.slice;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A reentrant lock that lets a child thread work under its parent's lock.
+ *
+ * @author Pinaki Poddar
+ *
+ */
+public class ReentrantSliceLock extends ReentrantLock {
+
+ public ReentrantSliceLock() {
+ }
+
+ public ReentrantSliceLock(boolean fair) {
+ super(fair);
+ }
+
+ /**
+ * Locks only for the parent thread; a child SliceThread uses the parent's lock.
+ */
+ @Override
+ public void lock() {
+ if (Thread.currentThread() instanceof SliceThread)
+ return;
+ super.lock();
+ }
+
+ /**
+ * Unlocks only for the parent thread; a no-op when called on a SliceThread.
+ */
+ @Override
+ public void unlock() {
+ if (Thread.currentThread() instanceof SliceThread)
+ return;
+ super.unlock();
+ }
+}
Added:
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java
URL:
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java?rev=748292&view=auto
==============================================================================
---
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java
(added)
+++
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SliceThread.java
Thu Feb 26 20:20:44 2009
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.openjpa.slice;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A thread to execute an operation against a database slice.
+ *
+ * @author Pinaki Poddar
+ *
+ */
+public class SliceThread extends Thread {
+ private final Thread _parent;
+
+ public SliceThread(String name, Thread parent, Runnable r) {
+ super(r, name);
+ _parent = parent;
+ }
+
+ public SliceThread(Thread parent, Runnable r) {
+ super(r);
+ _parent = parent;
+ }
+
+ /**
+ * Gets the parent thread of this receiver.
+ *
+ */
+ public Thread getParent() {
+ return _parent;
+ }
+
+ /**
+ * Creates a pool of the given size.
+ * The thread factory is specialized to create SliceThread which gets
+ * preferential treatment for locking.
+ *
+ */
+
+ public static ExecutorService newPool(int size) {
+ return new ThreadPoolExecutor(size, size, 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(), new SliceThreadFactory());
+ }
+
+ static class SliceThreadFactory implements ThreadFactory {
+ int n = 0;
+ public Thread newThread(Runnable r) {
+ Thread parent = Thread.currentThread();
+ return new SliceThread(parent.getName()+"-slice-"+n++, parent, r);
+ }
+ }
+
+}
Modified:
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java
URL:
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java?rev=748292&r1=748291&r2=748292&view=diff
==============================================================================
---
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java
(original)
+++
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfiguration.java
Thu Feb 26 20:20:44 2009
@@ -18,34 +18,22 @@
*/
package org.apache.openjpa.slice.jdbc;
-import java.util.concurrent.ExecutorService;
-
import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
import org.apache.openjpa.slice.DistributedConfiguration;
import org.apache.openjpa.slice.Slice;
/**
- * A distributed configuration that is a ordered collection of
+ * A distributed configuration that is an ordered collection of
* JDBCConfigurations.
*
- * @author Pinaki Poddar
- *
+ * @author Pinaki Poddar
+ *
*/
-public interface DistributedJDBCConfiguration extends JDBCConfiguration,
- DistributedConfiguration {
+public interface DistributedJDBCConfiguration extends JDBCConfiguration,
+ DistributedConfiguration {
/**
* Gets the master slice.
*/
Slice getMaster();
-
- /**
- * Gets the alias for ExecutorService being used.
- */
-
- String getExecutorService();
-
- /**
- * Gets the ExecutorService being used.
- */
- ExecutorService getExecutorServiceInstance();
+
}
Modified:
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
URL:
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java?rev=748292&r1=748291&r2=748292&view=diff
==============================================================================
---
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
(original)
+++
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
Thu Feb 26 20:20:44 2009
@@ -477,19 +477,4 @@
_master = activeSlices.get(0);
}
}
-
- public String getExecutorService() {
- return executorServicePlugin.getString();
- }
-
- public void setExecutorService(ExecutorService txnManager) {
- executorServicePlugin.set(txnManager);
- }
-
- public ExecutorService getExecutorServiceInstance() {
- if (executorServicePlugin.get() == null) {
- executorServicePlugin.instantiate(ExecutorService.class, this);
- }
- return (ExecutorService) executorServicePlugin.get();
- }
}
Modified:
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
URL:
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java?rev=748292&r1=748291&r2=748292&view=diff
==============================================================================
---
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
(original)
+++
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
Thu Feb 26 20:20:44 2009
@@ -59,6 +59,7 @@
import org.apache.openjpa.slice.ProductDerivation;
import org.apache.openjpa.slice.SliceImplHelper;
import org.apache.openjpa.slice.SliceInfo;
+import org.apache.openjpa.slice.SliceThread;
import org.apache.openjpa.util.InternalException;
import org.apache.openjpa.util.StoreException;
@@ -78,7 +79,6 @@
private final DistributedJDBCConfiguration _conf;
private static final Localizer _loc =
Localizer.forPackage(DistributedStoreManager.class);
- private static ExecutorService threadPool =
Executors.newCachedThreadPool();
/**
* Constructs a set of child StoreManagers each connected to a physical
@@ -249,7 +249,7 @@
Map<String, StateManagerSet> subsets = bin(sms, null);
Collection<StateManagerSet> remaining =
new ArrayList<StateManagerSet>(subsets.values());
- boolean parallel = !getConfiguration().getMultithreaded();
+ ExecutorService threadPool = SliceThread.newPool(_slices.size());
for (int i = 0; i < _slices.size(); i++) {
SliceStoreManager slice = _slices.get(i);
StateManagerSet subset = subsets.get(slice.getName());
@@ -262,24 +262,19 @@
remaining.remove(subset);
rollbackVersion(subset.getReplicated(), oldVersions, remaining);
} else {
- if (parallel) {
- futures.add(threadPool.submit(new Flusher(slice,
subset)));
- } else {
- collectException(slice.flush(subset), exceptions);
- }
+ futures.add(threadPool.submit(new Flusher(slice, subset)));
}
}
- if (parallel) {
- for (Future<Collection> future : futures) {
- try {
- collectException(future.get(), exceptions);
- } catch (InterruptedException e) {
- throw new StoreException(e);
- } catch (ExecutionException e) {
- throw new StoreException(e.getCause());
- }
- }
+ for (Future<Collection> future : futures) {
+ try {
+ collectException(future.get(), exceptions);
+ } catch (InterruptedException e) {
+ throw new StoreException(e);
+ } catch (ExecutionException e) {
+ throw new StoreException(e.getCause());
+ }
}
+
return exceptions;
}
@@ -498,12 +493,7 @@
}
public Collection call() throws Exception {
- ((BrokerImpl)store.getContext()).startLocking();
- try {
- return store.flush(toFlush);
- } finally {
- ((BrokerImpl)store.getContext()).stopLocking();
- }
+ return store.flush(toFlush);
}
}
Modified:
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
URL:
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java?rev=748292&r1=748291&r2=748292&view=diff
==============================================================================
---
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
(original)
+++
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
Thu Feb 26 20:20:44 2009
@@ -25,6 +25,10 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.openjpa.jdbc.kernel.JDBCStore;
import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery;
@@ -41,6 +45,7 @@
import org.apache.openjpa.lib.rop.RangeResultObjectProvider;
import org.apache.openjpa.lib.rop.ResultObjectProvider;
import org.apache.openjpa.meta.ClassMetaData;
+import org.apache.openjpa.slice.SliceThread;
import org.apache.openjpa.util.StoreException;
/**
@@ -84,12 +89,6 @@
q.setContext(ctx);
}
- public ExecutorService getExecutorServiceInstance() {
- DistributedJDBCConfiguration conf =
((DistributedJDBCConfiguration)
- getStore().getConfiguration());
- return conf.getExecutorServiceInstance();
- }
-
/**
* Executes queries on multiple databases.
*
@@ -100,16 +99,12 @@
ExpressionStoreQuery.DataStoreExecutor {
private List<Executor> executors = new ArrayList<Executor>();
private DistributedStoreQuery owner = null;
- private ExecutorService threadPool = null;
- private final boolean parallel;
public ParallelExecutor(DistributedStoreQuery dsq,
ClassMetaData meta,
boolean subclasses, ExpressionParser parser,
Object parsed,
boolean parallel) {
super(dsq, meta, subclasses, parser, parsed);
owner = dsq;
- threadPool = dsq.getExecutorServiceInstance();
- this.parallel = parallel;
}
public void addExecutor(Executor ex) {
@@ -130,6 +125,7 @@
List<SliceStoreManager> targets = findTargets();
QueryContext ctx = q.getContext();
boolean isReplicated = containsReplicated(ctx);
+ ExecutorService threadPool =
SliceThread.newPool(owner._queries.size());
for (int i = 0; i < owner._queries.size(); i++) {
// if replicated, then execute only on single
slice
if (isReplicated && !usedExecutors.isEmpty()) {
@@ -143,28 +139,23 @@
if (!targets.contains(sm))
continue;
usedExecutors.add(executor);
- if (!parallel) {
- rops.add(executor.executeQuery(query,
params, range));
- } else {
QueryExecutor call = new
QueryExecutor();
call.executor = executor;
call.query = query;
call.params = params;
call.range = range;
futures.add(threadPool.submit(call));
- }
}
- if (parallel) {
- for (Future<ResultObjectProvider> future :
futures) {
- try {
- rops.add(future.get());
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new
StoreException(e.getCause());
- }
+ for (Future<ResultObjectProvider> future : futures) {
+ try {
+ rops.add(future.get());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new StoreException(e.getCause());
}
}
+
ResultObjectProvider[] tmp = rops
.toArray(new
ResultObjectProvider[rops.size()]);
ResultObjectProvider result = null;
@@ -214,6 +205,7 @@
Iterator<StoreQuery> qs = owner._queries.iterator();
List<Future<Number>> futures = null;
int result = 0;
+ ExecutorService threadPool =
SliceThread.newPool(executors.size());
for (Executor ex : executors) {
if (futures == null)
futures = new
ArrayList<Future<Number>>();
@@ -241,6 +233,7 @@
Iterator<StoreQuery> qs = owner._queries.iterator();
List<Future<Number>> futures = null;
int result = 0;
+ ExecutorService threadPool = SliceThread.newPool(executors.size());
for (Executor ex : executors) {
if (futures == null)
futures = new
ArrayList<Future<Number>>();
@@ -268,6 +261,7 @@
.getFetchConfiguration();
return owner.getDistributedStore().getTargets(fetch);
}
+
}
static class QueryExecutor implements Callable<ResultObjectProvider> {
@@ -277,18 +271,7 @@
Range range;
public ResultObjectProvider call() throws Exception {
- ((QueryImpl)query.getContext()).startLocking();
-
((BrokerImpl)query.getContext().getStoreContext()).startLocking();
- ((QueryImpl)query.getContext()).lock();
-
((BrokerImpl)query.getContext().getStoreContext()).lock();
- try {
- return executor.executeQuery(query, params,
range);
- } finally {
- ((QueryImpl)query.getContext()).unlock();
-
((BrokerImpl)query.getContext().getStoreContext()).unlock();
- ((QueryImpl)query.getContext()).stopLocking();
-
((BrokerImpl)query.getContext().getStoreContext()).stopLocking();
- }
+ return executor.executeQuery(query, params, range);
}
}
@@ -298,18 +281,7 @@
Object[] params;
public Number call() throws Exception {
- ((QueryImpl)query.getContext()).startLocking();
-
((BrokerImpl)query.getContext().getStoreContext()).startLocking();
- ((QueryImpl)query.getContext()).lock();
-
((BrokerImpl)query.getContext().getStoreContext()).lock();
- try {
- return executor.executeDelete(query, params);
- } finally {
- ((QueryImpl)query.getContext()).unlock();
-
((BrokerImpl)query.getContext().getStoreContext()).unlock();
- ((QueryImpl)query.getContext()).stopLocking();
-
((BrokerImpl)query.getContext().getStoreContext()).stopLocking();
- }
+ return executor.executeDelete(query, params);
}
}
@@ -319,18 +291,7 @@
Object[] params;
public Number call() throws Exception {
- ((QueryImpl)query.getContext()).startLocking();
-
((BrokerImpl)query.getContext().getStoreContext()).startLocking();
- ((QueryImpl)query.getContext()).lock();
-
((BrokerImpl)query.getContext().getStoreContext()).lock();
- try {
- return executor.executeUpdate(query, params);
- } finally {
- ((QueryImpl)query.getContext()).unlock();
-
((BrokerImpl)query.getContext().getStoreContext()).unlock();
- ((QueryImpl)query.getContext()).stopLocking();
-
((BrokerImpl)query.getContext().getStoreContext()).stopLocking();
- }
+ return executor.executeUpdate(query, params);
}
}
}
Modified:
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
URL:
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java?rev=748292&r1=748291&r2=748292&view=diff
==============================================================================
---
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
(original)
+++
openjpa/trunk/openjpa-slice/src/test/java/org/apache/openjpa/slice/TestBasic.java
Thu Feb 26 20:20:44 2009
@@ -128,18 +128,17 @@
EntityManager em = emf.createEntityManager();
em.getTransaction().begin();
- PObject pc2 = em.getReference(PObject.class, pc.getId());
- assertNotNull(pc2);
- assertNotEquals(pc, pc2);
- assertEquals(pc.getId(), pc2.getId());
- assertEquals(value, pc2.getValue());
- pc2.setValue(value+1);
- em.merge(pc2);
+ PObject ref = em.getReference(PObject.class, pc.getId());
+ assertNotNull(ref);
+ assertNotEquals(pc, ref);
+ assertEquals(ref.getId(), pc.getId());
+ pc.setValue(value+1);
+ em.merge(pc);
em.getTransaction().commit();
em.clear();
em.getTransaction().begin();
- PObject pc3 = em.getReference(PObject.class, pc.getId());
+ PObject pc3 = em.find(PObject.class, pc.getId());
assertEquals(value+1, pc3.getValue());
em.getTransaction().commit();