Repository: ode Updated Branches: refs/heads/master fb731a748 -> b6b655d7a
ODE-1033: incremented version number by holding DB exclusive lock Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/97239d0f Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/97239d0f Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/97239d0f Branch: refs/heads/master Commit: 97239d0f557978fecd850cdc01e167460f101bec Parents: fb731a7 Author: sathwik <[email protected]> Authored: Tue Aug 18 15:47:31 2015 +0530 Committer: sathwik <[email protected]> Committed: Tue Aug 18 15:47:31 2015 +0530 ---------------------------------------------------------------------- .../org/apache/ode/store/ProcessStoreImpl.java | 22 +++++++---- .../ode/store/hib/ConfStoreConnectionHib.java | 27 +++++++++---- .../ode/store/jpa/ConfStoreConnectionJpa.java | 41 +++++++++++++++----- .../store/jpa/DbConfStoreConnectionFactory.java | 1 + 4 files changed, 66 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/97239d0f/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java ---------------------------------------------------------------------- diff --git a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java index cf7d194..fabd531 100644 --- a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java +++ b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java @@ -101,7 +101,7 @@ public class ProcessStoreImpl implements ProcessStore { */ private DataSource _inMemDs; - + private static final ThreadLocal<Long> _currentVersion = new ThreadLocal<Long>(); public ProcessStoreImpl() { this(null, null, "", new OdeConfigProperties(new Properties(), ""), true); @@ -195,11 +195,12 @@ public class ProcessStoreImpl implements ProcessStore { long version; if (autoincrementVersion || du.getStaticVersion() == -1) { // Process and DU use a monotonically increased single version number by default. - version = exec(new Callable<Long>() { - public Long call(ConfStoreConnection conn) { - return conn.getNextVersion(); - } - }); + try { + version = getCurrentVersion(); + } finally { + //we need to reset the current version thread local value. + _currentVersion.set(null); + } } else { version = du.getStaticVersion(); } @@ -296,7 +297,6 @@ public class ProcessStoreImpl implements ProcessStore { newDao.setProperty(prop.getKey(), DOMUtils.domToString(prop.getValue())); } deployed.add(pc.getProcessId()); - conn.setVersion(pc.getVersion()); } catch (Throwable e) { String errmsg = "Error persisting deployment record for " + pc.getProcessId() + "; process will not be available after restart!"; @@ -583,12 +583,18 @@ public class ProcessStoreImpl implements ProcessStore { } public long getCurrentVersion() { + if (_currentVersion.get() != null){ + return _currentVersion.get(); + } + long version = exec(new Callable<Long>() { public Long call(ConfStoreConnection conn) { return conn.getNextVersion(); } }); - return version; + + _currentVersion.set(version); + return _currentVersion.get(); } protected void fireEvent(ProcessStoreEvent pse) { http://git-wip-us.apache.org/repos/asf/ode/blob/97239d0f/bpel-store/src/main/java/org/apache/ode/store/hib/ConfStoreConnectionHib.java ---------------------------------------------------------------------- diff --git a/bpel-store/src/main/java/org/apache/ode/store/hib/ConfStoreConnectionHib.java b/bpel-store/src/main/java/org/apache/ode/store/hib/ConfStoreConnectionHib.java index c8469ae..a7c7099 100644 --- a/bpel-store/src/main/java/org/apache/ode/store/hib/ConfStoreConnectionHib.java +++ b/bpel-store/src/main/java/org/apache/ode/store/hib/ConfStoreConnectionHib.java @@ -25,6 +25,8 @@ import org.apache.ode.store.DeploymentUnitDAO; import org.apache.ode.store.ProcessConfDAO; import org.hibernate.Criteria; import org.hibernate.HibernateException; +import org.hibernate.LockMode; +import org.hibernate.Query; import org.hibernate.Session; import javax.xml.namespace.QName; @@ -77,21 +79,32 @@ public class ConfStoreConnectionHib implements ConfStoreConnection { } public long getNextVersion() { - VersionTrackerDAOImpl vt = (VersionTrackerDAOImpl) - _session.createQuery("from VersionTrackerDAOImpl v ").uniqueResult(); - if (vt == null) return 1; - else return vt.getVersion() + 1; + Query q = _session.createQuery("from VersionTrackerDAOImpl v "); + q.setLockMode("v", LockMode.UPGRADE); + VersionTrackerDAOImpl vt = (VersionTrackerDAOImpl) q.uniqueResult(); + if (vt == null) { + vt = new VersionTrackerDAOImpl(); + vt.setVersion(1); + }else { + vt.setVersion(vt.getVersion() + 1); + } + _session.save(vt); + return vt.getVersion(); } public void setVersion(long version) { _session.flush(); - VersionTrackerDAOImpl vt = (VersionTrackerDAOImpl) - _session.createQuery("from VersionTrackerDAOImpl v ").uniqueResult(); + + Query q = _session.createQuery("from VersionTrackerDAOImpl v "); + q.setLockMode("v", LockMode.UPGRADE); + VersionTrackerDAOImpl vt = (VersionTrackerDAOImpl) q.uniqueResult(); if (vt == null) { vt = new VersionTrackerDAOImpl(); vt.setId(1); + vt.setVersion(1); + } else { + vt.setVersion(version); } - vt.setVersion(version); _session.save(vt); } http://git-wip-us.apache.org/repos/asf/ode/blob/97239d0f/bpel-store/src/main/java/org/apache/ode/store/jpa/ConfStoreConnectionJpa.java ---------------------------------------------------------------------- diff --git a/bpel-store/src/main/java/org/apache/ode/store/jpa/ConfStoreConnectionJpa.java b/bpel-store/src/main/java/org/apache/ode/store/jpa/ConfStoreConnectionJpa.java index fa2f4a7..72a9df9 100644 --- a/bpel-store/src/main/java/org/apache/ode/store/jpa/ConfStoreConnectionJpa.java +++ b/bpel-store/src/main/java/org/apache/ode/store/jpa/ConfStoreConnectionJpa.java @@ -25,6 +25,7 @@ import org.apache.ode.store.ConfStoreConnection; import org.apache.ode.store.DeploymentUnitDAO; import javax.persistence.EntityManager; +import javax.persistence.Query; import java.util.Collection; import java.util.Date; import java.util.List; @@ -64,19 +65,39 @@ public class ConfStoreConnectionJpa implements ConfStoreConnection { } public long getNextVersion() { - List<VersionTrackerDAOImpl> res = _em.createQuery("select v from VersionTrackerDAOImpl v").getResultList(); - if (res.size() == 0) return 1; - else { - VersionTrackerDAOImpl vt = res.get(0); - return vt.getVersion() + 1; - } + VersionTrackerDAOImpl vt = null; + Query query = _em.createQuery("select v from VersionTrackerDAOImpl v"); + query.setHint("openjpa.FetchPlan.ReadLockMode", "WRITE"); + + List<VersionTrackerDAOImpl> res = query.getResultList(); + + if(!res.isEmpty()) + vt = res.get(0); + + if (vt == null) { + vt = new VersionTrackerDAOImpl(); + vt.setVersion(1); + } else { + vt.setVersion(vt.getVersion() + 1); + } + + _em.persist(vt); + return vt.getVersion(); } public void setVersion(long version) { - List<VersionTrackerDAOImpl> res = _em.createQuery("select v from VersionTrackerDAOImpl v").getResultList(); - VersionTrackerDAOImpl vt; - if (res.size() == 0) vt = new VersionTrackerDAOImpl(); - else vt = res.get(0); + VersionTrackerDAOImpl vt = null; + Query query = _em.createQuery("select v from VersionTrackerDAOImpl v"); + query.setHint("openjpa.FetchPlan.ReadLockMode", "WRITE"); + + List<VersionTrackerDAOImpl> res = query.getResultList(); + + if(!res.isEmpty()) + vt = res.get(0); + + if (vt == null) + vt = new VersionTrackerDAOImpl(); + vt.setVersion(version); _em.persist(vt); } http://git-wip-us.apache.org/repos/asf/ode/blob/97239d0f/bpel-store/src/main/java/org/apache/ode/store/jpa/DbConfStoreConnectionFactory.java ---------------------------------------------------------------------- diff --git a/bpel-store/src/main/java/org/apache/ode/store/jpa/DbConfStoreConnectionFactory.java b/bpel-store/src/main/java/org/apache/ode/store/jpa/DbConfStoreConnectionFactory.java index bd8508f..407e18a 100644 --- a/bpel-store/src/main/java/org/apache/ode/store/jpa/DbConfStoreConnectionFactory.java +++ b/bpel-store/src/main/java/org/apache/ode/store/jpa/DbConfStoreConnectionFactory.java @@ -60,6 +60,7 @@ public class DbConfStoreConnectionFactory implements ConfStoreConnectionFactory propMap.put("openjpa.ConnectionFactoryMode", "managed"); propMap.put("openjpa.FlushBeforeQueries", "false"); propMap.put("openjpa.FetchBatchSize", 1000); + propMap.put("openjpa.LockManager","pessimistic"); //dirty hack for ODE-1015 String skipIsolation = System.getProperty("openjpa.connection.isolation.skip", "N");
