Author: seanahn
Date: Sat Oct 17 00:24:56 2009
New Revision: 826142

URL: http://svn.apache.org/viewvc?rev=826142&view=rev
Log:
ode-681, Implement immediate transaction retries in addition to the presistent 
retries

Added:
    
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/GeronimoDelegateSupport.java
Modified:
    ode/branches/APACHE_ODE_1.X/Rakefile
    
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
    
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DelegateSupport.java
    
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java

Modified: ode/branches/APACHE_ODE_1.X/Rakefile
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/Rakefile?rev=826142&r1=826141&r2=826142&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/Rakefile (original)
+++ ode/branches/APACHE_ODE_1.X/Rakefile Sat Oct 17 00:24:56 2009
@@ -329,7 +329,7 @@
     compile.with projects("bpel-api", "utils"), COMMONS.collections, 
COMMONS.logging, JAVAX.transaction, LOG4J
     test.compile.with HSQLDB, GERONIMO.kernel, GERONIMO.transaction
     test.with HSQLDB, JAVAX.transaction, JAVAX.resource, JAVAX.connector, 
LOG4J,
-          GERONIMO.kernel, GERONIMO.transaction, BACKPORT, JAVAX.ejb
+          GERONIMO.kernel, GERONIMO.transaction, GERONIMO.connector, TRANQL, 
BACKPORT, JAVAX.ejb
     package :jar
   end
 

Modified: 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=826142&r1=826141&r2=826142&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
 Sat Oct 17 00:24:56 2009
@@ -131,6 +131,12 @@
 
     private long _pollIntervalForPolledRunnable = 
Long.getLong("org.apache.ode.polledRunnable.pollInterval", 10 * 60 * 1000);
 
+    /** Number of immediate retries when the transaction fails **/
+    private int _immediateTransactionRetryLimit = 3;
+
+    /** Interval between immediate retries when the transaction fails **/
+    private long _immediateTransactionRetryInterval = 1000;
+
     public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties 
conf) {
         _nodeId = nodeId;
         _db = del;
@@ -140,6 +146,10 @@
         _staleInterval = getLongProperty(conf, "ode.scheduler.staleInterval", 
_staleInterval);
         _tps = getIntProperty(conf, "ode.scheduler.transactionsPerSecond", 
_tps);
         _warningDelay =  getLongProperty(conf, "ode.scheduler.warningDelay", 
_warningDelay);
+
+        _immediateTransactionRetryLimit = getIntProperty(conf, 
"ode.scheduler.immediateTransactionRetryLimit", 
_immediateTransactionRetryLimit);
+        _immediateTransactionRetryInterval = getLongProperty(conf, 
"ode.scheduler.immediateTransactionRetryInterval", 
_immediateTransactionRetryInterval);
+
         _todo = new SchedulerThread(this);
     }
 
@@ -235,30 +245,43 @@
         }
 
         // run in new transaction
