Author: ppoddar
Date: Mon Dec 22 14:11:48 2008
New Revision: 728807
URL: http://svn.apache.org/viewvc?rev=728807&view=rev
Log:
Handle version adjustment when replicated instances are flushed to multiple
slices. Refactor untyped OpenJPAStateManager collections to save some redundant
iteration.
Modified:
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SlicePersistence.java
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
Modified:
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SlicePersistence.java
URL:
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SlicePersistence.java?rev=728807&r1=728806&r2=728807&view=diff
==============================================================================
---
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SlicePersistence.java
(original)
+++
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/SlicePersistence.java
Mon Dec 22 14:11:48 2008
@@ -31,6 +31,13 @@
*
*/
public class SlicePersistence {
+ /**
+ * The key for setting the Query hints. The value is comma-separated Slice
+ * names. If the hint is specified then the query is executed only on the
+ * listed slices.
+ */
+ public static final String HINT_TARGET = ProductDerivation.HINT_TARGET;
+
/**
* Get the slice identifier for the given instance if it is a managed
* instance and has been assigned to a slice.
@@ -69,5 +76,4 @@
return false;
return info.isReplicated();
}
-
}
Modified:
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
URL:
http://svn.apache.org/viewvc/openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java?rev=728807&r1=728806&r2=728807&view=diff
==============================================================================
---
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
(original)
+++
openjpa/trunk/openjpa-slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
Mon Dec 22 14:11:48 2008
@@ -25,6 +25,7 @@
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -245,19 +246,21 @@
public Collection flush(Collection sms) {
Collection exceptions = new ArrayList();
List<Future<Collection>> futures = new ArrayList<Future<Collection>>();
- Map<String, List<OpenJPAStateManager>> subsets = bin(sms, null);
-
+ Map<String, StateManagerSet> subsets = bin(sms, null);
+ Collection<StateManagerSet> remaining =
+ new ArrayList<StateManagerSet>(subsets.values());
boolean parallel = !getConfiguration().getMultithreaded();
for (int i = 0; i < _slices.size(); i++) {
SliceStoreManager slice = _slices.get(i);
- List<OpenJPAStateManager> subset = subsets.get(slice.getName());
+ StateManagerSet subset = subsets.get(slice.getName());
if (subset.isEmpty())
continue;
- if (containsReplicated(subset)) {
- Map<OpenJPAStateManager, Object> versions =
cacheVersion(subset);
+ if (subset.containsReplicated()) {
+ Map<OpenJPAStateManager, Object> oldVersions = cacheVersion(
+ subset.getReplicated());
collectException(slice.flush(subset), exceptions);
- if (i != _slices.size()-1)
- rollbackVersion(subset, versions);
+ remaining.remove(subset);
+ rollbackVersion(subset.getReplicated(), oldVersions, remaining);
} else {
if (parallel) {
futures.add(threadPool.submit(new Flusher(slice,
subset)));
@@ -286,40 +289,51 @@
}
}
- boolean containsReplicated(List<OpenJPAStateManager> sms) {
- for (OpenJPAStateManager sm : sms)
- if (sm.getMetaData().isReplicated())
- return true;
- return false;
- }
-
- private Map<OpenJPAStateManager, Object> cacheVersion
- (List<OpenJPAStateManager> sms) {
+ /**
+ * Collect the current versions of the given StateManagers.
+ */
+ private Map<OpenJPAStateManager, Object> cacheVersion(
+ List<OpenJPAStateManager> sms) {
Map<OpenJPAStateManager, Object> result =
- new HashMap<OpenJPAStateManager, Object>();
+ new HashMap<OpenJPAStateManager, Object>();
for (OpenJPAStateManager sm : sms)
- if (sm.getMetaData().isReplicated())
- result.put(sm, sm.getVersion());
+ result.put(sm, sm.getVersion());
return result;
}
+ /**
+ * Sets the version of the given StateManagers from the cached versions.
+ * Provided that the StateManager does not appear in the FlusSets of the
+ * remaining.
+ */
private void rollbackVersion(List<OpenJPAStateManager> sms,
- Map<OpenJPAStateManager, Object> result) {
- for (OpenJPAStateManager sm : sms)
- if (sm.getMetaData().isReplicated())
- sm.setVersion(result.get(sm));
+ Map<OpenJPAStateManager, Object> oldVersions,
+ Collection<StateManagerSet> reminder) {
+ if (reminder.isEmpty())
+ return;
+ for (OpenJPAStateManager sm : sms) {
+ if (occurs(sm, reminder))
+ sm.setVersion(oldVersions.get(sm));
+ }
+ }
+
+ boolean occurs(OpenJPAStateManager sm,
+ Collection<StateManagerSet> reminder) {
+ for (StateManagerSet set : reminder)
+ if (set.contains(sm))
+ return true;
+ return false;
}
/**
* Separate the given list of StateManagers in separate lists for each
slice
* by the associated slice identifier of each StateManager.
*/
- private Map<String, List<OpenJPAStateManager>> bin(
- Collection/*<StateManage>*/ sms, Object edata) {
- Map<String, List<OpenJPAStateManager>> subsets =
- new HashMap<String, List<OpenJPAStateManager>>();
+ private Map<String, StateManagerSet> bin(Collection sms, Object edata) {
+ Map<String, StateManagerSet> subsets =
+ new HashMap<String, StateManagerSet>();
for (SliceStoreManager slice : _slices)
- subsets.put(slice.getName(), new ArrayList<OpenJPAStateManager>());
+ subsets.put(slice.getName(), new StateManagerSet());
for (Object x : sms) {
OpenJPAStateManager sm = (OpenJPAStateManager) x;
String[] targets = findSliceNames(sm, edata).getSlices();
@@ -379,10 +393,10 @@
public Collection loadAll(Collection sms, PCState state, int load,
FetchConfiguration fetch, Object edata) {
- Map<String, List<OpenJPAStateManager>> subsets = bin(sms, edata);
+ Map<String, StateManagerSet> subsets = bin(sms, edata);
Collection result = new ArrayList();
for (SliceStoreManager slice : _slices) {
- List<OpenJPAStateManager> subset = subsets.get(slice.getName());
+ StateManagerSet subset = subsets.get(slice.getName());
if (subset.isEmpty())
continue;
Collection tmp = slice.loadAll(subset, state, load, fetch, edata);
@@ -474,15 +488,11 @@
return targets;
}
- void log(String s) {
- System.out.println("["+Thread.currentThread().getName()+"] " + this +
s);
- }
-
private static class Flusher implements Callable<Collection> {
final SliceStoreManager store;
- final Collection toFlush;
+ final StateManagerSet toFlush;
- Flusher(SliceStoreManager store, Collection toFlush) {
+ Flusher(SliceStoreManager store, StateManagerSet toFlush) {
this.store = store;
this.toFlush = toFlush;
}
@@ -496,5 +506,37 @@
}
}
}
-
+
+ /**
+ * A specialized, insert-only collection of StateManagers that notes
+ * if any of its member is replicated.
+ *
+ */
+ private static class StateManagerSet extends HashSet<OpenJPAStateManager> {
+ List<OpenJPAStateManager> replicated;
+
+ @Override
+ public boolean add(OpenJPAStateManager sm) {
+ boolean isReplicated = sm.getMetaData().isReplicated();
+ if (isReplicated) {
+ if (replicated == null)
+ replicated = new ArrayList<OpenJPAStateManager>();
+ replicated.add(sm);
+ }
+ return super.add(sm);
+ }
+
+ @Override
+ public boolean remove(Object sm) {
+ throw new UnsupportedOperationException();
+ }
+
+ boolean containsReplicated() {
+ return replicated != null && !replicated.isEmpty();
+ }
+
+ List<OpenJPAStateManager> getReplicated() {
+ return replicated;
+ }
+ }
}