Repository: ode Updated Branches: refs/heads/master 0d898f55e -> bfcbd7c9a
ODE-1033: Implemented thread safe version counter Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/bfcbd7c9 Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/bfcbd7c9 Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/bfcbd7c9 Branch: refs/heads/master Commit: bfcbd7c9a01ddfaa75beb0a15cf31387e19ae12c Parents: 0d898f5 Author: sathwik <[email protected]> Authored: Sat Jun 13 14:41:21 2015 +0530 Committer: sathwik <[email protected]> Committed: Sat Jun 13 14:41:21 2015 +0530 ---------------------------------------------------------------------- .../org/apache/ode/store/ProcessStoreImpl.java | 50 +++++++++++++++++--- 1 file changed, 44 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/bfcbd7c9/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java ---------------------------------------------------------------------- diff --git a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java index b689bd1..34ab9f9 100644 --- a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java +++ b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.sql.SQLException; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; @@ -101,6 +102,8 @@ public class ProcessStoreImpl implements ProcessStore { */ private DataSource _inMemDs; + private AtomicLong _version = new AtomicLong(0); + public ProcessStoreImpl() { this(null, null, "", new OdeConfigProperties(new Properties(), ""), true); } @@ -126,6 +129,46 @@ public class ProcessStoreImpl implements ProcessStore { } 
_inMemDs = hsqlds; } + + initializeVersionCounter(); + } + + /** + * Process and DU use a monotonically increased single version number by default. + * Reads the version number from the database and initializes the version counter which + * is used to assign version numbers to Deployment units and Processes. Version counter is an atomic long object. + * Cluster implementations need to provide a cluster wide version counter. + * Version counter is initialized in the constructor. + */ + protected void initializeVersionCounter(){ + + long version = readNextVersionFromDB(); + + _version.compareAndSet(0, version); + } + + /** + * Reads the monotonic version number from the database. + * @return incremented value of version. + */ + protected long readNextVersionFromDB() { + long version = exec(new Callable<Long>() { + public Long call(ConfStoreConnection conn) { + return conn.getNextVersion(); + } + }); + + return version; + } + + /** + * Returns the current value of the version counter and increments. + * Cluster implementations need to override this method. + * @return Current value of the version counter. + * @see #initializeVersionCounter() + */ + protected long getAndIncrementVersion(){ + return _version.getAndIncrement(); } /** @@ -189,12 +232,7 @@ public class ProcessStoreImpl implements ProcessStore { long version; if (autoincrementVersion || du.getStaticVersion() == -1) { - // Process and DU use a monotonically increased single version number by default. - version = exec(new Callable<Long>() { - public Long call(ConfStoreConnection conn) { - return conn.getNextVersion(); - } - }); + version = version = getAndIncrementVersion(); } else { version = du.getStaticVersion(); }
