Repository: ode
Updated Branches:
  refs/heads/master 0d898f55e -> bfcbd7c9a


ODE-1033: Implemented thread safe version counter


Project: http://git-wip-us.apache.org/repos/asf/ode/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/bfcbd7c9
Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/bfcbd7c9
Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/bfcbd7c9

Branch: refs/heads/master
Commit: bfcbd7c9a01ddfaa75beb0a15cf31387e19ae12c
Parents: 0d898f5
Author: sathwik <[email protected]>
Authored: Sat Jun 13 14:41:21 2015 +0530
Committer: sathwik <[email protected]>
Committed: Sat Jun 13 14:41:21 2015 +0530

----------------------------------------------------------------------
 .../org/apache/ode/store/ProcessStoreImpl.java  | 50 +++++++++++++++++---
 1 file changed, 44 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode/blob/bfcbd7c9/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java 
b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
index b689bd1..34ab9f9 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
@@ -41,6 +41,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.regex.Matcher;
@@ -101,6 +102,8 @@ public class ProcessStoreImpl implements ProcessStore {
      */
     private DataSource _inMemDs;
 
+    private final AtomicLong _version = new AtomicLong(0); // starts at 0; seeded by initializeVersionCounter()
+
     public ProcessStoreImpl() {
         this(null, null, "", new OdeConfigProperties(new Properties(), ""), 
true);
     }
@@ -126,6 +129,46 @@ public class ProcessStoreImpl implements ProcessStore {
             }
             _inMemDs = hsqlds;
         }
+
+        initializeVersionCounter();
+    }
+
+    /**
+     * Seeds the version counter with the next version read from the database.
+     * Deployment units and processes share this single, monotonically
+     * increasing counter, which is an {@link AtomicLong}. Invoked from the
+     * constructor. The value is applied only while the counter still holds 0.
+     * Cluster implementations need to provide a cluster wide version counter.
+     */
+    protected void initializeVersionCounter() {
+        final long nextVersionFromDb = readNextVersionFromDB();
+
+        // No-op unless the counter is still at its initial value of 0.
+        _version.compareAndSet(0, nextVersionFromDb);
+    }
+
+    /**
+     * Fetches the next monotonic version number from the database.
+     * @return the value reported by {@code ConfStoreConnection#getNextVersion()}.
+     */
+    protected long readNextVersionFromDB() {
+        final Callable<Long> fetchNextVersion = new Callable<Long>() {
+            public Long call(ConfStoreConnection conn) {
+                return conn.getNextVersion();
+            }
+        };
+
+        return exec(fetchNextVersion);
+    }
+
+    /**
+     * Hands out the counter's present value, then advances it by one.
+     * Cluster implementations need to override this method.
+     * @return value of the version counter before the increment.
+     * @see #initializeVersionCounter()
+     */
+    protected long getAndIncrementVersion() {
+        return _version.getAndAdd(1L);
     }
 
     /**
@@ -189,12 +232,7 @@ public class ProcessStoreImpl implements ProcessStore {
 
         long version;
         if (autoincrementVersion || du.getStaticVersion() == -1) {
-            // Process and DU use a monotonically increased single version 
number by default.
-            version = exec(new Callable<Long>() {
-                public Long call(ConfStoreConnection conn) {
-                    return conn.getNextVersion();
-                }
-            });
+            version = getAndIncrementVersion(); // next value of the shared in-memory version counter
         } else {
             version = du.getStaticVersion();
         }

Reply via email to