-        try {
-            if (__log.isDebugEnabled()) __log.debug("Beginning a new 
transaction");
-            _txm.begin();
-        } catch (Exception ex) {
-            String errmsg = "Internal Error, could not begin transaction.";
-            throw new ContextException(errmsg, ex);
-        }
+        Exception ex = null;
+        int immediateRetryCount = _immediateTransactionRetryLimit;
+        do {
+            try {
+                if (__log.isDebugEnabled()) __log.debug("Beginning a new 
transaction");
+                _txm.begin();
+            } catch (Exception e) {
+                String errmsg = "Internal Error, could not begin transaction.";
+                throw new ContextException(errmsg, e);
+            }
 
-        boolean success = false;
-        try {
-            T retval = transaction.call();
-            success = true;
-            return retval;
-        } catch (Exception ex) {
-            throw ex;
-        } finally {
-            if (success) {
-                if (__log.isDebugEnabled()) __log.debug("Commiting on " + _txm 
+ "...");
-                _txm.commit();
-            } else {
-                if (__log.isDebugEnabled()) __log.debug("Rollbacking on " + 
_txm + "...");
-                _txm.rollback();
+            try {
+                ex = null;
+                return transaction.call();
+            } catch (Exception e) {
+                ex = e;
+            } finally {
+                if (ex == null) {
+                    if (__log.isDebugEnabled()) __log.debug("Commiting on " + 
_txm + "...");
+                    try {
+                        _txm.commit();
+                    } catch( Exception e2 ) {
+                        ex = e2;
+                    }
+                } else {
+                    if (__log.isDebugEnabled()) __log.debug("Rollbacking on " 
+ _txm + "...");
+                    _txm.rollback();
+                }
+                
+                if( ex != null && immediateRetryCount > 0 ) {
+                    if (__log.isDebugEnabled())  __log.debug("Will retry the 
transaction in " + _immediateTransactionRetryInterval + " msecs on " + _txm + " 
for error: ", ex);
+                    Thread.sleep(_immediateTransactionRetryInterval);
+                }
             }
-        }
+        } while( immediateRetryCount-- > 0 );
+        
+        throw ex;
     }
 
     public void setRollbackOnly() throws Exception {

Modified: 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DelegateSupport.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DelegateSupport.java?rev=826142&r1=826141&r2=826142&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DelegateSupport.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/DelegateSupport.java
 Sat Oct 17 00:24:56 2009
@@ -21,6 +21,9 @@
 
 import java.sql.Connection;
 
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+
 import org.apache.ode.utils.GUID;
 import org.hsqldb.jdbc.jdbcDataSource;
 
@@ -32,19 +35,28 @@
  */
 public class DelegateSupport {
 
-    private jdbcDataSource _ds;
-    private JdbcDelegate _del;
+    protected DataSource _ds;
+    protected JdbcDelegate _del;
 
     public DelegateSupport() throws Exception {
-        _ds = new jdbcDataSource();
-        _ds.setDatabase("jdbc:hsqldb:mem:" + new GUID().toString());
-        _ds.setUser("sa");
-        _ds.setPassword("");
+       this(null);
+    }
+
+    public DelegateSupport(TransactionManager txm) throws Exception {
+       initialize(txm);
+    }
+
+    protected void initialize(TransactionManager txm) throws Exception {
+        jdbcDataSource ds = new jdbcDataSource();
+        ds.setDatabase("jdbc:hsqldb:mem:" + new GUID().toString());
+        ds.setUser("sa");
+        ds.setPassword("");
+        _ds = ds;
         
         setup();
         _del = new JdbcDelegate(_ds);
     }
-
+    
     public DatabaseDelegate delegate() {
         return _del;
     }

Added: 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/GeronimoDelegateSupport.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/GeronimoDelegateSupport.java?rev=826142&view=auto
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/GeronimoDelegateSupport.java
 (added)
+++ 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/GeronimoDelegateSupport.java
 Sat Oct 17 00:24:56 2009
@@ -0,0 +1,71 @@
+package org.apache.ode.scheduler.simple;
+
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+
+import org.apache.geronimo.connector.outbound.GenericConnectionManager;
+import 
org.apache.geronimo.connector.outbound.connectionmanagerconfig.LocalTransactions;
+import 
org.apache.geronimo.connector.outbound.connectionmanagerconfig.PoolingSupport;
+import 
org.apache.geronimo.connector.outbound.connectionmanagerconfig.SinglePool;
+import 
org.apache.geronimo.connector.outbound.connectionmanagerconfig.TransactionSupport;
+import 
org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTracker;
+import 
org.apache.geronimo.connector.outbound.connectiontracking.ConnectionTrackingCoordinator;
+import org.apache.geronimo.transaction.manager.RecoverableTransactionManager;
+import org.apache.ode.utils.GUID;
+import org.tranql.connector.jdbc.JDBCDriverMCF;
+
+public class GeronimoDelegateSupport extends DelegateSupport {
+    private static final int CONNECTION_MAX_WAIT_MILLIS = 30000;
+
+    private static final int CONNECTION_MAX_IDLE_MINUTES = 5;
+
+    private GenericConnectionManager _connectionManager;
+
+    public GeronimoDelegateSupport(TransactionManager txm) throws Exception {
+       super(txm);
+       }
+
+    @Override
+    protected void initialize(TransactionManager txm) throws Exception {
+        _ds = createGeronimoDataSource(txm, "jdbc:hsqldb:mem:" + new 
GUID().toString(), "org.hsqldb.jdbcDriver", "sa", "");
+        setup();
+        _del = new JdbcDelegate(_ds);
+    }
+
+    private DataSource createGeronimoDataSource(TransactionManager txm, String 
url, String driverClass, String username,String password) {
+        TransactionSupport transactionSupport = LocalTransactions.INSTANCE;
+        ConnectionTracker connectionTracker = new 
ConnectionTrackingCoordinator();
+
+        PoolingSupport poolingSupport = new SinglePool(1, 1, 
+                CONNECTION_MAX_WAIT_MILLIS,
+                CONNECTION_MAX_IDLE_MINUTES,
+                true, // match one
+                false, // match all
+                false); // select one assume match
+
+        _connectionManager = new GenericConnectionManager(
+                    transactionSupport,
+                    poolingSupport,
+                    null,
+                    connectionTracker,
+                    (RecoverableTransactionManager) txm,
+                    getClass().getName(),
+                    getClass().getClassLoader());
+
+        JDBCDriverMCF mcf = new JDBCDriverMCF();
+        try {
+            mcf.setDriver(driverClass);
+            mcf.setConnectionURL(url);
+            if (username != null) {
+                mcf.setUserName(username);
+            }
+            if (password != null) {
+                mcf.setPassword(password);
+            }
+            _connectionManager.doStart();
+            return (DataSource) 
mcf.createConnectionFactory(_connectionManager);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+}

Modified: 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java?rev=826142&r1=826141&r2=826142&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
 Sat Oct 17 00:24:56 2009
@@ -24,6 +24,7 @@
 
 import javax.transaction.TransactionManager;
 import java.util.*;
+import java.util.concurrent.Callable;
 
 import junit.framework.TestCase;
 
@@ -37,10 +38,11 @@
     ArrayList<Scheduler.JobInfo> _commit;
     TransactionManager _txm;
     int _tried = 0;
-
+    Scheduler.JobInfo _jobInfo = null;
+    
     public void setUp() throws Exception {
         _txm = new GeronimoTransactionManager();
-        _ds = new DelegateSupport();
+        _ds = new GeronimoDelegateSupport(_txm);
 
         _scheduler = newScheduler("n1");
         _jobs = new ArrayList<Scheduler.JobInfo>(100);
@@ -64,13 +66,33 @@
         }
 
         Thread.sleep(10000);
-        assertEquals(3, _tried);
+        assertEquals(8, _tried);
     }
 
+    public void testExecTransaction() throws Exception {
+        final int[] tryCount = new int[1];
+        tryCount[0] = 0;
+        
+        Callable<Void> transaction = new Callable<Void>() {
+            public Void call() throws Exception {
+                tryCount[0]++;
+                if( tryCount[0] < 3 ) {
+                    throw new Exception("any");
+                } else {
+                    return null;
+                }
+            }            
+        };
+
+        _scheduler.execTransaction(transaction);
+        assertEquals(3, tryCount[0]);
+    }
 
     public void onScheduledJob(Scheduler.JobInfo jobInfo) throws 
Scheduler.JobProcessorException {
+        _jobInfo = jobInfo;
+        
         _tried++;
-        throw new Scheduler.JobProcessorException(jobInfo.retryCount < 3);
+        throw new Scheduler.JobProcessorException(jobInfo.retryCount < 1);
     }
 
     Map<String, Object> newDetail(String x) {
@@ -85,5 +107,4 @@
         scheduler.setTransactionManager(_txm);
         return scheduler;
     }
-
 }


Reply via email to